Skip to content

Commit

Permalink
Merge pull request #2 from dombrovsky/f/basic-extensions
Browse files Browse the repository at this point in the history
Basic extensions
  • Loading branch information
dombrovsky authored May 18, 2024
2 parents 44d482e + 839290f commit 7a9160c
Show file tree
Hide file tree
Showing 38 changed files with 1,288 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public bool Dispose(TimeSpan timeout)
return _rootTaskFlow.Dispose(timeout);
}

public Task<T> Enqueue<T>(Func<CancellationToken, ValueTask<T>> taskFunc, CancellationToken cancellationToken)
public Task<T> Enqueue<T>(Func<object?, CancellationToken, ValueTask<T>> taskFunc, object? state, CancellationToken cancellationToken)
{
return _taskScheduler.Enqueue(taskFunc, cancellationToken);
return _taskScheduler.Enqueue(taskFunc, state, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public bool Dispose(TimeSpan timeout)
return _taskFlow.Dispose(timeout);
}

public Task<T> Enqueue<T>(Func<CancellationToken, ValueTask<T>> taskFunc, CancellationToken cancellationToken)
public Task<T> Enqueue<T>(Func<object?, CancellationToken, ValueTask<T>> taskFunc, object? state, CancellationToken cancellationToken)
{
return _taskFlow.Enqueue(taskFunc, cancellationToken);
return _taskFlow.Enqueue(taskFunc, state, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
namespace TaskFlow.Tests.Extensions
{
using NUnit.Framework;
using System.Threading.Tasks.Flow;

[TestFixture]
public sealed class CancelPreviousTaskSchedulerExtensionsFixture
{
private ITaskFlow? _taskFlow;

[TearDown]
public void TearDown()
{
_taskFlow?.Dispose(TimeSpan.FromSeconds(1));
}

[TestCaseSource(typeof(TaskFlows), nameof(TaskFlows.CreateTaskFlows))]
public async Task Enqueue_ShouldCancelPreviousOperation(ITaskFlow taskFlow)
{
_taskFlow = taskFlow;

var cancelPrevious = taskFlow.CreateCancelPrevious();

var task1 = cancelPrevious.Enqueue(token => Task.Delay(1000, token));
Assert.That(task1.IsCompleted, Is.False);

var task2 = cancelPrevious.Enqueue(token => Task.Delay(1000, token));
Assert.That(() => task1.IsCanceled, Is.True.After(100, 10), task1.Status.ToString);
Assert.That(task2.IsCompleted, Is.False);

var task3 = cancelPrevious.Enqueue(token => Task.Delay(1000, token));
Assert.That(() => task2.IsCanceled, Is.True.After(100, 10), task2.Status.ToString);
Assert.That(task3.IsCompleted, Is.False);


await task3.ConfigureAwait(false);
Assert.That(task3.IsCompletedSuccessfully, Is.True);
}

[TestCaseSource(typeof(TaskFlows), nameof(TaskFlows.CreateTaskFlows))]
public async Task Enqueue_ShouldNotCancelPreviousOperations_OnParentTaskScheduler(ITaskFlow taskFlow)
{
_taskFlow = taskFlow;

var parentTaskScheduler = (ITaskScheduler)taskFlow;
var cancelPrevious = taskFlow.CreateCancelPrevious();

var task1 = parentTaskScheduler.Enqueue(token => Task.Delay(1000, token));
Assert.That(task1.IsCompleted, Is.False);

var task2 = cancelPrevious.Enqueue(token => Task.Delay(1000, token));
Assert.That(() => task1.IsCanceled, Is.False.After(100, 10), task1.Status.ToString);
Assert.That(task2.IsCompleted, Is.False);

await task1.ConfigureAwait(false);

var task3 = cancelPrevious.Enqueue(token => Task.Delay(1000, token));
Assert.That(() => task2.IsCanceled, Is.True.After(100, 10), task2.Status.ToString);
Assert.That(task3.IsCompleted, Is.False);

await task3.ConfigureAwait(false);
Assert.That(task1.IsCompletedSuccessfully, Is.True);
Assert.That(task2.IsCanceled, Is.True);
Assert.That(task3.IsCompletedSuccessfully, Is.True);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
namespace TaskFlow.Tests.Extensions
{
using NUnit.Framework;
using System.Threading.Tasks.Flow;

[TestFixture]
public sealed class CancellationScopeTaskSchedulerExtensionsFixture
{
private ITaskFlow? _taskFlow;

[TearDown]
public void TearDown()
{
_taskFlow?.Dispose(TimeSpan.FromSeconds(1));
}

[TestCaseSource(typeof(TaskFlows), nameof(TaskFlows.CreateTaskFlows))]
public async Task SingleScope_Cancel_ShouldCancelOperationsThatBelongToScope(ITaskFlow taskFlow)
{
_taskFlow = taskFlow;

using var cts = new CancellationTokenSource();
var cancellationScope = taskFlow.CreateCancellationScope(cts.Token);

var task1 = cancellationScope.Enqueue(token => Task.Delay(1000, token));
var task2 = _taskFlow.Enqueue(token => Task.Delay(1000, token));
var task3 = cancellationScope.Enqueue(token => Task.Delay(1000, token));

cts.Cancel();

Assert.That(() => task1.IsCanceled, Is.True.After(100, 10), task1.Status.ToString);
Assert.That(() => task2.IsCanceled, Is.False.After(100, 10), task2.Status.ToString);

await task2.ConfigureAwait(false);
Assert.That(task2.IsCompletedSuccessfully, Is.True);

Assert.That(() => task3.IsCanceled, Is.True.After(100, 10), task1.Status.ToString);
}

[TestCaseSource(typeof(TaskFlows), nameof(TaskFlows.CreateTaskFlows))]
public async Task MultipleScopes_Cancel_ShouldCancelOperationsThatBelongToScope(ITaskFlow taskFlow)
{
_taskFlow = taskFlow;

using var scope1Cts = new CancellationTokenSource();
using var scope2Cts = new CancellationTokenSource();
var cancellationScope1 = taskFlow.CreateCancellationScope(scope1Cts.Token);
var cancellationScope2 = taskFlow.CreateCancellationScope(scope2Cts.Token);

var task1 = cancellationScope1.Enqueue(token => Task.Delay(1000, token));
var task2 = cancellationScope2.Enqueue(token => Task.Delay(1000, token));
var task3 = cancellationScope1.Enqueue(token => Task.Delay(1000, token));

scope1Cts.Cancel();

Assert.That(() => task1.IsCanceled, Is.True.After(100, 10), task1.Status.ToString);
Assert.That(() => task2.IsCanceled, Is.False.After(100, 10), task2.Status.ToString);

await task2.ConfigureAwait(false);
Assert.That(task2.IsCompletedSuccessfully, Is.True);

Assert.That(() => task3.IsCanceled, Is.True.After(100, 10), task1.Status.ToString);
}
}
}
106 changes: 106 additions & 0 deletions TaskFlow.Tests/Extensions/ExceptionTaskSchedulerExtensionsFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
namespace TaskFlow.Tests.Extensions
{
using NUnit.Framework;
using System.Threading.Tasks.Flow;

[TestFixture]
public sealed class ExceptionTaskSchedulerExtensionsFixture
{
private ITaskFlow? _taskFlow;

[TearDown]
public void TearDown()
{
_taskFlow?.Dispose(TimeSpan.FromSeconds(1));
}

[TestCaseSource(typeof(TaskFlows), nameof(TaskFlows.CreateTaskFlows))]
public void Enqueue_ShouldExecuteHandler_IfExceptionOfThatTypeOccurred(ITaskFlow taskFlow)
{
_taskFlow = taskFlow;

var exceptions = new List<Exception>();
var task = taskFlow
.OnError<InvalidOperationException>(exceptions.Add)
.Enqueue(() => throw new InvalidOperationException());

Assert.That(async () => await task.ConfigureAwait(false), Throws.InstanceOf<InvalidOperationException>());
Assert.That(exceptions, Has.One.TypeOf<InvalidOperationException>());
}

[TestCaseSource(typeof(TaskFlows), nameof(TaskFlows.CreateTaskFlows))]
public void Enqueue_ShouldNotExecuteHandler_IfExceptionOfAnotherTypeOccurred(ITaskFlow taskFlow)
{
_taskFlow = taskFlow;

var exceptions = new List<Exception>();
var task = taskFlow
.OnError<InvalidOperationException>(exceptions.Add)
.Enqueue(() => throw new NullReferenceException());

Assert.That(async () => await task.ConfigureAwait(false), Throws.InstanceOf<NullReferenceException>());
Assert.That(exceptions, Is.Empty);
}

[TestCaseSource(typeof(TaskFlows), nameof(TaskFlows.CreateTaskFlows))]
public void Enqueue_ShouldExecuteHandler_IfExceptionMatchingFilter(ITaskFlow taskFlow)
{
_taskFlow = taskFlow;

var exceptions = new List<Exception>();
var task = taskFlow
.OnError<InvalidOperationException>(exceptions.Add, exception => exception.Message.Contains("foo"))
.Enqueue(() => throw new InvalidOperationException("foo"));

Assert.That(async () => await task.ConfigureAwait(false), Throws.InstanceOf<InvalidOperationException>());
Assert.That(exceptions, Has.One.TypeOf<InvalidOperationException>());
}

[TestCaseSource(typeof(TaskFlows), nameof(TaskFlows.CreateTaskFlows))]
public void Enqueue_ShouldNotExecuteHandler_IfExceptionNotMatchingFilter(ITaskFlow taskFlow)
{
_taskFlow = taskFlow;

var exceptions = new List<Exception>();
var task = taskFlow
.OnError<InvalidOperationException>(exceptions.Add, exception => exception.Message.Contains("bar"))
.Enqueue(() => throw new InvalidOperationException("foo"));

Assert.That(async () => await task.ConfigureAwait(false), Throws.InstanceOf<InvalidOperationException>());
Assert.That(exceptions, Is.Empty);
}

[TestCaseSource(typeof(TaskFlows), nameof(TaskFlows.CreateTaskFlows))]
public void Enqueue_MultipleHandlers_ShouldExecuteAllMatchingHandlers(ITaskFlow taskFlow)
{
_taskFlow = taskFlow;

var exceptions = new List<string>();
var errorHandlingScheduler = taskFlow
.OnError<InvalidOperationException>(_ => exceptions.Add("foo"), exception => exception.Message.Contains("foo"))
.OnError<InvalidOperationException>(_ => exceptions.Add("bar"), exception => exception.Message.Contains("bar"))
.OnError(_ => exceptions.Add("generic"));

var task1 = errorHandlingScheduler.Enqueue(() => throw new InvalidOperationException("foo"));

Assert.That(async () => await task1.ConfigureAwait(false), Throws.InstanceOf<InvalidOperationException>());
Assert.That(exceptions, Is.EqualTo(new[] { "foo", "generic" }));
exceptions.Clear();

var task2 = errorHandlingScheduler.Enqueue(() => throw new InvalidOperationException("bar"));
Assert.That(async () => await task2.ConfigureAwait(false), Throws.InstanceOf<InvalidOperationException>());
Assert.That(exceptions, Is.EqualTo(new[] { "bar", "generic" }));
exceptions.Clear();

var task3 = errorHandlingScheduler.Enqueue(() => throw new InvalidOperationException());
Assert.That(async () => await task3.ConfigureAwait(false), Throws.InstanceOf<InvalidOperationException>());
Assert.That(exceptions, Is.EqualTo(new[] { "generic" }));
exceptions.Clear();

var task4 = errorHandlingScheduler.Enqueue(() => throw new NullReferenceException());
Assert.That(async () => await task4.ConfigureAwait(false), Throws.InstanceOf<NullReferenceException>());
Assert.That(exceptions, Is.EqualTo(new[] { "generic" }));
exceptions.Clear();
}
}
}
13 changes: 13 additions & 0 deletions TaskFlow.Tests/Extensions/TaskFlows.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace TaskFlow.Tests.Extensions
{
using System.Threading.Tasks.Flow;

internal static class TaskFlows
{
public static IEnumerable<ITaskFlow> CreateTaskFlows()
{
yield return new TaskFlow();
yield return new DedicatedThreadTaskFlow();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
namespace TaskFlow.Tests.Extensions
{
using Microsoft.Extensions.Time.Testing;
using NUnit.Framework;
using System.Threading.Tasks.Flow;

[TestFixture]
public sealed class ThrottlingTaskSchedulerExtensionsFixture
{
private ITaskFlow? _taskFlow;
private FakeTimeProvider _timeProvider;

[SetUp]
public void Setup()
{
_timeProvider = new FakeTimeProvider();
}

[TearDown]
public void TearDown()
{
_taskFlow?.Dispose(TimeSpan.FromSeconds(1));
}

[TestCaseSource(typeof(TaskFlows), nameof(TaskFlows.CreateTaskFlows))]
public async Task Enqueue_ShouldExecuteOnlyIfDebounceIntervalPassed(ITaskFlow taskFlow)
{
_taskFlow = taskFlow;

var debounceTaskScheduler = taskFlow.WithDebounce(TimeSpan.FromSeconds(5), _timeProvider);

var counter = 0;
for (var i = 0; i < 10; i++)
{
_ = debounceTaskScheduler.Enqueue(() => Interlocked.Increment(ref counter));
_timeProvider.Advance(TimeSpan.FromSeconds(1));
}

await _taskFlow.Enqueue(() => { });

Assert.That(counter, Is.EqualTo(2));
}

[TestCaseSource(typeof(TaskFlows), nameof(TaskFlows.CreateTaskFlows))]
public async Task Enqueue_ShouldThrowIfDebounceIntervalNotPassed(ITaskFlow taskFlow)
{
_taskFlow = taskFlow;

var debounceTaskScheduler = taskFlow.WithDebounce(TimeSpan.FromSeconds(5), _timeProvider);

var task1 = debounceTaskScheduler.Enqueue(() => { });
var task2 = debounceTaskScheduler.Enqueue(() => { });

await _taskFlow.Enqueue(() => { });

Assert.That(task1.IsCompletedSuccessfully, Is.True);
Assert.That(async () => await task2, Throws.TypeOf<OperationThrottledException>());
}
}
}
Loading

0 comments on commit 7a9160c

Please sign in to comment.