
There is no way to detect that a given stream has finished. #68

Open
IoanStoianov opened this issue Dec 11, 2020 · 1 comment

@IoanStoianov (Collaborator) commented Dec 11, 2020

[FunctionName("Generator")]
public static async IAsyncEnumerable<dynamic> Generator([PerperTrigger] dynamic parameters, ILogger logger)
{
    // Yields Count + 1 items (0 through Count, inclusive), then completes.
    for (var i = 0; i <= parameters.Count; i++)
    {
        yield return new { Num = i };
    }
}

var generator = await context.StreamFunctionAsync<dynamic>("Generator", new { Count = count });

await foreach (var element in generator)
{
    ...
}

With count = 10, the await foreach will iterate through the numbers from 0 to 10, and then it will hang forever instead of completing, because there is no way to detect that the stream has finished.
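As a hypothetical interim workaround (a sketch, not part of the Perper API — the Done flag and sentinel value below are assumptions), the generator could emit a final sentinel item and the consumer could break on it:

```csharp
// Hypothetical workaround sketch: the generator marks its last item,
// and the consumer breaks out of the await foreach when it sees it.
[FunctionName("Generator")]
public static async IAsyncEnumerable<dynamic> Generator([PerperTrigger] dynamic parameters, ILogger logger)
{
    for (var i = 0; i <= parameters.Count; i++)
    {
        yield return new { Num = i, Done = false };
    }
    yield return new { Num = -1, Done = true }; // sentinel: no more items will follow
}

var generator = await context.StreamFunctionAsync<dynamic>("Generator", new { Count = count });

await foreach (var element in generator)
{
    if (element.Done) break; // stop instead of waiting forever
    // ... process element.Num ...
}
```

This only papers over the problem per-stream, which is why a framework-level completion signal (as discussed below in the thread) is the real fix.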

@IoanStoianov added the enhancement label Dec 11, 2020
bojidar-bg added a commit that referenced this issue Dec 14, 2020
Note that currently there might still be notifications left after removing the listener.
Tangentially related to #68.
branimirangelov added a commit that referenced this issue Jan 2, 2021
* fix: BigInteger deserialization was throwing by-ref errors

* fix: Add a PerperNameMapper to fix #67

* fix: Consume notification when breaking in await foreach of Stream

Note that currently there might still be notifications left after removing the listener.
Tangentially related to #68.

* fix: Regression with root null values from NameMapper changes

* fix: Fix "pairing" of notifications in Fabric

Previously, Fabric sent notifications in pairs if there was sufficient delay between the notifications, due to Ignite processing the local queries for the cache before we could write to the queue.
Note that this commit introduces a potential race condition if Fabric reads the queued notification, sends that to the runtime, and the runtime tries to read the notification, all before we can write it to the cache.

* chore: prepare for merge

Co-authored-by: Bojidar Marinov <[email protected]>
@bojidar-bg (Collaborator) commented Nov 18, 2021

This should be possible since 884a80a, as Streams now have an underlying Execution which is marked as finished whenever the stream finishes. However, care must be taken when implementing this: we still want to read the last few items before stopping the enumeration.

A few possible implementations:
  1. On the stream side, we store the final key in the Result field of the Execution. On the listener side, we wait for the execution to finish, and once it does, we continue enumerating until we reach that last key, or exit immediately if we have already read it.
    This is the simplest convention to implement. However, there is no last key to store when the stream has no items, and finding the last key after a "crash -> stream restart -> stream exits before writing any more items" scenario would require either storing extra state or running a query over all keys.

    1. On the listener side, we signal Fabric when the execution is over; after that signal, Fabric cancels all ContinuousQueries, or stops enumerating instead of starting a ContinuousQuery.
      This should work; unfortunately, signalling a gRPC stream to "start finishing" is only possible by either restarting it (not cool, as it would require a re-query for non-packed streams) or by using a duplex streaming call, which isn't cool either.

    2. We couple Fabric to the Stream/Execution convention: instead of signalling the gRPC stream from the client, we monitor the execution from within the gRPC stream's item handler.
      This goes against the current idea of keeping the Fabric core relatively lean and devoid of criss-crossing connections.

  2. On the stream side, we store an extra placeholder item at key long.MaxValue. On the listener side, we stop iterating when we see that key, without reading the item. In Fabric, we implement an additional check before launching a ContinuousQuery for packed streams.
    Ideally, conventions such as this one would stay out of Fabric; however, considering that we already use long.MinValue as a magic value for Ephemeral streams, it should be fine.
    Note that the placeholder item could be reused to store an exception in case we want to completely decouple stream listening from executions.
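The listener side of option 2 could be sketched roughly as follows (a sketch only, assuming hypothetical names — ListenEntriesAsync, StreamEntry, Key, and Item are assumptions, not the actual Perper surface):

```csharp
// Sketch of the listener-side check for option 2.
// EndOfStreamKey is the placeholder key written by the stream side on completion.
const long EndOfStreamKey = long.MaxValue;

await foreach (StreamEntry entry in stream.ListenEntriesAsync())
{
    if (entry.Key == EndOfStreamKey)
    {
        // The placeholder marks completion; it could also carry a stored
        // exception, if stream listening is decoupled from executions.
        if (entry.Item is Exception error) throw error;
        break; // stop without yielding the placeholder item itself
    }
    yield return entry.Item;
}
```

Because the placeholder sits at the maximum possible key, an ordered enumeration naturally reaches it last, so no items can be lost before the stop condition fires.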

@bojidar-bg added this to the Release 0.8 milestone Nov 18, 2021