Skip to content

Commit

Permalink
Merge branch 'release/3.1.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Jericho committed May 8, 2018
2 parents 32820d6 + df6c810 commit 3f34bb2
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 52 deletions.
11 changes: 7 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
### VisualStudio ###
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.
##
## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore

# User-specific files
*.suo
Expand Down Expand Up @@ -273,9 +275,6 @@ paket-files/
# CodeRush
.cr/

# WinMerge
*.bak

# Python Tools for Visual Studio (PTVS)
__pycache__/
*.pyc
Expand All @@ -297,4 +296,8 @@ tools/**
# By default, sensitive information, such as encrypted password
# should be stored in the .pubxml.user file.

# End of https://www.gitignore.io/api/visualstudio

# WinMerge
*.bak

# End of https://www.gitignore.io/api/visualstudio
2 changes: 1 addition & 1 deletion GitReleaseManager.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
create:
include-footer: false
footer-heading: Where to get it
footer-content: You can download this release from [nuget.org](https://www.nuget.org/packages/Picton/{milestone})
footer-content: You can download this release from [nuget.org](https://www.nuget.org/packages/Picton.Messaging/{milestone})
footer-includes-milestone: true
milestone-replace-text: '{milestone}'
export:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="App.Metrics.Reporting.Http" Version="2.0.0-preview1" />
<PackageReference Include="App.Metrics.Reporting.Http" Version="2.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
4 changes: 2 additions & 2 deletions Source/Picton.Messaging.IntegrationTests/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public static void ProcessSimpleMessages(string queueName, CloudStorageAccount s
Stopwatch sw = null;

// Configure the message pump
var messagePump = new AsyncMessagePump(queueName, storageAccount, 10, TimeSpan.FromMinutes(1), 3, metrics)
var messagePump = new AsyncMessagePump(queueName, storageAccount, 10, null, TimeSpan.FromMinutes(1), 3, metrics)
{
OnMessage = (message, cancellationToken) =>
{
Expand Down Expand Up @@ -157,7 +157,7 @@ public static void ProcessMessagesWithHandlers(string queueName, CloudStorageAcc
Stopwatch sw = null;

// Configure the message pump
var messagePump = new AsyncMessagePumpWithHandlers(queueName, storageAccount, 10, TimeSpan.FromMinutes(1), 3, metrics);
var messagePump = new AsyncMessagePumpWithHandlers(queueName, storageAccount, 10, null, TimeSpan.FromMinutes(1), 3, metrics);
messagePump.OnQueueEmpty = cancellationToken =>
{
// Stop the timer
Expand Down
131 changes: 116 additions & 15 deletions Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Moq;
using Shouldly;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
Expand All @@ -24,7 +25,7 @@ public void Null_cloudQueue_throws()
{
Should.Throw<ArgumentNullException>(() =>
{
var messagePump = new AsyncMessagePump("myqueue", (CloudStorageAccount)null, 1, TimeSpan.FromMinutes(1), 3);
var messagePump = new AsyncMessagePump("myqueue", (CloudStorageAccount)null, 1, null, TimeSpan.FromMinutes(1), 3);
});
}

Expand All @@ -34,7 +35,7 @@ public void Number_of_concurrent_tasks_too_small_throws()
Should.Throw<ArgumentException>(() =>
{
var mockStorageAccount = GetMockStorageAccount(null, null);
var messagePump = new AsyncMessagePump("myqueue", mockStorageAccount.Object, 0, TimeSpan.FromMinutes(1), 3);
var messagePump = new AsyncMessagePump("myqueue", mockStorageAccount.Object, 0, null, TimeSpan.FromMinutes(1), 3);
});
}

Expand All @@ -44,7 +45,7 @@ public void DequeueCount_too_small_throws()
Should.Throw<ArgumentException>(() =>
{
var mockStorageAccount = GetMockStorageAccount(null, null);
var messagePump = new AsyncMessagePump("myqueue", mockStorageAccount.Object, 1, TimeSpan.FromMinutes(1), 0);
var messagePump = new AsyncMessagePump("myqueue", mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 0);
});
}

Expand All @@ -59,7 +60,7 @@ public void Start_without_OnMessage_throws()
var mockBlobClient = GetMockBlobClient(mockBlobContainer);
var mockStorageAccount = GetMockStorageAccount(mockBlobClient, mockQueueClient);

var messagePump = new AsyncMessagePump("myqueue", mockStorageAccount.Object, 1, TimeSpan.FromMinutes(1), 3);
var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3);

// Act
Should.Throw<ArgumentNullException>(() => messagePump.Start());
Expand All @@ -76,7 +77,7 @@ public void Stopping_without_starting()
var mockBlobClient = GetMockBlobClient(mockBlobContainer);
var mockStorageAccount = GetMockStorageAccount(mockBlobClient, mockQueueClient);

var messagePump = new AsyncMessagePump("myqueue", mockStorageAccount.Object, 1, TimeSpan.FromMinutes(1), 3);
var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3);

// Act
messagePump.Stop();
Expand All @@ -102,7 +103,7 @@ public void No_message_processed_when_queue_is_empty()

mockQueue.Setup(q => q.GetMessagesAsync(It.IsAny<int>(), It.IsAny<TimeSpan?>(), It.IsAny<QueueRequestOptions>(), It.IsAny<OperationContext>(), It.IsAny<CancellationToken>())).ReturnsAsync(Enumerable.Empty<CloudQueueMessage>());

var messagePump = new AsyncMessagePump("myqueue", mockStorageAccount.Object, 1, TimeSpan.FromMinutes(1), 3);
var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3);
messagePump.OnMessage = (message, cancellationToken) =>
{
Interlocked.Increment(ref onMessageInvokeCount);
Expand Down Expand Up @@ -180,7 +181,7 @@ public void Message_processed()
return Task.FromResult(true);
});

var messagePump = new AsyncMessagePump("myqueue", mockStorageAccount.Object, 1, TimeSpan.FromMinutes(1), 3);
var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3);
messagePump.OnMessage = (message, cancellationToken) =>
{
Interlocked.Increment(ref onMessageInvokeCount);
Expand Down Expand Up @@ -260,7 +261,7 @@ public void Poison_message_is_rejected()
return Task.FromResult(true);
});

var messagePump = new AsyncMessagePump("myqueue", mockStorageAccount.Object, 1, TimeSpan.FromMinutes(1), retries);
var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), retries);
messagePump.OnMessage = (message, cancellationToken) =>
{
Interlocked.Increment(ref onMessageInvokeCount);
Expand Down Expand Up @@ -289,13 +290,105 @@ public void Poison_message_is_rejected()

// Assert
onMessageInvokeCount.ShouldBe(1);
onQueueEmptyInvokeCount.ShouldBeGreaterThan(0);
onQueueEmptyInvokeCount.ShouldBe(1);
onErrorInvokeCount.ShouldBe(1);
isRejected.ShouldBeTrue();
mockQueue.Verify(q => q.GetMessagesAsync(It.IsAny<int>(), It.IsAny<TimeSpan?>(), It.IsAny<QueueRequestOptions>(), It.IsAny<OperationContext>(), It.IsAny<CancellationToken>()), Times.AtLeast(2));
mockQueue.Verify(q => q.DeleteMessageAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<QueueRequestOptions>(), It.IsAny<OperationContext>(), It.IsAny<CancellationToken>()), Times.Exactly(1));
}

[Fact]
public void Poison_message_is_moved()
{
// Arrange
var onMessageInvokeCount = 0;
var onQueueEmptyInvokeCount = 0;
var onErrorInvokeCount = 0;

var isRejected = false;
var retries = 3;
var lockObject = new Object();
var cloudMessage = new CloudQueueMessage("Message");

var queueName = "myqueue";
var poisonQueueName = $"{queueName}-poison";

var mockQueue = GetMockQueue(queueName);
var mockPoisonQueue = GetMockQueue(poisonQueueName);
var mockQueueClient = GetMockQueueClient(new[] { mockQueue, mockPoisonQueue });
var mockBlobContainer = GetMockBlobContainer();
var mockBlobClient = GetMockBlobClient(mockBlobContainer);
var mockStorageAccount = GetMockStorageAccount(mockBlobClient, mockQueueClient);

mockQueue.Setup(q => q.GetMessagesAsync(It.IsAny<int>(), It.IsAny<TimeSpan?>(), It.IsAny<QueueRequestOptions>(), It.IsAny<OperationContext>(), It.IsAny<CancellationToken>())).ReturnsAsync((int messageCount, TimeSpan? visibilityTimeout, QueueRequestOptions options, OperationContext operationContext, CancellationToken cancellationToken) =>
{
if (cloudMessage != null)
{
lock (lockObject)
{
if (cloudMessage != null)
{
// DequeueCount is a private property. Therefore we must use reflection to change its value
var dequeueCountProperty = cloudMessage.GetType().GetProperty("DequeueCount");
dequeueCountProperty.SetValue(cloudMessage, retries + 1); // intentionally set 'DequeueCount' to a value exceeding maxRetries to simulate a poison message

return new[] { cloudMessage };
}
}
}
return Enumerable.Empty<CloudQueueMessage>();
});
mockQueue.Setup(q => q.DeleteMessageAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<QueueRequestOptions>(), It.IsAny<OperationContext>(), It.IsAny<CancellationToken>())).Returns((string messageId, string popReceipt, QueueRequestOptions options, OperationContext operationContext, CancellationToken cancellationToken) =>
{
lock (lockObject)
{
cloudMessage = null;
}
return Task.FromResult(true);
});
mockPoisonQueue.Setup(q => q.AddMessageAsync(It.IsAny<CloudQueueMessage>(), It.IsAny<TimeSpan?>(), It.IsAny<TimeSpan?>(), It.IsAny<QueueRequestOptions>(), It.IsAny<OperationContext>(), It.IsAny<CancellationToken>())).Returns((CloudQueueMessage message, TimeSpan? timeToLive, TimeSpan? visibilityTimeout, QueueRequestOptions options, OperationContext operationContext, CancellationToken cancellationToken) =>
{
// Nothing to do. We just want to ensure this method is invoked.
return Task.FromResult(true);
});

var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, poisonQueueName, TimeSpan.FromMinutes(1), retries);
messagePump.OnMessage = (message, cancellationToken) =>
{
Interlocked.Increment(ref onMessageInvokeCount);
throw new Exception("An error occured when attempting to process the message");
};
messagePump.OnQueueEmpty = cancellationToken =>
{
Interlocked.Increment(ref onQueueEmptyInvokeCount);
messagePump.Stop();
};
messagePump.OnError = (message, exception, isPoison) =>
{
Interlocked.Increment(ref onErrorInvokeCount);
if (isPoison)
{
lock (lockObject)
{
isRejected = true;
cloudMessage = null;
}
}
};

// Act
messagePump.Start();

// Assert
onMessageInvokeCount.ShouldBe(1);
onQueueEmptyInvokeCount.ShouldBe(1);
onErrorInvokeCount.ShouldBe(1);
isRejected.ShouldBeTrue();
mockQueue.Verify(q => q.GetMessagesAsync(It.IsAny<int>(), It.IsAny<TimeSpan?>(), It.IsAny<QueueRequestOptions>(), It.IsAny<OperationContext>(), It.IsAny<CancellationToken>()), Times.AtLeast(2));
mockQueue.Verify(q => q.DeleteMessageAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<QueueRequestOptions>(), It.IsAny<OperationContext>(), It.IsAny<CancellationToken>()), Times.Exactly(1));
mockPoisonQueue.Verify(q => q.AddMessageAsync(It.IsAny<CloudQueueMessage>(), It.IsAny<TimeSpan?>(), It.IsAny<TimeSpan?>(), It.IsAny<QueueRequestOptions>(), It.IsAny<OperationContext>(), It.IsAny<CancellationToken>()), Times.Exactly(1));
}

[Fact]
public void Exceptions_in_OnQueueEmpty_are_ignored()
{
Expand All @@ -316,7 +409,7 @@ public void Exceptions_in_OnQueueEmpty_are_ignored()

mockQueue.Setup(q => q.GetMessagesAsync(It.IsAny<int>(), It.IsAny<TimeSpan?>(), It.IsAny<QueueRequestOptions>(), It.IsAny<OperationContext>(), It.IsAny<CancellationToken>())).ReturnsAsync(Enumerable.Empty<CloudQueueMessage>());

var messagePump = new AsyncMessagePump("myqueue", mockStorageAccount.Object, 1, TimeSpan.FromMinutes(1), 3);
var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3);
messagePump.OnMessage = (message, cancellationToken) =>
{
Interlocked.Increment(ref onMessageInvokeCount);
Expand Down Expand Up @@ -387,14 +480,23 @@ private static Mock<CloudQueue> GetMockQueue(string queueName)
}

private static Mock<CloudQueueClient> GetMockQueueClient(Mock<CloudQueue> mockQueue)
{
return GetMockQueueClient(new[] { mockQueue });
}

private static Mock<CloudQueueClient> GetMockQueueClient(IEnumerable<Mock<CloudQueue>> mockQueues)
{
var mockQueueStorageUri = new Uri(QUEUE_STORAGE_URL);
var storageCredentials = GetStorageCredentials();
var mockQueueClient = new Mock<CloudQueueClient>(MockBehavior.Strict, mockQueueStorageUri, storageCredentials);
mockQueueClient
.Setup(c => c.GetQueueReference(mockQueue.Object.Name))
.Returns(mockQueue.Object)
.Verifiable();
foreach (var mockQueue in mockQueues.ToArray())
{
mockQueueClient
.Setup(c => c.GetQueueReference(mockQueue.Object.Name))
.Returns(mockQueue.Object)
.Verifiable();

}
return mockQueueClient;
}

Expand Down Expand Up @@ -428,6 +530,5 @@ private static StorageCredentials GetStorageCredentials()
var storageCredentials = new StorageCredentials("account_name", accountAccessKey);
return storageCredentials;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0" />
<PackageReference Include="Moq" Version="4.8.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.0" />
<PackageReference Include="Moq" Version="4.8.2" />
<PackageReference Include="Shouldly" Version="3.0.0" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
Expand Down
12 changes: 10 additions & 2 deletions Source/Picton.Messaging/AsyncMessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class AsyncMessagePump
private static readonly ILog _logger = LogProvider.GetLogger(typeof(AsyncMessagePump));

private readonly IQueueManager _queueManager;
private readonly IQueueManager _poisonQueueManager;
private readonly int _concurrentTasks;
private readonly TimeSpan? _visibilityTimeout;
private readonly int _maxDequeueCount;
Expand Down Expand Up @@ -77,10 +78,11 @@ public class AsyncMessagePump
/// <param name="queueName">Name of the queue.</param>
/// <param name="storageAccount">The cloud storage account.</param>
/// <param name="concurrentTasks">The number of concurrent tasks.</param>
/// <param name="poisonQueueName">Name of the queue where messages are automatically moved to when they fail to be processed after 'maxDequeueCount' attempts. You can indicate that you do not want messages to be automatically moved by leaving this value empty. In such a scenario, you are responsible for handling so called 'poinson' messages.</param>
/// <param name="visibilityTimeout">The visibility timeout.</param>
/// <param name="maxDequeueCount">The maximum dequeue count.</param>
/// <param name="metrics">The system where metrics are published</param>
public AsyncMessagePump(string queueName, CloudStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetrics metrics = null)
public AsyncMessagePump(string queueName, CloudStorageAccount storageAccount, int concurrentTasks = 25, string poisonQueueName = null, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetrics metrics = null)
{
if (concurrentTasks < 1) throw new ArgumentException("Number of concurrent tasks must be greather than zero", nameof(concurrentTasks));
if (maxDequeueCount < 1) throw new ArgumentException("Number of retries must be greather than zero", nameof(maxDequeueCount));
Expand All @@ -90,6 +92,11 @@ public AsyncMessagePump(string queueName, CloudStorageAccount storageAccount, in
_visibilityTimeout = visibilityTimeout;
_maxDequeueCount = maxDequeueCount;

if (!string.IsNullOrEmpty(poisonQueueName))
{
_poisonQueueManager = new QueueManager(poisonQueueName, storageAccount);
}

if (metrics == null)
{
var noop = new MetricsBuilder();
Expand Down Expand Up @@ -275,7 +282,8 @@ public void Stop()
OnError?.Invoke(message, ex, isPoison);
if (isPoison)
{
// PLEASE NOTE: we use "CancellationToken.None" to ensure a processed message is deleted from the queue even when the message pump is shutting down
// PLEASE NOTE: we use "CancellationToken.None" to ensure a processed message is deleted from the queue and moved to poison queue even when the message pump is shutting down
if (_poisonQueueManager != null) await _poisonQueueManager.AddMessageAsync(message.Content, null, null, null, null, CancellationToken.None).ConfigureAwait(false);
await _queueManager.DeleteMessageAsync(message, null, null, CancellationToken.None).ConfigureAwait(false);
}
}
Expand Down
5 changes: 3 additions & 2 deletions Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@ public Action<CancellationToken> OnQueueEmpty
/// <param name="queueName">Name of the queue.</param>
/// <param name="storageAccount">The cloud storage account.</param>
/// <param name="concurrentTasks">The number of concurrent tasks.</param>
/// <param name="poisonQueueName">Name of the queue where messages are automatically moved to when they fail to be processed after 'maxDequeueCount' attempts. You can indicate that you do not want messages to be automatically moved by leaving this value empty. In such a scenario, you are responsible for handling so called 'poinson' messages.</param>
/// <param name="visibilityTimeout">The visibility timeout.</param>
/// <param name="maxDequeueCount">The maximum dequeue count.</param>
/// <param name="metrics">The system where metrics are published</param>
public AsyncMessagePumpWithHandlers(string queueName, CloudStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetrics metrics = null)
public AsyncMessagePumpWithHandlers(string queueName, CloudStorageAccount storageAccount, int concurrentTasks = 25, string poisonQueueName = null, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetrics metrics = null)
{
_messagePump = new AsyncMessagePump(queueName, storageAccount, concurrentTasks, visibilityTimeout, maxDequeueCount, metrics)
_messagePump = new AsyncMessagePump(queueName, storageAccount, concurrentTasks, poisonQueueName, visibilityTimeout, maxDequeueCount, metrics)
{
OnMessage = (message, cancellationToken) =>
{
Expand Down
11 changes: 3 additions & 8 deletions Source/Picton.Messaging/Picton.Messaging.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="App.Metrics" Version="2.0.0-preview1" />
<PackageReference Include="Microsoft.Extensions.DependencyModel" Version="1.1.2" />
<PackageReference Include="App.Metrics" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyModel" Version="2.0.4" />
<PackageReference Include="Picton" Version="3.0.0" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004">
<PrivateAssets>All</PrivateAssets>
Expand All @@ -41,11 +41,6 @@
<Reference Include="Microsoft.CSharp" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.6' ">
<PackageReference Include="System.Reflection.TypeExtensions" Version="4.3.0" />
<PackageReference Include="System.Runtime.Loader" Version="4.3.0" />
</ItemGroup>

<PropertyGroup Condition=" '$(TargetFramework)' == 'net452' ">
<DefineConstants>$(DefineConstants);NETFULL;LIBLOG_PORTABLE</DefineConstants>
</PropertyGroup>
Expand All @@ -59,7 +54,7 @@
</ItemGroup>

<PropertyGroup>
<CodeAnalysisRuleSet>$(SolutionDir)\Picton.Messaging.ruleset</CodeAnalysisRuleSet>
<CodeAnalysisRuleSet>$(SolutionDir)\StyleCopRules.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>

</Project>
Loading

0 comments on commit 3f34bb2

Please sign in to comment.