Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API: Reduce boilerplate when building stream graph #44

Open
branimirangelov opened this issue Oct 4, 2020 · 1 comment
Open

API: Reduce boilerplate when building stream graph #44

branimirangelov opened this issue Oct 4, 2020 · 1 comment
Labels
enhancement New feature or request question Further information is requested
Milestone

Comments

@branimirangelov
Copy link
Collaborator

Azure Functions Extensions collects all of the metadata of the functions (function.json / attributes) that can be passed to stream context to reduce boilerplate when building stream graph.

@bojidar-bg
Copy link
Collaborator

bojidar-bg commented Nov 18, 2021

We no longer use Azure Functions in 0.7, hence the approach in the OP won't work. However, in 0.8 there is still boilerplate remaining, as the caller is currently responsible for configuring the stream to match the callee's output.

Example (from samples/dotnet/BasicSample):

var generator = await PerperContext.Stream("Generator").Packed().Persistent().StartAsync(messageCount).ConfigureAwait(false);
var processor = await PerperContext.Stream("Processor").StartAsync(generator.Replay(), batchCount).ConfigureAwait(false);

public async IAsyncEnumerable<(long, string)> RunAsync(int count)

Ideally we would want the caller (Init.cs) to not have to configure Packed since the callee (Generator.cs) cannot be used as an non-Packed stream -- it writes items out-of-order, and forgetting Packed would cause listeners to skip items.
Likewise, we don't want to have to configure Persistent and Replay either.

My proposal would be to make streams (e.g. Generator) be accessed as PerperStream-returning Call directly, while managing issues such as executions and cancellation behind the scenes. Unfortunately, this would require either attributes or an additional method for configuring the stream itself, and it would not work with cyclic streams.

API Proposals
// Usage site:

var generator = PerperContext.CallAsync<PerperStream>("Generator", 10);
var processor = PerperContext.CallAsync<PerperStream>("Processor", generator);

// 1. Definition site w/ Attributes:
class Generator {
    [PeperPersistentStream] [PeperReplayStream] [PerperIndexStream(typeof(X))]
    public IAsyncEnumerable<(long, X)> RunAsync(int x) { ... }
}
class GeneratorAgent {
    [PeperPersistentStream] [PeperReplayStream] [PerperIndexStream(typeof(X))]
    public IAsyncEnumerable<(long, X)> GeneratorAsync(int x) { ... }
}
// 2. Definition site w/ Additional method or property:
class Generator {
    public PerperStreamOptions StreamOptions => new StreamOptions().Persistent().Index(typeof(X)).Replay();
    public IAsyncEnumerable<(long, X)> RunAsync(int x) { ... }
}
class GeneratorAgent {
    public PerperStreamOptions GeneratorStreamOptions => new StreamOptions().Persistent().Index(typeof(X)).Replay();
    public IAsyncEnumerable<(long, X)> GeneratorAsync(int x) { ... }
}
// 3. Definition site w/ Additional utility method:
class Generator {
    public async Task<PerperStream> RunAsync(int x) =>
        (await new StreamBuilder().Persistent().Index(typeof(X)).MakeStream(StreamAsync(x))).Replay();
    private IAsyncEnumerable<(long, X)> StreamAsync(int x) { ... }
}
class GeneratorAgent {
    public async Task<PerperStream> GeneratorAsync(int x) =>
        (await new StreamBuilder().Persistent().Index(typeof(X)).MakeStream(StreamAsync(x))).Replay();
    private IAsyncEnumerable<(long, X)> StreamAsync(int x) { ... }
}

Additional considerations:

  • Replaying the stream is a lesser issue, but we would still enjoy being able to do so.
  • In (1) and (2)+properties, there is no way for the caller to specify whether the resulting stream should be persisted/indexed/etc., even if the definition site cooperates.
  • In (3), care must be taken if the utility method is not idempotent, as we need a way be able to restart execution of the stream after a crash.

Retagging to 0.8, though it is possible it might have to wait for 0.9 if there is no suitable API proposal.

@bojidar-bg bojidar-bg modified the milestones: Release 0.6, Release 0.8 Nov 18, 2021
@bojidar-bg bojidar-bg added the question Further information is requested label Nov 18, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request question Further information is requested
Projects
None yet
Development

No branches or pull requests

2 participants