Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

add zioToPublisher conversion #326

Draft
wants to merge 9 commits into
base: series/2.x
Choose a base branch
from

Conversation

swachter
Copy link

This PR adds the conversion zioToPublisher.

@swachter swachter requested a review from runtologist as a code owner May 19, 2022 19:08
@swachter swachter marked this pull request as draft May 20, 2022 05:56
@runtologist
Copy link
Collaborator

This is awesome. Thanks for this contribution! Just one small question.

@swachter
Copy link
Author

Thanks for the positive feedback. I consider that PR to be in draft. I would like to implement subscription cancellation using fiber interruption. However, something does not yet fit. A corresponding unit test does not terminate and I struggle with debugging that situation.

@runtologist
Copy link
Collaborator

@swachter OK. Let me know if you'd like me to take a look.

@runtologist
Copy link
Collaborator

@swachter when I do this change:

                  o <- zio
                          .tapError(e => ZIO.succeed(subscriber.onError(e)))
                          .onInterrupt(
                            ZIO.succeed(subscriber.onError(new RuntimeException("Fiber interrupted.")))
                          ) <& subscription.offer(1)

the following test succeeds:

    test("interrupts subscription on fiber interruption") {
        for {
          promise     <- Promise.make[Nothing, Unit]
          publisher   <- promise.await.toPublisher
          resultFiber <- publisher.toZIOStream().runDrain.exit.fork
          _           <- promise.interrupt
          result      <- resultFiber.join
        } yield assert(result)(fails(anything))
      } @@ TestAspect.timeout(1.second)

Is that what you had in mind?

Note that Subscription.cancel is an internal method and not part of the reactive streams protocol. It will not have an effect on a regular Subscriber.

@StefanWachter1507
Copy link

StefanWachter1507 commented May 30, 2022

I pushed my current implementation, altough it is not yet working. The main idea is in line Adapters.scala#L414 where subscription cancellation is implemented by fiber interruption.

There seems to be some issue with fiber interruption, though. The following test case does not work:

      test("interrupts evaluation on cancellation") {
        for {
          interruptedPromise  <- Promise.make[Nothing, Unit].debug("p1")
          subscriptionPromise <- Promise.make[Nothing, Subscription].debug("p2")
          z                    = ZIO.never.as(0).onInterrupt(interruptedPromise.complete(ZIO.succeed(()).debug("p33"))).debug("p3")
          publisher           <- z.toPublisher.debug("p4")
          subscriber = new Subscriber[Int] {
                         override def onSubscribe(s: Subscription): Unit =
                           subscriptionPromise.unsafeDone(ZIO.succeed(s))
                         override def onNext(t: Int): Unit        = ???
                         override def onError(t: Throwable): Unit = ???
                         override def onComplete(): Unit          = ???
                       }
          _            <- ZIO.succeed(publisher.subscribe(subscriber)).debug("p5")
          subscription <- subscriptionPromise.await.debug("p6")
          _            <- ZIO.succeed(subscription.cancel()).debug("p7")
          unit         <- interruptedPromise.await.debug("p8")
        } yield {
          assert(unit)(equalTo(()))
        }
      }

@StefanWachter1507
Copy link

StefanWachter1507 commented May 30, 2022

Note that Subscription.cancel is an internal method and not part of the reactive streams protocol. It will not have an effect on a regular Subscriber.

AFAIS Subsciption.cancel is part of the API (cf. Subscription.cancel()).

Fiber interruption on subscription cancellation would be the main benefit of the new implementation. It would interrupt ongoing calculations if their result is no more of interest.

@swachter
Copy link
Author

swachter commented Jun 1, 2022

I think I found a bug in ZIO 2.0.0-RC6 (cf. ZIO#6888). That bug prevents that interruption works as expected.

I changed the implementation slightly by replacing the use of <& by <*. This fixed the interruption issue.

The PR should be ready for review now.

@swachter swachter marked this pull request as ready for review June 1, 2022 06:43
@runtologist
Copy link
Collaborator

Oh, now I understand what you want to achieve. I assumed you wanted to signal cancellation to the subscriber when the original ZIO is cancelled.

@runtologist
Copy link
Collaborator

I would like to propose a slightly different implementation, which uses Runtime.unsafeRunAsyncCancel and is more compact. It behaves slightly differently: The ZIO is not run on subscribe, but on request, When there is no demand, no work is ever performed. For that reason, the tests need slight adjustments with this version. It could be adapted to run the ZIO on subscribe as well.

  def zioToPublisher[R, E <: Throwable, O](
    zio: => ZIO[R, E, O]
  )(implicit trace: Trace): URIO[R, Publisher[O]] =
    ZIO.runtime.map { runtime => subscriber =>
      if (subscriber == null) {
        throw new NullPointerException("Subscriber must not be null.")
      } else {
        val cancellationHookRef = new AtomicReference[FiberId => Exit[Any, Unit]]

        val subscription = new Subscription {

          override def cancel(): Unit = {
            val hook = cancellationHookRef.compareAndExchange(null, _ => Exit.interrupt(FiberId.None))
            if (hook != null) {
              hook(FiberId.None)
              ()
            }
          }

          override def request(n: Long): Unit =
            if (n <= 0) throw new RuntimeException("demand must be > 0")
            else {
              if (cancellationHookRef.get == null) {
                val latch = Promise.unsafeMake[Unit, Unit](FiberId.None)
                val hook =
                  runtime.unsafeRunAsyncCancelable(
                    latch.await *>
                      zio
                        .foldZIO(
                          e => ZIO.succeed(subscriber.onError(e)),
                          a => ZIO.succeed(subscriber.onNext(a)) *> ZIO.succeed(subscriber.onComplete())
                        )
                  )(_ => ())
                if (cancellationHookRef.compareAndSet(null, hook)) latch.unsafeDone(ZIO.succeedNow(()))
                else latch.unsafeDone(ZIO.fail(()))
              }
            }

        }

        subscriber.onSubscribe(subscription)
      }
    }

WDYT?

@swachter
Copy link
Author

swachter commented Jun 1, 2022

I pushed your alternative implementation of zioToPushlisher and an alternative implementation of streamToPublisher to have a unified view of them.

Some thoughts:

  • I think the new implementation is more performant because it relies on more direct mechanisms.
  • The change in behavior that it starts evaluating the supplied zio not before some demand was signaled seems no clear improvement to me. I think the common case is that demand gets signaled. In that case it may be better if the evaluation of the zio is already under way.
  • I like my implementation because the implementation of subscribeAndRun reads very naturally.
  • The subscribeAndRun method could also be used to implement streamToPublisher (see added alternative). Thereby streamToPublisher would also benefit from interruption while it is preparing the next chunk. AFAIS the current implementation of streamToPublisher reacts on cancellation not before it offers the next number of elements.
  • I have the gut feeling that the implementation of subscriberToSink could also be fitted into the subscribeAndRun "framework". Yet, I still have to get my head around that idea.

I think I need more time to digest these things.

@swachter swachter marked this pull request as draft June 5, 2022 21:04
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants