From 6e2fd81f90845e07459fca4c4a546bf3380ca1cb Mon Sep 17 00:00:00 2001 From: Glen Date: Thu, 29 Aug 2024 17:09:15 +0200 Subject: [PATCH 1/2] WIP --- src/Directory.Packages.props | 3 +- .../HotChocolate.Subscriptions.Nats.csproj | 2 +- .../Core/src/Subscriptions.Nats/NatsPubSub.cs | 8 +++- .../Core/src/Subscriptions.Nats/NatsTopic.cs | 19 ++++++--- ...tChocolate.Subscriptions.Nats.Tests.csproj | 1 + .../NatsIntegrationTests.cs | 2 +- .../SubscriptionIntegrationTestBase.cs | 40 +++++++++---------- 7 files changed, 45 insertions(+), 30 deletions(-) diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 8ab428bdcfd..0aea24095e2 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -4,7 +4,6 @@ - @@ -37,6 +36,8 @@ + + diff --git a/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj b/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj index 6f15f4f4fc8..eb4b3fd5e56 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj +++ b/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj @@ -21,7 +21,7 @@ - + diff --git a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs index 9d27b123bfe..2abcb1896fe 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs @@ -1,5 +1,5 @@ -using AlterNats; using HotChocolate.Subscriptions.Diagnostics; +using NATS.Client.Core; namespace HotChocolate.Subscriptions.Nats; @@ -34,7 +34,11 @@ protected override async ValueTask OnSendAsync( CancellationToken cancellationToken = default) { var serialized = _serializer.Serialize(message); - await _connection.PublishAsync(formattedTopic, serialized).ConfigureAwait(false); + + await _connection.PublishAsync( + formattedTopic, + serialized, + cancellationToken: cancellationToken).ConfigureAwait(false); } protected override async ValueTask OnCompleteAsync(string formattedTopic) diff --git a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs index 4b4b5f51f9b..566c9bf017f 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs @@ -1,6 +1,6 @@ using System.Diagnostics; -using AlterNats; using HotChocolate.Subscriptions.Diagnostics; +using NATS.Client.Core; using static HotChocolate.Subscriptions.Nats.NatsResources; namespace HotChocolate.Subscriptions.Nats; @@ -30,13 +30,22 @@ protected override async ValueTask OnConnectAsync( Debug.Assert(_connection != null, "_connection != null"); Debug.Assert(_connection != null, "_serializer != null"); - var natsSession = await _connection - .SubscribeAsync(Name, (string? m) => DispatchMessage(_serializer, m)) - .ConfigureAwait(false); + var natsSession = Task.Run(async () => + { + await foreach (var msg in _connection.SubscribeAsync( + Name, + cancellationToken: cancellationToken)) + { + Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n");//tmp + DispatchMessage(_serializer, msg.Data); + } + }, cancellationToken); DiagnosticEvents.ProviderTopicInfo(Name, NatsTopic_ConnectAsync_SubscribedToNats); - return new Session(Name, natsSession, DiagnosticEvents); + return natsSession; + + //return new Session(Name, natsSession, DiagnosticEvents); } private sealed class Session : IDisposable diff --git a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj index 095d28be3db..81996f37c1c 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj +++ b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj @@ -21,6 +21,7 @@ + diff --git a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs index f6cdd1f44d2..8ba6ad9a468 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs @@ -1,6 +1,6 @@ -using AlterNats; using Microsoft.Extensions.DependencyInjection; using HotChocolate.Execution.Configuration; +using NATS.Client.Hosting; using Squadron; using Xunit.Abstractions; diff --git a/src/HotChocolate/Core/test/Subscriptions.Tests/SubscriptionIntegrationTestBase.cs b/src/HotChocolate/Core/test/Subscriptions.Tests/SubscriptionIntegrationTestBase.cs index 21d81508e6e..25b912570bb 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Tests/SubscriptionIntegrationTestBase.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Tests/SubscriptionIntegrationTestBase.cs @@ -34,7 +34,7 @@ public virtual async Task Subscribe_Infer_Topic() // we need to execute the read for the subscription to start receiving. await using var responseStream = result.ExpectResponseStream(); - var results = responseStream.ReadResultsAsync().ConfigureAwait(false); + var results = responseStream.ReadResultsAsync(); // assert await sender.SendAsync("OnMessage", "bar", cts.Token); @@ -42,7 +42,7 @@ public virtual async Task Subscribe_Infer_Topic() var snapshot = new Snapshot(); - await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results.WithCancellation(cts.Token)) { snapshot.Add(response); } @@ -70,7 +70,7 @@ public virtual async Task Subscribe_Static_Topic() // we need to execute the read for the subscription to start receiving. await using var responseStream = result.ExpectResponseStream(); - var results = responseStream.ReadResultsAsync().ConfigureAwait(false); + var results = responseStream.ReadResultsAsync(); // assert await sender.SendAsync("OnMessage", new Foo { Bar = "Hello", }, cts.Token); @@ -78,7 +78,7 @@ public virtual async Task Subscribe_Static_Topic() var snapshot = new Snapshot(); - await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results.WithCancellation(cts.Token)) { snapshot.Add(response); } @@ -108,7 +108,7 @@ public virtual async Task Subscribe_Topic_With_Arguments() // we need to execute the read for the subscription to start receiving. await using var responseStream = result.ExpectResponseStream(); - var results = responseStream.ReadResultsAsync().ConfigureAwait(false); + var results = responseStream.ReadResultsAsync(); // assert await sender.SendAsync("OnMessage_a", "abc", cts.Token); @@ -116,7 +116,7 @@ public virtual async Task Subscribe_Topic_With_Arguments() var snapshot = new Snapshot(); - await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results.WithCancellation(cts.Token)) { snapshot.Add(response, name: "From Stream A"); } @@ -148,10 +148,10 @@ public virtual async Task Subscribe_Topic_With_Arguments_2_Subscriber() // we need to execute the read for the subscription to start receiving. await using var responseStream1 = result1.ExpectResponseStream(); - var results1 = responseStream1.ReadResultsAsync().ConfigureAwait(false); + var results1 = responseStream1.ReadResultsAsync(); await using var responseStream2 = result2.ExpectResponseStream(); - var results2 = responseStream2.ReadResultsAsync().ConfigureAwait(false); + var results2 = responseStream2.ReadResultsAsync(); // assert await sender.SendAsync("OnMessage_a", "abc", cts.Token); @@ -159,12 +159,12 @@ public virtual async Task Subscribe_Topic_With_Arguments_2_Subscriber() var snapshot = new Snapshot(); - await foreach (var response in results1.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results1.WithCancellation(cts.Token)) { snapshot.Add(response, name: "From Stream 1"); } - await foreach (var response in results2.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results2.WithCancellation(cts.Token)) { snapshot.Add(response, name: "From Stream 2"); } @@ -209,10 +209,10 @@ public virtual async Task Subscribe_Topic_With_Arguments_2_Topics() // we need to execute the read for the subscription to start receiving. await using var responseStream1 = result1.ExpectResponseStream(); - var results1 = responseStream1.ReadResultsAsync().ConfigureAwait(false); + var results1 = responseStream1.ReadResultsAsync(); await using var responseStream2 = result2.ExpectResponseStream(); - var results2 = responseStream2.ReadResultsAsync().ConfigureAwait(false); + var results2 = responseStream2.ReadResultsAsync(); // assert await sender.SendAsync("OnMessage_a", "abc", cts.Token); @@ -223,12 +223,12 @@ public virtual async Task Subscribe_Topic_With_Arguments_2_Topics() var snapshot = new Snapshot(); - await foreach (var response in results1.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results1.WithCancellation(cts.Token)) { snapshot.Add(response, name: "From Stream 1"); } - await foreach (var response in results2.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results2.WithCancellation(cts.Token)) { snapshot.Add(response, name: "From Stream 2"); } @@ -269,7 +269,7 @@ public virtual async Task Subscribe_Topic_With_2_Arguments() // we need to execute the read for the subscription to start receiving. await using var responseStream = result.ExpectResponseStream(); - var results = responseStream.ReadResultsAsync().ConfigureAwait(false); + var results = responseStream.ReadResultsAsync(); // assert await sender.SendAsync("OnMessage2_a_b", "abc", cts.Token); @@ -277,7 +277,7 @@ public virtual async Task Subscribe_Topic_With_2_Arguments() var snapshot = new Snapshot(); - await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var response in results.WithCancellation(cts.Token)) { snapshot.Add(response, name: "From Stream A"); } @@ -305,13 +305,13 @@ public virtual async Task Subscribe_And_Complete_Topic() // we need to execute the read for the subscription to start receiving. await using var responseStream = result.ExpectResponseStream(); - var results = responseStream.ReadResultsAsync().ConfigureAwait(false); + var results = responseStream.ReadResultsAsync(); // assert await Task.Delay(2000, cts.Token); await sender.CompleteAsync("OnMessage"); - await foreach (var unused in results.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var unused in results.WithCancellation(cts.Token)) { Assert.Fail("Should not have any messages."); } @@ -332,13 +332,13 @@ public virtual async Task Subscribe_And_Complete_Topic_With_ValueTypeMessage() // we need to execute the read for the subscription to start receiving. await using var responseStream = result.ExpectResponseStream(); - var results = responseStream.ReadResultsAsync().ConfigureAwait(false); + var results = responseStream.ReadResultsAsync(); // assert await Task.Delay(2000, cts.Token); await sender.CompleteAsync("OnMessage3"); - await foreach (var unused in results.WithCancellation(cts.Token).ConfigureAwait(false)) + await foreach (var unused in results.WithCancellation(cts.Token)) { Assert.Fail("Should not have any messages."); } From cfc5d097b1a436fbdb1dd23cdd6e68055818cbfa Mon Sep 17 00:00:00 2001 From: Glen Date: Sat, 30 Aug 2025 17:18:52 +0200 Subject: [PATCH 2/2] Removed AlterNats.Hosting package --- src/Directory.Packages.props | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 870795acc3b..674388f3fa1 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -3,7 +3,6 @@ true -