How to access OfferAsync method of Source.Queue when using Akka.Net Graphs? #6928
I am working on an Akka.NET Streams application that uses the Graph API. I would like to provide source data via the OfferAsync method of Source.Queue. How do I access the ISourceQueueWithComplete.OfferAsync method after creating a graph, so that I can add data to the stream? Here is the code that I am using to create the graph:
// Create a graph
var runnableGraph = RunnableGraph.FromGraph(GraphDsl.Create(
    builder =>
    {
        // create source
        var sourceQueue = Source.Queue<int>(100, OverflowStrategy.Fail);
        // use builder to configure the graph
        // as shown here: https://getakka.net/articles/streams/workingwithgraphs.html
        ...
    }));
// run the graph
runnableGraph.Run(materializer);
Here is the code that I want to use to get data for the stream:
Replies: 1 comment
using System;
using System.Linq;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

var actorSystem = ActorSystem.Create("Test");
// create source
var sourceQueue = Source.Queue<int>(100, OverflowStrategy.Fail);
var graph = GraphDsl.Create(sourceQueue, (builder, source) =>
{
// connected shapes
var flow = builder.Add(Flow.Create<int>().Select(i => i * 10));
var sink = builder.Add(Sink.ForEach<int>(i => Console.WriteLine(i)));
builder.From(source).To(flow);
builder.From(flow).To(sink);
return ClosedShape.Instance;
});
ISourceQueueWithComplete<int> queueSource = actorSystem.Materializer().Materialize(graph);
foreach(var i in Enumerable.Range(0, 10)){
await queueSource.OfferAsync(i);
}
Executing this program will result in:
You just need to pass the Source.Queue<T> in as an input variable when you initially call GraphDsl.Create. This allows the graph to have a materialization type (here ISourceQueueWithComplete<int>), which you can access once the graph is compiled and materialized, as in the code above.
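The question builds the graph with RunnableGraph.FromGraph and Run rather than calling Materialize directly, so the same idea can be sketched in that form. This is a minimal sketch, not a definitive implementation: it assumes the Akka.Streams NuGet package and C# 7.1 or later for async Main. The names `queue` and `result`, the check on the offer result, and the shutdown steps at the end are illustrative additions that were not part of the original answer.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class Program
{
    public static async Task Main()
    {
        var system = ActorSystem.Create("Test");
        var materializer = system.Materializer();

        // Passing the queue source into GraphDsl.Create makes its materialized
        // value (the queue) the materialized value of the whole graph.
        var graph = GraphDsl.Create(
            Source.Queue<int>(100, OverflowStrategy.Fail),
            (builder, source) =>
            {
                var flow = builder.Add(Flow.Create<int>().Select(i => i * 10));
                var sink = builder.Add(Sink.ForEach<int>(i => Console.WriteLine(i)));
                builder.From(source).Via(flow).To(sink);
                return ClosedShape.Instance;
            });

        // Run returns the materialized value, which is the queue handle.
        ISourceQueueWithComplete<int> queue =
            RunnableGraph.FromGraph(graph).Run(materializer);

        foreach (var i in Enumerable.Range(0, 10))
        {
            // OfferAsync reports whether the element was accepted.
            var result = await queue.OfferAsync(i);
            if (!(result is QueueOfferResult.Enqueued))
                Console.WriteLine($"Element {i} was not enqueued: {result}");
        }

        // Illustrative shutdown: signal that no more elements will be offered,
        // wait for the queue source to complete, then stop the actor system.
        // This does not by itself guarantee the sink has printed every element.
        queue.Complete();
        await queue.WatchCompletionAsync();
        await system.Terminate();
    }
}
```

The point is that Run hands back whatever materialized value the graph carries, so the queue is only reachable if the source was passed to GraphDsl.Create rather than created inside the builder lambda.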