Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Add Pipeline conversions and rewrite internals with channels #373

Open
wants to merge 14 commits into
base: master
Choose a base branch
from

Conversation

mschuwalow
Copy link

@mschuwalow mschuwalow commented Oct 11, 2023

Reason to go this way is that for pipeline to work correctly, we need to ensure that the Publisher and the Subscriber run concurrently on different fibers. Imo this is easiest / cleanest by relying on the AsynInput abstractions baked into the channelexecutor.

resolves #364

()
}
}
)(implicit trace: Trace): ZIO[R, Nothing, Publisher[O]] = ZIO.runtime[R].map { runtime => subscriber =>
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This guy cannot be implemented on top of channelToPublisher due to the missing R <: Scope constraint

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you feel about just adding the constraint? It would be more correct if the fork was scoped IMO.

unsafe { implicit unsafe =>
val subscription = new SubscriptionProducer[I](subscriber)

def reader: ZChannel[Any, ZNothing, Chunk[I], Any, Nothing, Chunk[I], Unit] = ZChannel.readWithCause(
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs a custom implementation because we need to prevent defects from getting send to it

)(implicit trace: Trace): ZPipeline[Any, Throwable, I, O] = ZPipeline.unwrapScoped {
val subscription = new SubscriptionProducer[I](processor)(unsafe)
val subscriber = new SubscriberConsumer[O](bufferSize)(unsafe)
val passthrough = new PassthroughAsyncInput(subscription, subscriber)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs a custom implementation so defects are not passed

@mschuwalow mschuwalow marked this pull request as ready for review October 16, 2023 16:11
@kciesielski
Copy link

Bumping this, as it would really help to implement WebSocket support for tapir-netty-zio, where we use netty-reactive-streams underneath and need a Processor[Req, Resp] created out of a ZPipeline. Any chance to push this a little bit forward, @runtologist?

@dispalt
Copy link

dispalt commented May 2, 2024

This PR also makes cancellations of streams work better, previously cancellations only seemed to complete the stream if read after closing the stream.

Copy link
Collaborator

@runtologist runtologist left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this awesome contribution @mschuwalow, and sorry it took so loong before I noticed.

Just a few tiny comments.

()
}
}
)(implicit trace: Trace): ZIO[R, Nothing, Publisher[O]] = ZIO.runtime[R].map { runtime => subscriber =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you feel about just adding the constraint? It would be more correct if the fork was scoped IMO.

if (!isSubscribedOrCanceled.compareAndSet(false, true)) {
s.cancel()
} else {
subscription.unsafe.done(ZIO.succeedNow(s))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

succeedNow is deprecated. Let's switch to succeed.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add ZPipeline to Processor conversion
4 participants