Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated NATS library #7532

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
</PropertyGroup>

<ItemGroup>
<PackageVersion Include="AlterNats.Hosting" Version="1.0.6" />
<PackageVersion Include="Aspire.Hosting" Version="8.0.0" />
<PackageVersion Include="Aspire.Hosting.AppHost" Version="8.0.0" />
<PackageVersion Include="Aspire.Hosting.PostgreSQL" Version="8.0.0" />
Expand Down Expand Up @@ -37,6 +36,8 @@
<PackageVersion Include="Microsoft.OpenApi.Readers" Version="1.6.14" />
<PackageVersion Include="MongoDB.Driver" Version="2.29.0" />
<PackageVersion Include="Moq" Version="4.20.70" />
<PackageVersion Include="NATS.Client.Core" Version="2.3.3" />
<PackageVersion Include="NATS.Extensions.Microsoft.DependencyInjection" Version="2.3.3" />
<PackageVersion Include="NetTopologySuite" Version="2.0.0" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.2" />
<PackageVersion Include="NodaTime" Version="3.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="AlterNats.Hosting" />
<PackageReference Include="NATS.Client.Core" />
</ItemGroup>

<ItemGroup>
Expand Down
8 changes: 6 additions & 2 deletions src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using AlterNats;
using HotChocolate.Subscriptions.Diagnostics;
using NATS.Client.Core;

namespace HotChocolate.Subscriptions.Nats;

Expand Down Expand Up @@ -34,7 +34,11 @@ protected override async ValueTask OnSendAsync<TMessage>(
CancellationToken cancellationToken = default)
{
var serialized = _serializer.Serialize(message);
await _connection.PublishAsync(formattedTopic, serialized).ConfigureAwait(false);

await _connection.PublishAsync(
formattedTopic,
serialized,
cancellationToken: cancellationToken).ConfigureAwait(false);
}

protected override async ValueTask OnCompleteAsync(string formattedTopic)
Expand Down
19 changes: 14 additions & 5 deletions src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Diagnostics;
using AlterNats;
using HotChocolate.Subscriptions.Diagnostics;
using NATS.Client.Core;
using static HotChocolate.Subscriptions.Nats.NatsResources;

namespace HotChocolate.Subscriptions.Nats;
Expand All @@ -23,20 +23,29 @@
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
}

protected override async ValueTask<IDisposable> OnConnectAsync(

Check failure on line 26 in src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs

View workflow job for this annotation

GitHub Actions / Run HotChocolate.Subscriptions.Nats.Tests

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check failure on line 26 in src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs

View workflow job for this annotation

GitHub Actions / Run HotChocolate.Subscriptions.Nats.Tests

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check failure on line 26 in src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs

View workflow job for this annotation

GitHub Actions / Merge and Upload Coverage

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check failure on line 26 in src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs

View workflow job for this annotation

GitHub Actions / Merge and Upload Coverage

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check failure on line 26 in src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs

View workflow job for this annotation

GitHub Actions / Merge and Upload Coverage

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check failure on line 26 in src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs

View workflow job for this annotation

GitHub Actions / Merge and Upload Coverage

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
CancellationToken cancellationToken)
{
// We ensure that the processing is not started before the context is fully initialized.
Debug.Assert(_connection != null, "_connection != null");
Debug.Assert(_connection != null, "_serializer != null");

var natsSession = await _connection
.SubscribeAsync(Name, (string? m) => DispatchMessage(_serializer, m))
.ConfigureAwait(false);
var natsSession = Task.Run(async () =>
{
await foreach (var msg in _connection.SubscribeAsync<string?>(
Name,
cancellationToken: cancellationToken))
{
Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n");//tmp
DispatchMessage(_serializer, msg.Data);
}
}, cancellationToken);

DiagnosticEvents.ProviderTopicInfo(Name, NatsTopic_ConnectAsync_SubscribedToNats);

return new Session(Name, natsSession, DiagnosticEvents);
return natsSession;

//return new Session(Name, natsSession, DiagnosticEvents);
}

private sealed class Session : IDisposable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="NATS.Extensions.Microsoft.DependencyInjection" />
<PackageReference Include="Squadron.Nats" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using AlterNats;
using Microsoft.Extensions.DependencyInjection;
using HotChocolate.Execution.Configuration;
using NATS.Client.Hosting;
using Squadron;
using Xunit.Abstractions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ public virtual async Task Subscribe_Infer_Topic()

// we need to execute the read for the subscription to start receiving.
await using var responseStream = result.ExpectResponseStream();
var results = responseStream.ReadResultsAsync().ConfigureAwait(false);
var results = responseStream.ReadResultsAsync();

// assert
await sender.SendAsync("OnMessage", "bar", cts.Token);
await sender.CompleteAsync("OnMessage");

var snapshot = new Snapshot();

await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false))
await foreach (var response in results.WithCancellation(cts.Token))
{
snapshot.Add(response);
}
Expand Down Expand Up @@ -70,15 +70,15 @@ public virtual async Task Subscribe_Static_Topic()

// we need to execute the read for the subscription to start receiving.
await using var responseStream = result.ExpectResponseStream();
var results = responseStream.ReadResultsAsync().ConfigureAwait(false);
var results = responseStream.ReadResultsAsync();

// assert
await sender.SendAsync("OnMessage", new Foo { Bar = "Hello", }, cts.Token);
await sender.CompleteAsync("OnMessage");

var snapshot = new Snapshot();

await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false))
await foreach (var response in results.WithCancellation(cts.Token))
{
snapshot.Add(response);
}
Expand Down Expand Up @@ -108,15 +108,15 @@ public virtual async Task Subscribe_Topic_With_Arguments()

// we need to execute the read for the subscription to start receiving.
await using var responseStream = result.ExpectResponseStream();
var results = responseStream.ReadResultsAsync().ConfigureAwait(false);
var results = responseStream.ReadResultsAsync();

// assert
await sender.SendAsync("OnMessage_a", "abc", cts.Token);
await sender.CompleteAsync("OnMessage_a");

var snapshot = new Snapshot();

await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false))
await foreach (var response in results.WithCancellation(cts.Token))
{
snapshot.Add(response, name: "From Stream A");
}
Expand Down Expand Up @@ -148,23 +148,23 @@ public virtual async Task Subscribe_Topic_With_Arguments_2_Subscriber()

// we need to execute the read for the subscription to start receiving.
await using var responseStream1 = result1.ExpectResponseStream();
var results1 = responseStream1.ReadResultsAsync().ConfigureAwait(false);
var results1 = responseStream1.ReadResultsAsync();

await using var responseStream2 = result2.ExpectResponseStream();
var results2 = responseStream2.ReadResultsAsync().ConfigureAwait(false);
var results2 = responseStream2.ReadResultsAsync();

// assert
await sender.SendAsync("OnMessage_a", "abc", cts.Token);
await sender.CompleteAsync("OnMessage_a");

var snapshot = new Snapshot();

await foreach (var response in results1.WithCancellation(cts.Token).ConfigureAwait(false))
await foreach (var response in results1.WithCancellation(cts.Token))
{
snapshot.Add(response, name: "From Stream 1");
}

await foreach (var response in results2.WithCancellation(cts.Token).ConfigureAwait(false))
await foreach (var response in results2.WithCancellation(cts.Token))
{
snapshot.Add(response, name: "From Stream 2");
}
Expand Down Expand Up @@ -209,10 +209,10 @@ public virtual async Task Subscribe_Topic_With_Arguments_2_Topics()

// we need to execute the read for the subscription to start receiving.
await using var responseStream1 = result1.ExpectResponseStream();
var results1 = responseStream1.ReadResultsAsync().ConfigureAwait(false);
var results1 = responseStream1.ReadResultsAsync();

await using var responseStream2 = result2.ExpectResponseStream();
var results2 = responseStream2.ReadResultsAsync().ConfigureAwait(false);
var results2 = responseStream2.ReadResultsAsync();

// assert
await sender.SendAsync("OnMessage_a", "abc", cts.Token);
Expand All @@ -223,12 +223,12 @@ public virtual async Task Subscribe_Topic_With_Arguments_2_Topics()

var snapshot = new Snapshot();

await foreach (var response in results1.WithCancellation(cts.Token).ConfigureAwait(false))
await foreach (var response in results1.WithCancellation(cts.Token))
{
snapshot.Add(response, name: "From Stream 1");
}

await foreach (var response in results2.WithCancellation(cts.Token).ConfigureAwait(false))
await foreach (var response in results2.WithCancellation(cts.Token))
{
snapshot.Add(response, name: "From Stream 2");
}
Expand Down Expand Up @@ -269,15 +269,15 @@ public virtual async Task Subscribe_Topic_With_2_Arguments()

// we need to execute the read for the subscription to start receiving.
await using var responseStream = result.ExpectResponseStream();
var results = responseStream.ReadResultsAsync().ConfigureAwait(false);
var results = responseStream.ReadResultsAsync();

// assert
await sender.SendAsync("OnMessage2_a_b", "abc", cts.Token);
await sender.CompleteAsync("OnMessage2_a_b");

var snapshot = new Snapshot();

await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false))
await foreach (var response in results.WithCancellation(cts.Token))
{
snapshot.Add(response, name: "From Stream A");
}
Expand Down Expand Up @@ -305,13 +305,13 @@ public virtual async Task Subscribe_And_Complete_Topic()

// we need to execute the read for the subscription to start receiving.
await using var responseStream = result.ExpectResponseStream();
var results = responseStream.ReadResultsAsync().ConfigureAwait(false);
var results = responseStream.ReadResultsAsync();

// assert
await Task.Delay(2000, cts.Token);
await sender.CompleteAsync("OnMessage");

await foreach (var unused in results.WithCancellation(cts.Token).ConfigureAwait(false))
await foreach (var unused in results.WithCancellation(cts.Token))
{
Assert.Fail("Should not have any messages.");
}
Expand All @@ -332,13 +332,13 @@ public virtual async Task Subscribe_And_Complete_Topic_With_ValueTypeMessage()

// we need to execute the read for the subscription to start receiving.
await using var responseStream = result.ExpectResponseStream();
var results = responseStream.ReadResultsAsync().ConfigureAwait(false);
var results = responseStream.ReadResultsAsync();

// assert
await Task.Delay(2000, cts.Token);
await sender.CompleteAsync("OnMessage3");

await foreach (var unused in results.WithCancellation(cts.Token).ConfigureAwait(false))
await foreach (var unused in results.WithCancellation(cts.Token))
{
Assert.Fail("Should not have any messages.");
}
Expand Down
Loading