Skip to content

Commit

Permalink
add NBB.Messaging.JetStream (#265)
Browse files Browse the repository at this point in the history
* add NBB.Messaging.JetStream
  • Loading branch information
lghinet authored Oct 19, 2023
1 parent 950d015 commit 49ab024
Show file tree
Hide file tree
Showing 8 changed files with 627 additions and 1 deletion.
9 changes: 8 additions & 1 deletion NBB.sln
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NBB.Core.Configuration", "s
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NBB.Core.Configuration.Tests", "test\UnitTests\Core\NBB.Core.Configuration.Tests\NBB.Core.Configuration.Tests.csproj", "{2A91E54D-F63C-4E92-9A6D-F15C7C55B0D7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NBB.Messaging.Rusi.IntegrationTests", "test\Integration\NBB.Messaging.Rusi.IntegrationTests\NBB.Messaging.Rusi.IntegrationTests.csproj", "{2D002428-8D55-46BD-A588-84946E33B4EF}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NBB.Messaging.Rusi.IntegrationTests", "test\Integration\NBB.Messaging.Rusi.IntegrationTests\NBB.Messaging.Rusi.IntegrationTests.csproj", "{2D002428-8D55-46BD-A588-84946E33B4EF}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NBB.Messaging.JetStream", "src\Messaging\NBB.Messaging.JetStream\NBB.Messaging.JetStream.csproj", "{C609385D-A1BD-4411-A675-488CA66AEA8F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -913,6 +915,10 @@ Global
{2D002428-8D55-46BD-A588-84946E33B4EF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2D002428-8D55-46BD-A588-84946E33B4EF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2D002428-8D55-46BD-A588-84946E33B4EF}.Release|Any CPU.Build.0 = Release|Any CPU
{C609385D-A1BD-4411-A675-488CA66AEA8F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C609385D-A1BD-4411-A675-488CA66AEA8F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C609385D-A1BD-4411-A675-488CA66AEA8F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C609385D-A1BD-4411-A675-488CA66AEA8F}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1081,6 +1087,7 @@ Global
{2C7CA0BD-98AD-4CC1-B496-94ED49636466} = {14726095-DA28-43A6-A9A9-F16C605932E1}
{2A91E54D-F63C-4E92-9A6D-F15C7C55B0D7} = {29B7593C-60F4-41DC-A883-4976FF467927}
{2D002428-8D55-46BD-A588-84946E33B4EF} = {2D9089E7-54EA-4526-B8FE-12729BF14F16}
{C609385D-A1BD-4411-A675-488CA66AEA8F} = {584C62C0-2AE6-4DD6-9BCF-8FF28B7122CE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {23A42379-616A-43EF-99BC-803DF151F54E}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

using Microsoft.Extensions.Configuration;
using NBB.Messaging.Abstractions;
using NBB.Messaging.JetStream;
using NBB.Messaging.JetStream.Internal;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class DependencyInjectionExtensions
{
public static IServiceCollection AddJetStreamTransport(this IServiceCollection services, IConfiguration configuration)
{
services.Configure<JetStreamOptions>(configuration.GetSection("Messaging").GetSection("JetStream"));
services.AddSingleton<JetStreamConnectionProvider>();
services.AddSingleton<IMessagingTransport, JetStreamMessagingTransport>();
services.AddSingleton<ITransportMonitor>(sp => sp.GetRequiredService<JetStreamConnectionProvider>());

return services;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NATS.Client;
using NBB.Messaging.Abstractions;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace NBB.Messaging.JetStream.Internal
{
public class JetStreamConnectionProvider : IDisposable, ITransportMonitor
{
private readonly IOptions<JetStreamOptions> _natsOptions;
private readonly ILogger<JetStreamConnectionProvider> _logger;
private IConnection _connection;
private static readonly object InstanceLoker = new();
private Exception _unrecoverableException;

public event TransportErrorHandler OnError;

public JetStreamConnectionProvider(IOptions<JetStreamOptions> natsOptions, ILogger<JetStreamConnectionProvider> logger)
{
_natsOptions = natsOptions;
_logger = logger;
}

public async Task ExecuteAsync(Func<IConnection, Task> action)
{
var connection = GetAndCheckConnection();

await action(connection);
}

public void Execute(Action<IConnection> action)
{
var connection = GetAndCheckConnection();

action(connection);
}

private IConnection GetAndCheckConnection()
{
if (_connection == null)
lock (InstanceLoker)
{
if (_connection == null)
_connection = CreateConnection();
}
return _connection;
}

private IConnection CreateConnection()
{
var options = ConnectionFactory.GetDefaultOptions();
options.Url = _natsOptions.Value.NatsUrl;

//https://github.com/nats-io/nats.net/issues/804
options.AllowReconnect = false;

options.ClosedEventHandler += (_, args) =>
{
SetConnectionLostState(args.Error ?? new Exception("NATS Jetstream connection was lost"));
};

_connection = new ConnectionFactory().CreateConnection(options);
_logger.LogInformation($"NATS Jetstream connection to {_natsOptions.Value.NatsUrl} was established");

return _connection;
}

private void SetConnectionLostState(Exception exception)
{
_connection = null;

// Set the field to the current exception if not already set
var existingException = Interlocked.CompareExchange(ref _unrecoverableException, exception, null);

// Send the application stop signal only once
if (existingException != null)
return;

_logger.LogError(exception, "NATS Jetstream connection unrecoverable");

OnError?.Invoke(exception);
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
if (disposing)
{
_connection?.Dispose();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

using Microsoft.Extensions.Options;
using NATS.Client;
using NATS.Client.JetStream;
using NBB.Messaging.Abstractions;
using NBB.Messaging.JetStream.Internal;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace NBB.Messaging.JetStream
{
public class JetStreamMessagingTransport : IMessagingTransport
{
private readonly IOptions<JetStreamOptions> _natsOptions;
private readonly JetStreamConnectionProvider _natsConnectionManager;

public JetStreamMessagingTransport(IOptions<JetStreamOptions> natsOptions, JetStreamConnectionProvider natsConnectionManager)
{
_natsOptions = natsOptions;
_natsConnectionManager = natsConnectionManager;
}

public Task PublishAsync(string topic, TransportSendContext sendContext, CancellationToken cancellationToken = default)
{
var envelopeData = sendContext.EnvelopeBytesAccessor.Invoke();

return _natsConnectionManager.ExecuteAsync(con =>
{
IJetStream js = con.CreateJetStreamContext();
return js.PublishAsync(topic, envelopeData);
});
}

public Task<IDisposable> SubscribeAsync(string topic,
Func<TransportReceiveContext, Task> handler,
SubscriptionTransportOptions options = null,
CancellationToken cancellationToken = default)
{

IDisposable consumer = null;

_natsConnectionManager.Execute(con =>
{
IJetStream js = con.CreateJetStreamContext();

// set's up the stream
var isCommand = topic.ToLower().Contains("commands.");

var stream = isCommand ? _natsOptions.Value.CommandsStream : _natsOptions.Value.EventsStream;
var jsm = con.CreateJetStreamManagementContext();
jsm.GetStreamInfo(stream);

// get stream context, create consumer and get the consumer context
var streamContext = con.GetStreamContext(stream);

var subscriberOptions = options ?? SubscriptionTransportOptions.Default;
var ccb = ConsumerConfiguration.Builder();

if (subscriberOptions.IsDurable)
{
var clientId = (_natsOptions.Value.ClientId + topic).Replace(".", "_");
ccb.WithDurable(clientId);
}

if (subscriberOptions.DeliverNewMessagesOnly)
ccb.WithDeliverPolicy(DeliverPolicy.New);
else
ccb.WithDeliverPolicy(DeliverPolicy.All);

ccb.WithAckWait(subscriberOptions.AckWait ?? _natsOptions.Value.AckWait ?? 50000);

//https://docs.nats.io/nats-concepts/jetstream/consumers#maxackpending
ccb.WithMaxAckPending(subscriberOptions.MaxConcurrentMessages);
ccb.WithFilterSubject(topic);

var consumerContext = streamContext.CreateOrUpdateConsumer(ccb.Build());

void NatsMsgHandler(object obj, MsgHandlerEventArgs args)
{
if (cancellationToken.IsCancellationRequested)
return;

var receiveContext = new TransportReceiveContext(new TransportReceivedData.EnvelopeBytes(args.Message.Data));

// Fire and forget
_ = handler(receiveContext).ContinueWith(_ => args.Message.Ack(), cancellationToken);
}
consumer = consumerContext.Consume(NatsMsgHandler);

});
return Task.FromResult(consumer);
}
}
}
26 changes: 26 additions & 0 deletions src/Messaging/NBB.Messaging.JetStream/JetStreamOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

namespace NBB.Messaging.JetStream
{
public class JetStreamOptions
{
/// <summary>
/// URL of the Streaming NATS cluster
/// </summary>
public string NatsUrl { get; set; }

/// <summary>
/// Identifier of the Streaming NATS client
/// </summary>
public string ClientId { get; set; }

/// <summary>
/// The time the server awaits for acknowledgement from the client before redelivering the message (in milliseconds)
/// </summary>
public int? AckWait { get; set; }
public string CommandsStream { get; set; }
public string EventsStream { get; set; }

}
}
Loading

0 comments on commit 49ab024

Please sign in to comment.