Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,20 @@ protected override async Task ProcessChannelAsync()
{
await work.Consumer.HandleBasicDeliverAsync(
work.ConsumerTag!, work.DeliveryTag, work.Redelivered,
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory, work.CancellationToken)
.ConfigureAwait(false);
}
break;
case WorkType.Cancel:
await work.Consumer.HandleBasicCancelAsync(work.ConsumerTag!)
await work.Consumer.HandleBasicCancelAsync(work.ConsumerTag!, work.CancellationToken)
.ConfigureAwait(false);
break;
case WorkType.CancelOk:
await work.Consumer.HandleBasicCancelOkAsync(work.ConsumerTag!)
await work.Consumer.HandleBasicCancelOkAsync(work.ConsumerTag!, work.CancellationToken)
.ConfigureAwait(false);
break;
case WorkType.ConsumeOk:
await work.Consumer.HandleBasicConsumeOkAsync(work.ConsumerTag!)
await work.Consumer.HandleBasicConsumeOkAsync(work.ConsumerTag!, work.CancellationToken)
.ConfigureAwait(false);
break;
case WorkType.Shutdown:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
private readonly ushort _concurrency;
private long _isQuiescing;
private bool _disposed;
private readonly CancellationTokenSource _shutdownCts = new CancellationTokenSource();

internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
{
Expand Down Expand Up @@ -92,7 +93,7 @@ public async ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, s
try
{
AddConsumer(consumer, consumerTag);
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag);
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag, _shutdownCts);
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
Expand All @@ -113,7 +114,7 @@ public async ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliver
if (false == _disposed && false == IsQuiescing)
{
IAsyncBasicConsumer consumer = GetConsumerOrDefault(consumerTag);
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, _shutdownCts);
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
Expand All @@ -126,7 +127,7 @@ public async ValueTask HandleBasicCancelOkAsync(string consumerTag, Cancellation
if (false == _disposed && false == IsQuiescing)
{
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag);
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag, _shutdownCts);
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
Expand All @@ -139,15 +140,28 @@ public async ValueTask HandleBasicCancelAsync(string consumerTag, CancellationTo
if (false == _disposed && false == IsQuiescing)
{
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag);
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag, _shutdownCts);
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
}

public void Quiesce()
{
if (IsQuiescing)
{
return;
}

Interlocked.Exchange(ref _isQuiescing, 1);
try
{
_shutdownCts.Cancel();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we go with this approach, we might want to make Quiesce async and implement IAsyncDisposable instead on the consumer dispatcher channel base instead of IDisposable.

}
catch
{
// ignore
}
}

public async Task WaitForShutdownAsync()
Expand Down Expand Up @@ -203,18 +217,13 @@ protected bool IsQuiescing
{
get
{
if (Interlocked.Read(ref _isQuiescing) == 1)
{
return true;
}

return false;
return Interlocked.Read(ref _isQuiescing) == 1;
}
}

protected sealed override void ShutdownConsumer(IAsyncBasicConsumer consumer, ShutdownEventArgs reason)
{
_writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason));
_writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason, _shutdownCts));
}

protected override Task InternalShutdownAsync()
Expand All @@ -237,25 +246,32 @@ protected override Task InternalShutdownAsync()
public readonly RentedMemory Body;
public readonly ShutdownEventArgs? Reason;
public readonly WorkType WorkType;
public readonly CancellationToken CancellationToken;
private readonly CancellationTokenSource? _cancellationTokenSource;

private WorkStruct(WorkType type, IAsyncBasicConsumer consumer, string consumerTag)
private WorkStruct(WorkType type, IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
: this()
{
WorkType = type;
Consumer = consumer;
ConsumerTag = consumerTag;
CancellationToken = cancellationToken;
_cancellationTokenSource = null;
}

private WorkStruct(IAsyncBasicConsumer consumer, ShutdownEventArgs reason)
private WorkStruct(IAsyncBasicConsumer consumer, ShutdownEventArgs reason, CancellationTokenSource? cancellationTokenSource)
: this()
{
WorkType = WorkType.Shutdown;
Consumer = consumer;
Reason = reason;
CancellationToken = cancellationTokenSource?.Token ?? CancellationToken.None;
this._cancellationTokenSource = cancellationTokenSource;
}

private WorkStruct(IAsyncBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body)
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body,
CancellationToken cancellationToken)
{
WorkType = WorkType.Deliver;
Consumer = consumer;
Expand All @@ -266,37 +282,62 @@ private WorkStruct(IAsyncBasicConsumer consumer, string consumerTag, ulong deliv
RoutingKey = routingKey;
BasicProperties = basicProperties;
Body = body;
Reason = default;
Reason = null;
CancellationToken = cancellationToken;
_cancellationTokenSource = null;
}

public static WorkStruct CreateCancel(IAsyncBasicConsumer consumer, string consumerTag)
public static WorkStruct CreateCancel(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource)
{
return new WorkStruct(WorkType.Cancel, consumer, consumerTag);
return new WorkStruct(WorkType.Cancel, consumer, consumerTag, cancellationTokenSource.Token);
}

public static WorkStruct CreateCancelOk(IAsyncBasicConsumer consumer, string consumerTag)
public static WorkStruct CreateCancelOk(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource)
{
return new WorkStruct(WorkType.CancelOk, consumer, consumerTag);
return new WorkStruct(WorkType.CancelOk, consumer, consumerTag, cancellationTokenSource.Token);
}

public static WorkStruct CreateConsumeOk(IAsyncBasicConsumer consumer, string consumerTag)
public static WorkStruct CreateConsumeOk(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource)
{
return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag);
return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag, cancellationTokenSource.Token);
}

public static WorkStruct CreateShutdown(IAsyncBasicConsumer consumer, ShutdownEventArgs reason)
public static WorkStruct CreateShutdown(IAsyncBasicConsumer consumer, ShutdownEventArgs reason, CancellationTokenSource cancellationTokenSource)
{
return new WorkStruct(consumer, reason);
// Create a linked CTS so the shutdown args token reflects both dispatcher cancellation and any upstream token.
CancellationTokenSource? linked = null;
try
{
if (reason.CancellationToken.CanBeCanceled)
{
linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenSource.Token, reason.CancellationToken);
}
}
catch
{
linked = null;
}

CancellationToken token = linked?.Token ?? cancellationTokenSource.Token;
ShutdownEventArgs argsWithToken = reason.Exception != null ?
new ShutdownEventArgs(reason.Initiator, reason.ReplyCode, reason.ReplyText, reason.Exception, token) :
new ShutdownEventArgs(reason.Initiator, reason.ReplyCode, reason.ReplyText, reason.ClassId, reason.MethodId, reason.Cause, token);

return new WorkStruct(consumer, argsWithToken, linked);
}

public static WorkStruct CreateDeliver(IAsyncBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body)
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body, CancellationTokenSource cancellationTokenSource)
{
return new WorkStruct(consumer, consumerTag, deliveryTag, redelivered,
exchange, routingKey, basicProperties, body);
exchange, routingKey, basicProperties, body, cancellationTokenSource.Token);
}

public void Dispose() => Body.Dispose();
public void Dispose()
{
Body.Dispose();
_cancellationTokenSource?.Dispose();
}
}

protected enum WorkType : byte
Expand All @@ -317,6 +358,7 @@ protected virtual void Dispose(bool disposing)
if (disposing)
{
Quiesce();
_shutdownCts.Dispose();
}
}
catch
Expand Down
6 changes: 3 additions & 3 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ protected void TakeOver(Channel other)
public Task CloseAsync(ushort replyCode, string replyText, bool abort,
CancellationToken cancellationToken)
{
var args = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText);
var args = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText, cancellationToken: cancellationToken);
return CloseAsync(args, abort, cancellationToken);
}

Expand Down Expand Up @@ -850,7 +850,7 @@ await Session.Connection.HandleConnectionBlockedAsync(reason, cancellationToken)
protected async Task<bool> HandleConnectionCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
var method = new ConnectionClose(cmd.MethodSpan);
var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId);
var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId, cancellationToken: cancellationToken);
try
{
/*
Expand Down Expand Up @@ -895,7 +895,7 @@ protected async Task<bool> HandleConnectionStartAsync(IncomingCommand cmd, Cance
{
if (m_connectionStartCell is null)
{
var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start");
var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start", cancellationToken: cancellationToken);
await Session.Connection.CloseAsync(reason, false,
InternalConstants.DefaultConnectionCloseTimeout,
cancellationToken)
Expand Down
15 changes: 10 additions & 5 deletions projects/RabbitMQ.Client/Impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ await ReceiveLoopAsync(mainLoopToken)
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
Constants.InternalError,
"Thread aborted (AppDomain unloaded?)",
exception: taex);
exception: taex,
cancellationToken: mainLoopToken);
await HandleMainLoopExceptionAsync(ea)
.ConfigureAwait(false);
}
Expand All @@ -73,7 +74,8 @@ await HandleMainLoopExceptionAsync(ea)
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
0,
"End of stream",
exception: eose);
exception: eose,
cancellationToken: mainLoopToken);
await HandleMainLoopExceptionAsync(ea)
.ConfigureAwait(false);
}
Expand All @@ -91,7 +93,8 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken)
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
Constants.InternalError,
fileLoadException.Message,
exception: fileLoadException);
exception: fileLoadException,
cancellationToken: mainLoopToken);
await HandleMainLoopExceptionAsync(ea)
.ConfigureAwait(false);
}
Expand All @@ -106,7 +109,8 @@ await HandleMainLoopExceptionAsync(ea)
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
Constants.InternalError,
ocex.Message,
exception: ocex);
exception: ocex,
cancellationToken: mainLoopToken);
await HandleMainLoopExceptionAsync(ea)
.ConfigureAwait(false);
}
Expand All @@ -116,7 +120,8 @@ await HandleMainLoopExceptionAsync(ea)
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
Constants.InternalError,
ex.Message,
exception: ex);
exception: ex,
cancellationToken: mainLoopToken);
await HandleMainLoopExceptionAsync(ea)
.ConfigureAwait(false);
}
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ await _channel0.ConnectionOpenAsync(_config.VirtualHost, cancellationToken)
{
try
{
var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen");
var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen", cancellationToken: cancellationToken);
await CloseAsync(ea, true,
InternalConstants.DefaultConnectionAbortTimeout,
cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -299,7 +299,7 @@ internal void EnsureIsOpen()
public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort,
CancellationToken cancellationToken = default)
{
var reason = new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText);
var reason = new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText, cancellationToken: cancellationToken);
return CloseAsync(reason, abort, timeout, cancellationToken);
}

Expand Down
Loading
Loading