diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index bc9025cb360..674388f3fa1 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -3,7 +3,6 @@
true
-
@@ -40,6 +39,8 @@
+
+
diff --git a/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj b/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj
index 2afeb472287..02f2478826b 100644
--- a/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj
+++ b/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj
@@ -17,7 +17,7 @@
-
+
diff --git a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs
index 9d27b123bfe..2abcb1896fe 100644
--- a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs
+++ b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs
@@ -1,5 +1,5 @@
-using AlterNats;
using HotChocolate.Subscriptions.Diagnostics;
+using NATS.Client.Core;
namespace HotChocolate.Subscriptions.Nats;
@@ -34,7 +34,11 @@ protected override async ValueTask OnSendAsync(
CancellationToken cancellationToken = default)
{
var serialized = _serializer.Serialize(message);
- await _connection.PublishAsync(formattedTopic, serialized).ConfigureAwait(false);
+
+ await _connection.PublishAsync(
+ formattedTopic,
+ serialized,
+ cancellationToken: cancellationToken).ConfigureAwait(false);
}
protected override async ValueTask OnCompleteAsync(string formattedTopic)
diff --git a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs
index 4b4b5f51f9b..566c9bf017f 100644
--- a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs
+++ b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs
@@ -1,6 +1,6 @@
using System.Diagnostics;
-using AlterNats;
using HotChocolate.Subscriptions.Diagnostics;
+using NATS.Client.Core;
using static HotChocolate.Subscriptions.Nats.NatsResources;
namespace HotChocolate.Subscriptions.Nats;
@@ -30,13 +30,22 @@ protected override async ValueTask OnConnectAsync(
Debug.Assert(_connection != null, "_connection != null");
Debug.Assert(_connection != null, "_serializer != null");
- var natsSession = await _connection
- .SubscribeAsync(Name, (string? m) => DispatchMessage(_serializer, m))
- .ConfigureAwait(false);
+ var natsSession = Task.Run(async () =>
+ {
+ await foreach (var msg in _connection.SubscribeAsync(
+ Name,
+ cancellationToken: cancellationToken))
+ {
+ Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n");//tmp
+ DispatchMessage(_serializer, msg.Data);
+ }
+ }, cancellationToken);
DiagnosticEvents.ProviderTopicInfo(Name, NatsTopic_ConnectAsync_SubscribedToNats);
- return new Session(Name, natsSession, DiagnosticEvents);
+ return natsSession;
+
+ //return new Session(Name, natsSession, DiagnosticEvents);
}
private sealed class Session : IDisposable
diff --git a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj
index b62976e04f3..f876bd55a53 100644
--- a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj
+++ b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj
@@ -22,6 +22,7 @@
+
diff --git a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs
index f795cb6aaba..e59c1ccafd1 100644
--- a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs
+++ b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs
@@ -1,6 +1,6 @@
-using AlterNats;
using Microsoft.Extensions.DependencyInjection;
using HotChocolate.Execution.Configuration;
+using NATS.Client.Hosting;
using Squadron;
using Xunit.Abstractions;
diff --git a/src/HotChocolate/Core/test/Subscriptions.Tests/SubscriptionIntegrationTestBase.cs b/src/HotChocolate/Core/test/Subscriptions.Tests/SubscriptionIntegrationTestBase.cs
index 80862e56f92..fc245ae577d 100644
--- a/src/HotChocolate/Core/test/Subscriptions.Tests/SubscriptionIntegrationTestBase.cs
+++ b/src/HotChocolate/Core/test/Subscriptions.Tests/SubscriptionIntegrationTestBase.cs
@@ -33,7 +33,7 @@ public virtual async Task Subscribe_Infer_Topic()
// we need to execute the read for the subscription to start receiving.
await using var responseStream = result.ExpectResponseStream();
- var results = responseStream.ReadResultsAsync().ConfigureAwait(false);
+ var results = responseStream.ReadResultsAsync();
// assert
await sender.SendAsync("OnMessage", "bar", cts.Token);
@@ -41,7 +41,7 @@ public virtual async Task Subscribe_Infer_Topic()
var snapshot = new Snapshot();
- await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false))
+ await foreach (var response in results.WithCancellation(cts.Token))
{
snapshot.Add(response);
}
@@ -69,7 +69,7 @@ public virtual async Task Subscribe_Static_Topic()
// we need to execute the read for the subscription to start receiving.
await using var responseStream = result.ExpectResponseStream();
- var results = responseStream.ReadResultsAsync().ConfigureAwait(false);
+ var results = responseStream.ReadResultsAsync();
// assert
await sender.SendAsync("OnMessage", new Foo { Bar = "Hello" }, cts.Token);
@@ -77,7 +77,7 @@ public virtual async Task Subscribe_Static_Topic()
var snapshot = new Snapshot();
- await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false))
+ await foreach (var response in results.WithCancellation(cts.Token))
{
snapshot.Add(response);
}
@@ -107,7 +107,7 @@ public virtual async Task Subscribe_Topic_With_Arguments()
// we need to execute the read for the subscription to start receiving.
await using var responseStream = result.ExpectResponseStream();
- var results = responseStream.ReadResultsAsync().ConfigureAwait(false);
+ var results = responseStream.ReadResultsAsync();
// assert
await sender.SendAsync("OnMessage_a", "abc", cts.Token);
@@ -115,7 +115,7 @@ public virtual async Task Subscribe_Topic_With_Arguments()
var snapshot = new Snapshot();
- await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false))
+ await foreach (var response in results.WithCancellation(cts.Token))
{
snapshot.Add(response, name: "From Stream A");
}
@@ -147,10 +147,10 @@ public virtual async Task Subscribe_Topic_With_Arguments_2_Subscriber()
// we need to execute the read for the subscription to start receiving.
await using var responseStream1 = result1.ExpectResponseStream();
- var results1 = responseStream1.ReadResultsAsync().ConfigureAwait(false);
+ var results1 = responseStream1.ReadResultsAsync();
await using var responseStream2 = result2.ExpectResponseStream();
- var results2 = responseStream2.ReadResultsAsync().ConfigureAwait(false);
+ var results2 = responseStream2.ReadResultsAsync();
// assert
await sender.SendAsync("OnMessage_a", "abc", cts.Token);
@@ -158,12 +158,12 @@ public virtual async Task Subscribe_Topic_With_Arguments_2_Subscriber()
var snapshot = new Snapshot();
- await foreach (var response in results1.WithCancellation(cts.Token).ConfigureAwait(false))
+ await foreach (var response in results1.WithCancellation(cts.Token))
{
snapshot.Add(response, name: "From Stream 1");
}
- await foreach (var response in results2.WithCancellation(cts.Token).ConfigureAwait(false))
+ await foreach (var response in results2.WithCancellation(cts.Token))
{
snapshot.Add(response, name: "From Stream 2");
}
@@ -208,10 +208,10 @@ public virtual async Task Subscribe_Topic_With_Arguments_2_Topics()
// we need to execute the read for the subscription to start receiving.
await using var responseStream1 = result1.ExpectResponseStream();
- var results1 = responseStream1.ReadResultsAsync().ConfigureAwait(false);
+ var results1 = responseStream1.ReadResultsAsync();
await using var responseStream2 = result2.ExpectResponseStream();
- var results2 = responseStream2.ReadResultsAsync().ConfigureAwait(false);
+ var results2 = responseStream2.ReadResultsAsync();
// assert
await sender.SendAsync("OnMessage_a", "abc", cts.Token);
@@ -222,12 +222,12 @@ public virtual async Task Subscribe_Topic_With_Arguments_2_Topics()
var snapshot = new Snapshot();
- await foreach (var response in results1.WithCancellation(cts.Token).ConfigureAwait(false))
+ await foreach (var response in results1.WithCancellation(cts.Token))
{
snapshot.Add(response, name: "From Stream 1");
}
- await foreach (var response in results2.WithCancellation(cts.Token).ConfigureAwait(false))
+ await foreach (var response in results2.WithCancellation(cts.Token))
{
snapshot.Add(response, name: "From Stream 2");
}
@@ -268,7 +268,7 @@ public virtual async Task Subscribe_Topic_With_2_Arguments()
// we need to execute the read for the subscription to start receiving.
await using var responseStream = result.ExpectResponseStream();
- var results = responseStream.ReadResultsAsync().ConfigureAwait(false);
+ var results = responseStream.ReadResultsAsync();
// assert
await sender.SendAsync("OnMessage2_a_b", "abc", cts.Token);
@@ -276,7 +276,7 @@ public virtual async Task Subscribe_Topic_With_2_Arguments()
var snapshot = new Snapshot();
- await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false))
+ await foreach (var response in results.WithCancellation(cts.Token))
{
snapshot.Add(response, name: "From Stream A");
}
@@ -304,13 +304,13 @@ public virtual async Task Subscribe_And_Complete_Topic()
// we need to execute the read for the subscription to start receiving.
await using var responseStream = result.ExpectResponseStream();
- var results = responseStream.ReadResultsAsync().ConfigureAwait(false);
+ var results = responseStream.ReadResultsAsync();
// assert
await Task.Delay(2000, cts.Token);
await sender.CompleteAsync("OnMessage");
- await foreach (var _ in results.WithCancellation(cts.Token).ConfigureAwait(false))
+ await foreach (var _ in results.WithCancellation(cts.Token))
{
Assert.Fail("Should not have any messages.");
}
@@ -331,13 +331,13 @@ public virtual async Task Subscribe_And_Complete_Topic_With_ValueTypeMessage()
// we need to execute the read for the subscription to start receiving.
await using var responseStream = result.ExpectResponseStream();
- var results = responseStream.ReadResultsAsync().ConfigureAwait(false);
+ var results = responseStream.ReadResultsAsync();
// assert
await Task.Delay(2000, cts.Token);
await sender.CompleteAsync("OnMessage3");
- await foreach (var _ in results.WithCancellation(cts.Token).ConfigureAwait(false))
+ await foreach (var _ in results.WithCancellation(cts.Token))
{
Assert.Fail("Should not have any messages.");
}