Skip to content

Commit 645d769

Browse files
committed
Test consumer cancellation
1 parent 4377398 commit 645d769

File tree

1 file changed

+72
-0
lines changed

1 file changed

+72
-0
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using RabbitMQ.Client;
4+
using RabbitMQ.Client.Events;
5+
using Xunit;
6+
using Xunit.Abstractions;
7+
8+
namespace Test.Integration
9+
{
10+
public class TestAsyncConsumerCancellation : IntegrationFixture
11+
{
12+
public TestAsyncConsumerCancellation(ITestOutputHelper output) : base(output)
13+
{
14+
}
15+
16+
[Fact]
17+
public async Task TestConsumerCancellation()
18+
{
19+
string exchangeName = GenerateExchangeName();
20+
string queueName = GenerateQueueName();
21+
string routingKey = string.Empty;
22+
23+
await _channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct);
24+
await _channel.QueueDeclareAsync(queueName, false, false, true, null);
25+
await _channel.QueueBindAsync(queueName, exchangeName, routingKey, null);
26+
27+
var tcsMessageReceived = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
28+
var tcsReceivedCancelled = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
29+
var tcsShutdownCancelled = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
30+
31+
var consumer = new AsyncEventingBasicConsumer(_channel);
32+
consumer.ShutdownAsync += async (model, ea) =>
33+
{
34+
try
35+
{
36+
await Task.Delay(TimeSpan.FromMinutes(1), ea.CancellationToken);
37+
}
38+
catch (OperationCanceledException)
39+
{
40+
tcsShutdownCancelled.SetResult(true);
41+
}
42+
};
43+
consumer.ReceivedAsync += async (model, ea) =>
44+
{
45+
tcsMessageReceived.SetResult(true);
46+
try
47+
{
48+
await Task.Delay(TimeSpan.FromMinutes(1), ea.CancellationToken);
49+
}
50+
catch (OperationCanceledException)
51+
{
52+
tcsReceivedCancelled.SetResult(true);
53+
}
54+
};
55+
await _channel.BasicConsumeAsync(queueName, false, consumer);
56+
57+
//publisher
58+
await using IChannel publisherChannel = await _conn.CreateChannelAsync(_createChannelOptions);
59+
byte[] messageBodyBytes = "Hello, world!"u8.ToArray();
60+
var props = new BasicProperties();
61+
await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty,
62+
mandatory: false, basicProperties: props, body: messageBodyBytes);
63+
64+
await WaitAsync(tcsMessageReceived, TimeSpan.FromSeconds(5), "Consumer received message");
65+
66+
await _channel.CloseAsync();
67+
68+
await WaitAsync(tcsMessageReceived, TimeSpan.FromSeconds(5), "Consumer closed");
69+
await WaitAsync(tcsShutdownCancelled, TimeSpan.FromSeconds(5), "Consumer closed");
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)