Skip to content

Commit

Permalink
fix: Consume notification when breaking in await foreach of Stream
Browse files Browse the repository at this point in the history
Note that currently there might still be notifications left after removing the listener.
Tangentially related to #68.
  • Loading branch information
bojidar-bg committed Dec 14, 2020
1 parent 66bbbba commit d8e0664
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
13 changes: 9 additions & 4 deletions functions/runtime/src/Perper.WebJobs.Extensions/Model/Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,16 @@ private async IAsyncEnumerable<T> Impl([EnumeratorCancellation] CancellationToke

await ((State)_stream._state).LoadStateEntries();

yield return (T)converter.Invoke(value)!;

await ((State)_stream._state).StoreStateEntries();
try
{
yield return (T)converter.Invoke(value)!;
}
finally
{
await ((State)_stream._state).StoreStateEntries();

await _stream._fabric.ConsumeNotification(key);
await _stream._fabric.ConsumeNotification(key);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public PerperTriggerValueBinder(JObject trigger, IIgniteClient ignite, PerperBin
return Task.FromResult<object?>(null);
}

public async Task SetValueAsync(object value, CancellationToken cancellationToken)
public async Task SetValueAsync(object? value, CancellationToken cancellationToken)
{
if (_trigger.ContainsKey("Call"))
{
Expand All @@ -50,6 +50,11 @@ public async Task SetValueAsync(object value, CancellationToken cancellationToke
{
var stream = (string)_trigger["Stream"]!;

if (value == null)
{
return;
}

var asyncEnumerableInterface = PerperTypeUtils.GetGenericInterface(value.GetType(), typeof(IAsyncEnumerable<>));
if (asyncEnumerableInterface == null)
{
Expand Down

0 comments on commit d8e0664

Please sign in to comment.