diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index bc9025cb360..674388f3fa1 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -3,7 +3,6 @@ true - @@ -40,6 +39,8 @@ + + diff --git a/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj b/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj index 2afeb472287..02f2478826b 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj +++ b/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj @@ -17,7 +17,7 @@ - + diff --git a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs index 9d27b123bfe..2abcb1896fe 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs @@ -1,5 +1,5 @@ -using AlterNats; using HotChocolate.Subscriptions.Diagnostics; +using NATS.Client.Core; namespace HotChocolate.Subscriptions.Nats; @@ -34,7 +34,11 @@ protected override async ValueTask OnSendAsync( CancellationToken cancellationToken = default) { var serialized = _serializer.Serialize(message); - await _connection.PublishAsync(formattedTopic, serialized).ConfigureAwait(false); + + await _connection.PublishAsync( + formattedTopic, + serialized, + cancellationToken: cancellationToken).ConfigureAwait(false); } protected override async ValueTask OnCompleteAsync(string formattedTopic) diff --git a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs index 4b4b5f51f9b..566c9bf017f 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs @@ -1,6 +1,6 @@ using System.Diagnostics; -using AlterNats; using HotChocolate.Subscriptions.Diagnostics; +using NATS.Client.Core; using static HotChocolate.Subscriptions.Nats.NatsResources; namespace HotChocolate.Subscriptions.Nats; @@ -30,13 +30,22 @@ protected override async ValueTask OnConnectAsync( Debug.Assert(_connection != null, "_connection != null"); Debug.Assert(_connection != null, "_serializer != null"); - var natsSession = await _connection - .SubscribeAsync(Name, (string? m) => DispatchMessage(_serializer, m)) - .ConfigureAwait(false); + var natsSession = Task.Run(async () => + { + await foreach (var msg in _connection.SubscribeAsync( + Name, + cancellationToken: cancellationToken)) + { + Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n");//tmp + DispatchMessage(_serializer, msg.Data); + } + }, cancellationToken); DiagnosticEvents.ProviderTopicInfo(Name, NatsTopic_ConnectAsync_SubscribedToNats); - return new Session(Name, natsSession, DiagnosticEvents); + return natsSession; + + //return new Session(Name, natsSession, DiagnosticEvents); } private sealed class Session : IDisposable diff --git a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj index b62976e04f3..f876bd55a53 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj +++ b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj @@ -22,6 +22,7 @@ + diff --git a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs index f795cb6aaba..e59c1ccafd1 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs @@ -1,6 +1,6 @@ -using AlterNats; using Microsoft.Extensions.DependencyInjection; using HotChocolate.Execution.Configuration; +using NATS.Client.Hosting; using Squadron; using Xunit.Abstractions; diff --git a/src/HotChocolate/Core/test/Subscriptions.Tests/SubscriptionIntegrationTestBase.cs b/src/HotChocolate/Core/test/Subscriptions.Tests/SubscriptionIntegrationTestBase.cs index 80862e56f92..fc245ae577d 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Tests/SubscriptionIntegrationTestBase.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Tests/SubscriptionIntegrationTestBase.cs @@ -33,7 +33,7 @@ public virtual async Task Subscribe_Infer_Topic() // we need to execute the read for the subscription to start receiving. await using var responseStream = result.ExpectResponseStream(); - var results = responseStream.ReadResultsAsync().ConfigureAwait(false); + var results = responseStream.ReadResultsAsync(); // assert await sender.SendAsync("OnMessage", "bar", cts.Token); @@ -41,7 +41,7 @@ public virtual async Task Subscribe_Infer_Topic() var snapshot = new Snapshot(); - await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results.WithCancellation(cts.Token)) { snapshot.Add(response); } @@ -69,7 +69,7 @@ public virtual async Task Subscribe_Static_Topic() // we need to execute the read for the subscription to start receiving. await using var responseStream = result.ExpectResponseStream(); - var results = responseStream.ReadResultsAsync().ConfigureAwait(false); + var results = responseStream.ReadResultsAsync(); // assert await sender.SendAsync("OnMessage", new Foo { Bar = "Hello" }, cts.Token); @@ -77,7 +77,7 @@ public virtual async Task Subscribe_Static_Topic() var snapshot = new Snapshot(); - await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results.WithCancellation(cts.Token)) { snapshot.Add(response); } @@ -107,7 +107,7 @@ public virtual async Task Subscribe_Topic_With_Arguments() // we need to execute the read for the subscription to start receiving. await using var responseStream = result.ExpectResponseStream(); - var results = responseStream.ReadResultsAsync().ConfigureAwait(false); + var results = responseStream.ReadResultsAsync(); // assert await sender.SendAsync("OnMessage_a", "abc", cts.Token); @@ -115,7 +115,7 @@ public virtual async Task Subscribe_Topic_With_Arguments() var snapshot = new Snapshot(); - await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results.WithCancellation(cts.Token)) { snapshot.Add(response, name: "From Stream A"); } @@ -147,10 +147,10 @@ public virtual async Task Subscribe_Topic_With_Arguments_2_Subscriber() // we need to execute the read for the subscription to start receiving. await using var responseStream1 = result1.ExpectResponseStream(); - var results1 = responseStream1.ReadResultsAsync().ConfigureAwait(false); + var results1 = responseStream1.ReadResultsAsync(); await using var responseStream2 = result2.ExpectResponseStream(); - var results2 = responseStream2.ReadResultsAsync().ConfigureAwait(false); + var results2 = responseStream2.ReadResultsAsync(); // assert await sender.SendAsync("OnMessage_a", "abc", cts.Token); @@ -158,12 +158,12 @@ public virtual async Task Subscribe_Topic_With_Arguments_2_Subscriber() var snapshot = new Snapshot(); - await foreach (var response in results1.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results1.WithCancellation(cts.Token)) { snapshot.Add(response, name: "From Stream 1"); } - await foreach (var response in results2.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results2.WithCancellation(cts.Token)) { snapshot.Add(response, name: "From Stream 2"); } @@ -208,10 +208,10 @@ public virtual async Task Subscribe_Topic_With_Arguments_2_Topics() // we need to execute the read for the subscription to start receiving. await using var responseStream1 = result1.ExpectResponseStream(); - var results1 = responseStream1.ReadResultsAsync().ConfigureAwait(false); + var results1 = responseStream1.ReadResultsAsync(); await using var responseStream2 = result2.ExpectResponseStream(); - var results2 = responseStream2.ReadResultsAsync().ConfigureAwait(false); + var results2 = responseStream2.ReadResultsAsync(); // assert await sender.SendAsync("OnMessage_a", "abc", cts.Token); @@ -222,12 +222,12 @@ public virtual async Task Subscribe_Topic_With_Arguments_2_Topics() var snapshot = new Snapshot(); - await foreach (var response in results1.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results1.WithCancellation(cts.Token)) { snapshot.Add(response, name: "From Stream 1"); } - await foreach (var response in results2.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results2.WithCancellation(cts.Token)) { snapshot.Add(response, name: "From Stream 2"); } @@ -268,7 +268,7 @@ public virtual async Task Subscribe_Topic_With_2_Arguments() // we need to execute the read for the subscription to start receiving. await using var responseStream = result.ExpectResponseStream(); - var results = responseStream.ReadResultsAsync().ConfigureAwait(false); + var results = responseStream.ReadResultsAsync(); // assert await sender.SendAsync("OnMessage2_a_b", "abc", cts.Token); @@ -276,7 +276,7 @@ public virtual async Task Subscribe_Topic_With_2_Arguments() var snapshot = new Snapshot(); - await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results.WithCancellation(cts.Token)) { snapshot.Add(response, name: "From Stream A"); } @@ -304,13 +304,13 @@ public virtual async Task Subscribe_And_Complete_Topic() // we need to execute the read for the subscription to start receiving. await using var responseStream = result.ExpectResponseStream(); - var results = responseStream.ReadResultsAsync().ConfigureAwait(false); + var results = responseStream.ReadResultsAsync(); // assert await Task.Delay(2000, cts.Token); await sender.CompleteAsync("OnMessage"); - await foreach (var _ in results.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var _ in results.WithCancellation(cts.Token)) { Assert.Fail("Should not have any messages."); } @@ -331,13 +331,13 @@ public virtual async Task Subscribe_And_Complete_Topic_With_ValueTypeMessage() // we need to execute the read for the subscription to start receiving. await using var responseStream = result.ExpectResponseStream(); - var results = responseStream.ReadResultsAsync().ConfigureAwait(false); + var results = responseStream.ReadResultsAsync(); // assert await Task.Delay(2000, cts.Token); await sender.CompleteAsync("OnMessage3"); - await foreach (var _ in results.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var _ in results.WithCancellation(cts.Token)) { Assert.Fail("Should not have any messages."); }