Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Add a channel that outputs to a subscriber. #360

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

quelgar
Copy link

@quelgar quelgar commented Apr 21, 2023

Proposal to replace the current subscriber.toZIOSink with a channel. The advantage of a channel is that it is notified of upstream errors and can propagate them to the subscriber's onError method. While a channel is generally not as convenient to use as a sink, I think it is more convenient than the current sink + error handler pairing, as

  • the caller no longer needs to manually install the error handler
  • there's no need for a Scope to be exposed to the caller

So this:

val asSink = subscriber.toZIOSink[Throwable]
val failingStream = ZStream.range(3, 13) ++ ZStream.fail(new RuntimeException("boom!"))
ZIO.scoped {
  asSink.flatMap { case (signalError, sink) => // FIXME
    failingStream.run(sink).catchAll(signalError)
  }
}

becomes:

val subscriberChannel = ZChannel.toSubscriber(subscriber)
val failingStream = ZStream.range(3, 13) ++ ZStream.fail(new RuntimeException("boom!"))
failingStream.pipeThroughChannel(subscriberChannel).runDrain

For the API, I've followed the ZChannel convention instead of the existing style. For ZChannel the way "to" and "from" are used is different. For example, there's ZSink.fromQueue which means "create a sink from this queue", whereas the channel equivalent is ZChannel.toQueue which means "create a channel that outputs to this queue". We could definitely do subscriber.toZIOChannel instead or in addition though.

Edit: I just changed it to propagate upstream errors in addition to signalling onError to the subscriber. Seems desirable? So the output error is Some(throwable) for an upstream error and None for cancellation by the subscriber.

@quelgar quelgar requested a review from runtologist as a code owner April 21, 2023 00:01
Using a channel to drive a subscriber instead of a sink has the
advantage that we can pass upstream errors through to the
subscriber's `onError` method without needing the caller to
manually install an error handler.

While a channel is not as convenient to use as a sink, it is more
convenient that the error handler + sink pair.
@quelgar quelgar force-pushed the channel-to-subscriber branch from c2f1a9c to 83e0f63 Compare April 21, 2023 00:35
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant