Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Add ZPipeline to Processor conversion #364

Open
pragmaxim opened this issue Jul 9, 2023 · 3 comments · May be fixed by #373
Open

Add ZPipeline to Processor conversion #364

pragmaxim opened this issue Jul 9, 2023 · 3 comments · May be fixed by #373

Comments

@pragmaxim
Copy link

org.reactivestreams.Processor is an equivalent of ZPipeline or akka.stream.scaladsl.Flow and when integrating between different reactive "implementation", it is possible to create it using org.reactivestreams.FlowAdapters.toFlowProcessor ... Akka-stream has flow.toProcessor method but I can't find a way how to turn ZPipeline into a Processor.

Any idea?

@runtologist
Copy link
Collaborator

Not tested, bu shouldn't this work?

  def processorToZPipeline[In, Out](processor: Processor[In, Out]): ZPipeline[Scope, Throwable, In, Out] =
    ZPipeline.unwrap(
      for {
        signalErrorSink    <- processor.toZIOSink
        (signalError, sink) = signalErrorSink
        stream              = processor.toZIOStream()
        sinkPipe            = ZPipeline.fromSink(sink)
        sourcePipe          = ZPipeline.fromChannel(stream.channel)
      } yield sinkPipe >>> sourcePipe
    )

@runtologist
Copy link
Collaborator

Well, it doesn't.

@mschuwalow
Copy link

@runtologist are you looking at this one?
If not -- I just hit a usecase for this at work, so I can take care of it.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants