Skip to content

Commit

Permalink
PersistentStreamPullingAgent: suppress execution context in message pump
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed Feb 7, 2025
1 parent 1f40041 commit fea4580
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 26 deletions.
24 changes: 15 additions & 9 deletions src/Orleans.Core/Async/AsyncExecutorWithRetries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public static Task<T> ExecuteWithRetries<T>(
int maxNumErrorTries,
Func<Exception, int, bool> retryExceptionFilter,
TimeSpan maxExecutionTime,
IBackoffProvider onErrorBackOff)
IBackoffProvider onErrorBackOff,
CancellationToken cancellationToken = default)
{
return ExecuteWithRetries<T>(
function,
Expand All @@ -94,7 +95,8 @@ public static Task<T> ExecuteWithRetries<T>(
retryExceptionFilter,
maxExecutionTime,
null,
onErrorBackOff);
onErrorBackOff,
cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -140,7 +142,8 @@ public static Task<T> ExecuteWithRetries<T>(
Func<Exception, int, bool> retryExceptionFilter,
TimeSpan maxExecutionTime = default,
IBackoffProvider onSuccessBackOff = null,
IBackoffProvider onErrorBackOff = null)
IBackoffProvider onErrorBackOff = null,
CancellationToken cancellationToken = default)
{
return ExecuteWithRetriesHelper<T>(
function,
Expand All @@ -151,7 +154,8 @@ public static Task<T> ExecuteWithRetries<T>(
retryValueFilter,
retryExceptionFilter,
onSuccessBackOff,
onErrorBackOff);
onErrorBackOff,
cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -201,7 +205,8 @@ private static async Task<T> ExecuteWithRetriesHelper<T>(
Func<T, int, bool> retryValueFilter = null,
Func<Exception, int, bool> retryExceptionFilter = null,
IBackoffProvider onSuccessBackOff = null,
IBackoffProvider onErrorBackOff = null)
IBackoffProvider onErrorBackOff = null,
CancellationToken cancellationToken = default)
{
T result = default;
ExceptionDispatchInfo lastExceptionInfo = null;
Expand All @@ -211,6 +216,7 @@ private static async Task<T> ExecuteWithRetriesHelper<T>(
do
{
retry = false;
cancellationToken.ThrowIfCancellationRequested();

if (maxExecutionTime != Timeout.InfiniteTimeSpan && maxExecutionTime != default)
{
Expand Down Expand Up @@ -241,13 +247,13 @@ private static async Task<T> ExecuteWithRetriesHelper<T>(
retry = retryValueFilter(result, counter);
}

if (retry)
if (retry && !cancellationToken.IsCancellationRequested)
{
TimeSpan? delay = onSuccessBackOff?.Next(counter);

if (delay.HasValue)
{
await Task.Delay(delay.Value);
await Task.Delay(delay.Value, cancellationToken);
}
}
}
Expand All @@ -261,7 +267,7 @@ private static async Task<T> ExecuteWithRetriesHelper<T>(
retry = retryExceptionFilter(exc, counter);
}

if (!retry)
if (!retry || cancellationToken.IsCancellationRequested)
{
throw;
}
Expand All @@ -272,7 +278,7 @@ private static async Task<T> ExecuteWithRetriesHelper<T>(

if (delay.HasValue)
{
await Task.Delay(delay.Value);
await Task.Delay(delay.Value, cancellationToken);
}
}
} while (retry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Orleans.Configuration;
using Orleans.Internal;
using Orleans.Runtime;
using Orleans.Runtime.Internal;
using Orleans.Streams.Filtering;

namespace Orleans.Streams
Expand Down Expand Up @@ -109,6 +110,7 @@ public Task Initialize()
{
if (queueAdapterCache != null)
{
using var _ = new ExecutionContextSuppressor();
queueCache = queueAdapterCache.CreateQueueCache(QueueId);
}
}
Expand All @@ -120,6 +122,7 @@ public Task Initialize()

try
{
using var _ = new ExecutionContextSuppressor();
receiver = queueAdapter.CreateReceiver(QueueId);
}
catch (Exception exc)
Expand All @@ -130,6 +133,7 @@ public Task Initialize()

try
{
using var _ = new ExecutionContextSuppressor();
receiverInitTask = OrleansTaskExtentions.SafeExecute(() => receiver.Initialize(this.options.InitQueueTimeout))
.LogException(logger, ErrorCode.PersistentStreamPullingAgent_03, $"QueueAdapterReceiver {QueueId:H} failed to Initialize.");
receiverInitTask.Ignore();
Expand All @@ -145,7 +149,7 @@ public Task Initialize()
// Setup a reader for a new receiver.
// Even if the receiver failed to initialize, treat it as OK and start pumping it. It's receiver responsibility to retry initialization.
var randomTimerOffset = RandomTimeSpan.Next(this.options.GetQueueMsgsTimerPeriod);
timer = RegisterTimer(AsyncTimerCallback, QueueId, randomTimerOffset, this.options.GetQueueMsgsTimerPeriod);
timer = RegisterGrainTimer(AsyncTimerCallback, QueueId, randomTimerOffset, this.options.GetQueueMsgsTimerPeriod);

StreamInstruments.RegisterPersistentStreamPubSubCacheSizeObserve(() => new Measurement<int>(pubSubCache.Count, new KeyValuePair<string, object>("name", StatisticUniquePostfix)));

Expand Down Expand Up @@ -349,9 +353,14 @@ public void RemoveSubscriber_Impl(GuidId subscriptionId, QualifiedStreamId strea
pubSubCache.Remove(streamId);
}

private async Task AsyncTimerCallback(object state)
private Task AsyncTimerCallback(QueueId queueId, CancellationToken cancellationToken)
{
using var _ = new ExecutionContextSuppressor();
return PumpQueue(queueId, cancellationToken);
}

private async Task PumpQueue(QueueId queueId, CancellationToken cancellationToken)
{
var queueId = (QueueId)state;
try
{
Task localReceiverInitTask = receiverInitTask;
Expand All @@ -361,10 +370,10 @@ private async Task AsyncTimerCallback(object state)
receiverInitTask = null;
}

if (IsShutdown) return; // timer was already removed, last tick
if (IsShutdown || cancellationToken.IsCancellationRequested) return; // timer was already removed, last tick

// loop through the queue until it is empty.
while (!IsShutdown) // timer will be set to null when we are asked to shutdown.
while (!IsShutdown && !cancellationToken.IsCancellationRequested) // timer will be set to null when we are asked to shutdown.
{
int maxCacheAddCount = queueCache?.GetMaxAddCount() ?? QueueAdapterConstants.UNLIMITED_GET_QUEUE_MSG;
if (maxCacheAddCount != QueueAdapterConstants.UNLIMITED_GET_QUEUE_MSG && maxCacheAddCount <= 0)
Expand All @@ -379,7 +388,8 @@ private async Task AsyncTimerCallback(object state)
ReadLoopRetryMax,
ReadLoopRetryExceptionFilter,
Timeout.InfiniteTimeSpan,
queueReaderBackoffProvider);
queueReaderBackoffProvider,
cancellationToken: cancellationToken);
if (!moreData)
return;
}
Expand All @@ -394,18 +404,18 @@ private async Task AsyncTimerCallback(object state)
queueId,
ReadLoopRetryMax);
}
}

private bool ReadLoopRetryExceptionFilter(Exception e, int retryCounter)
{
this.logger.LogWarning(
(int)ErrorCode.PersistentStreamPullingAgent_12,
e,
"Exception while retrying the {RetryCounter}th time reading from queue {QueueId}",
retryCounter,
QueueId);

return !IsShutdown;
bool ReadLoopRetryExceptionFilter(Exception e, int retryCounter)
{
this.logger.LogWarning(
(int)ErrorCode.PersistentStreamPullingAgent_12,
e,
"Exception while retrying the {RetryCounter}th time reading from queue {QueueId}",
retryCounter,
QueueId);

return !cancellationToken.IsCancellationRequested && !IsShutdown;
}
}

/// <summary>
Expand Down

0 comments on commit fea4580

Please sign in to comment.