Skip to content

Commit

Permalink
Merge branch 'release/9.1.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Jericho committed Feb 24, 2024
2 parents f404a6e + 384db20 commit 5443534
Show file tree
Hide file tree
Showing 21 changed files with 404 additions and 293 deletions.
159 changes: 103 additions & 56 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

## About

Picton.Messaging is a C# library containing a high performance message processor (also known as a message "pump") designed to process messages from an Azure storage queue as efficiently as possible.
Picton.Messaging is a C# library containing a high performance message processor (also known as a message "pump") designed to process messages from Azure storage queues as efficiently as possible.

I created Picton.Mesaging because I needed a way to process a large volume of messages from Azure storage queues as quickly and efficiently as possible. I searched for a long time, but I could never find a solution that met all my requirements.

Expand All @@ -27,6 +27,8 @@ The sample code that Daniel shared during his webinars was very generic and not

In December 2017 version 2.0 was released with a much more efficient method of fetching messages from the Azure queue: there is now a dedicated task for this pupose instead of allowing each individual concurent task to fetch their own messages. This means that the logic to increase/decrease the number of available slots in the SemaphoreSlim is no longer necessary and has ben removed.

In January 2024 version 9.0 was released with two major new features: the message pump is now able to monitor multiple queues and also a specialized version of the message pump was added to monitor queues that follow a naming convention. Additionaly, this specialized message pump queries the Azure storage at regular interval to detect if new queues have been created. This is, in my opinion, an ideal solution when you have a multi-tenant solution with one queue for each tenant.


## Nuget

Expand All @@ -43,71 +45,41 @@ The easiest way to include Picton.Messaging in your C# project is by grabing the
PM> Install-Package Picton.Messaging
```

Once you have the Picton.Messaging library properly referenced in your project, modify your RoleEntryPoint like this example:

```csharp
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.RetryPolicies;
using Picton.Messaging;
using System;
using System.Diagnostics;

namespace WorkerRole1
{
public class MyWorkerRole : RoleEntryPoint
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
## How to use

public override void Run()
{
Trace.TraceInformation("WorkerRole is running");
Once you have the Picton.Messaging library properly referenced in your project, you need to following two CSharp files:

try
{
this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
finally
{
this.runCompleteEvent.Set();
}
}
### Program.cs

public override bool OnStart()
{
// Use TLS 1.2 for Service Bus connections
ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;
```csharp
using WorkerService1;

// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddHostedService<Worker>();

// For information on handling configuration changes
// see the MSDN topic at https://go.microsoft.com/fwlink/?LinkId=166357.
var host = builder.Build();
host.Run();
```

bool result = base.OnStart();
### Worker.cs

Trace.TraceInformation("WorkerRole has been started");
```csharp
using Picton.Messaging;

return result;
}
namespace WorkerService1
{
public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;

public override void OnStop()
public Worker(ILogger<Worker> logger)
{
Trace.TraceInformation("WorkerRole is stopping");

// Invoking "Cancel()" will cause the AsyncMessagePump to stop
this.cancellationTokenSource.Cancel();
this.runCompleteEvent.WaitOne();

base.OnStop();

Trace.TraceInformation("WorkerRole has stopped");
_logger = logger;
}

private async Task RunAsync(CancellationToken cancellationToken)
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var connectionString = "<-- insert connection string for your Azure account -->";
var connectionString = "<-- connection string for your Azure storage account -->";
var concurrentTask = 10; // <-- this is the max number of messages that can be processed at a time
// Configure the message pump
Expand Down Expand Up @@ -153,8 +125,83 @@ namespace WorkerRole1
messagePump.AddQueue("queue06", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob");
messagePump.AddQueue("queue07", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob");

// Start the message pump
await messagePump.StartAsync(cancellationToken);
// You can add all queues matching a given RegEx pattern
await messagePump.AddQueuesByPatternAsync("myqueue*", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob", cancellationToken).ConfigureAwait(false);

// Start the mesage pump (the token is particularly important because that's how the message pump will be notified when the worker is stopping)
await messagePump.StartAsync(stoppingToken).ConfigureAwait(false);
}
}
}
```

## Message handlers

As demonstrated in the previous code sample, you can define your own `OnMessage` delegate which gets invoked by the message pump when each message is processed. This is perfect for simple scenarios where your C# logic is rather simple. However, your C# code can become complicated pretty quickly as the complexity of your logic increases.

The Picton.Messaging library includes a more advanced message pump that can simplify this situation for you: `AsyncMessagePumpWithHandlers`. You C# project must define so-called "handlers" for each of your message types. These handlers are simply c# classes that implement the `IMessageHandler<T>` interface, where `T` is the type of the message. For example, if you expect to process messages of type `MyMessage`, you must define a class with signature similar to this: `public class MyMessageHandler : IMessageHandler<MyMessage>`.

Once you have created all the handlers you need, you must register them with your solution's DI service collection like in this example:

### Program.cs

```csharp
using Picton.Messaging;
using WorkerService1;

var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddHostedService<Worker>();

/*
You can either register your message handlers one by one like this:
builder.Services.AddSingleton<IMessageHandler<MyMessage>, MyMessageHandler>()
builder.Services.AddSingleton<IMessageHandler<MyOtherMessage>, MyOtherMessageHandler>()
builder.Services.AddSingleton<IMessageHandler<AnotherMessage>, AnotherMessageHandler>()
*/

// Or you can allow Picton.Messaging to scan your assemblies and to register all message handlers like this:
builder.Services.AddPictonMessageHandlers()

var host = builder.Build();
host.Run();
```

### Worker.cs

```csharp
using Picton.Messaging;

namespace WorkerService1
{
public class Worker : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<Worker> _logger;

public Worker(IServiceProvider serviceProvider, ILogger<Worker> logger)
{
_serviceProvider = serviceProvider;
_logger = logger;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var connectionString = "<-- connection string for your Azure storage account -->";
var concurrentTask = 10; // <-- this is the max number of messages that can be processed at a time
var options = new MessagePumpOptions(connectionString, concurrentTasks, null, null);
var messagePump = new AsyncMessagePumpWithHandlers(options, _serviceProvider, _logger)
{
OnError = (queueName, message, exception, isPoison) =>
{
// Insert your custom error handling
}
};

messagePump.AddQueue("myqueue", null, TimeSpan.FromMinutes(1), 3);

await messagePump.StartAsync(stoppingToken).ConfigureAwait(false);
}
}
}
Expand Down
13 changes: 9 additions & 4 deletions Source/Picton.Messaging.IntegrationTests/MyMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
using Microsoft.Extensions.Logging;
using Picton.Messaging.Messages;
using System.Threading;
using System.Threading.Tasks;

namespace Picton.Messaging.IntegrationTests
{
public class MyMessageHandler : IMessageHandler<MyMessage>
{
private readonly ILogger _log;
private readonly ILogger<MyMessageHandler> _log;

public MyMessageHandler(ILogger log)
public MyMessageHandler(ILogger<MyMessageHandler> log)
{
_log = log;
}
public void Handle(MyMessage message)

public Task HandleAsync(MyMessage message, CancellationToken cancellationToken)
{
_log.LogInformation(message.MessageContent);
_log?.LogInformation(message.MessageContent);

return Task.CompletedTask;
}
}
}
1 change: 1 addition & 0 deletions Source/Picton.Messaging.IntegrationTests/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ private static void ConfigureServices(ServiceCollection services)
{
services
.AddLogging(loggingBuilder => loggingBuilder.AddNLog(GetNLogConfiguration()))
.AddPictonMessageHandlers()
.AddTransient<TestsRunner>();
}

Expand Down
38 changes: 20 additions & 18 deletions Source/Picton.Messaging.IntegrationTests/TestsRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace Picton.Messaging.IntegrationTests
{
internal class TestsRunner(ILogger<TestsRunner> logger)
internal class TestsRunner
{
private enum ResultCodes
{
Expand All @@ -19,10 +20,20 @@ private enum ResultCodes
Cancelled = 1223
}

private readonly ILogger _logger = logger;
private readonly ILogger<TestsRunner> _logger;
private readonly IServiceProvider _serviceProvider;

public TestsRunner(ILogger<TestsRunner> logger, IServiceProvider serviceProvider)
{
_logger = logger;
_serviceProvider = serviceProvider;
}

public async Task<int> RunAsync()
{
ServicePointManager.DefaultConnectionLimit = 1000;
ServicePointManager.UseNagleAlgorithm = false;

// Configure Console
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (s, e) =>
Expand Down Expand Up @@ -114,13 +125,10 @@ private async Task RunAsyncMessagePumpTests(string connectionString, string queu
_logger.LogInformation("{messageContent}", message.Content.ToString());
},

// Stop the timer and the message pump when the queue is empty.
OnEmpty = cancellationToken =>
// Stop the message pump when there are no more messages to process.
OnAllQueuesEmpty = cancellationToken =>
{
// Stop the timer
if (sw.IsRunning) sw.Stop();

// Stop the message pump
_logger.LogDebug("Asking the message pump to stop...");
cts.Cancel();
}
Expand Down Expand Up @@ -156,15 +164,12 @@ private async Task RunAsyncMessagePumpWithHandlersTests(string connectionString,
Stopwatch sw = null;
var cts = new CancellationTokenSource();
var options = new MessagePumpOptions(connectionString, concurrentTasks, null, null);
var messagePump = new AsyncMessagePumpWithHandlers(options, _logger, metrics)
var messagePump = new AsyncMessagePumpWithHandlers(options, _serviceProvider, _logger, metrics)
{
// Stop the timer and the message pump when the queue is empty.
OnEmpty = cancellationToken =>
// Stop the message pump when there are no more messages to process.
OnAllQueuesEmpty = cancellationToken =>
{
// Stop the timer
if (sw.IsRunning) sw.Stop();

// Stop the message pump
_logger.LogDebug("Asking the message pump with handlers to stop...");
cts.Cancel();
}
Expand Down Expand Up @@ -211,13 +216,10 @@ private async Task RunMultiTenantAsyncMessagePumpTests(string connectionString,
_logger.LogInformation("{tenantId} - {messageContent}", tenantId, message.Content.ToString());
},

// Stop the timer and the message pump when all tenant queues are empty.
OnEmpty = cancellationToken =>
// Stop the message pump when there are no more messages to process.
OnAllQueuesEmpty = cancellationToken =>
{
// Stop the timer
if (sw.IsRunning) sw.Stop();

// Stop the message pump
_logger.LogDebug("Asking the multi-tenant message pump to stop...");
cts.Cancel();
}
Expand Down
12 changes: 6 additions & 6 deletions Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public async Task No_message_processed_when_queue_is_empty()
{
Interlocked.Increment(ref onErrorInvokeCount);
},
OnEmpty = cancellationToken =>
OnAllQueuesEmpty = cancellationToken =>
{
Interlocked.Increment(ref OnEmptyInvokeCount);
cts.Cancel();
Expand Down Expand Up @@ -212,7 +212,7 @@ public async Task Message_processed()
}
}
},
OnEmpty = cancellationToken =>
OnAllQueuesEmpty = cancellationToken =>
{
Interlocked.Increment(ref OnEmptyInvokeCount);
cts.Cancel();
Expand Down Expand Up @@ -299,7 +299,7 @@ public async Task Poison_message_is_rejected()
{
Interlocked.Increment(ref onErrorInvokeCount);
},
OnEmpty = cancellationToken =>
OnAllQueuesEmpty = cancellationToken =>
{
Interlocked.Increment(ref OnEmptyInvokeCount);
cts.Cancel();
Expand Down Expand Up @@ -397,7 +397,7 @@ public async Task Poison_message_is_moved()
{
Interlocked.Increment(ref onErrorInvokeCount);
},
OnEmpty = cancellationToken =>
OnAllQueuesEmpty = cancellationToken =>
{
Interlocked.Increment(ref OnEmptyInvokeCount);
cts.Cancel();
Expand Down Expand Up @@ -455,7 +455,7 @@ public async Task Exceptions_in_OnEmpty_are_ignored()
{
Interlocked.Increment(ref onErrorInvokeCount);
},
OnEmpty = cancellationToken =>
OnAllQueuesEmpty = cancellationToken =>
{
Interlocked.Increment(ref OnEmptyInvokeCount);

Expand Down Expand Up @@ -551,7 +551,7 @@ public async Task Exceptions_in_OnError_are_ignored()
Interlocked.Increment(ref onErrorInvokeCount);
throw new Exception("This dummy exception should be ignored");
},
OnEmpty = cancellationToken =>
OnAllQueuesEmpty = cancellationToken =>
{
Interlocked.Increment(ref OnEmptyInvokeCount);
cts.Cancel();
Expand Down
2 changes: 1 addition & 1 deletion Source/Picton.Messaging.UnitTests/ExtensionsTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Shouldly;
using Shouldly;
using System;
using Xunit;

Expand Down
Loading

0 comments on commit 5443534

Please sign in to comment.