Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented daemonMask and nonDaemonMask #1

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions core-tests/shared/src/test/scala/zio/ZIOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,140 @@ object ZIOSpec extends ZIOBaseSpec {
fibs <- ZIO.children
} yield assert(fibs, isEmpty)
},
testM("daemon fiber race interruption") {
def plus1(ref: Ref[Int], latch: Promise[Nothing, Unit]) =
latch.succeed(()) *> ZIO.never *> ref.update(_ + 1)
def interruptHandler(ref: Ref[Int]) =
ref.update(_ + 1)

val io = for {
ref <- Ref.make(0)
interruptionRef <- Ref.make(0)
latch1Start <- Promise.make[Nothing, Unit]
latch2Start <- Promise.make[Nothing, Unit]
fiber <- plus1(ref, latch1Start)
.onInterrupt(interruptHandler(interruptionRef))
.race(plus1(ref, latch2Start).onInterrupt(interruptHandler(interruptionRef)))
.fork
_ <- latch1Start.await
_ <- latch2Start.await
_ <- fiber.interrupt
res <- ref.get
interrupted <- interruptionRef.get
} yield assert(interrupted, equalTo(2)) && assert(res, equalTo(0))

io.daemon
},
testM("daemon mask") {
def forkAwait =
for {
latch <- Promise.make[Nothing, Unit]
latchEnd <- Promise.make[Nothing, Unit]
_ <- latchEnd.await.fork *> latch.succeed(())
} yield (latch, latchEnd)

def handleLatch(latches: (Promise[Nothing, Unit], Promise[Nothing, Unit])) =
latches._1.await.as(latches._2)

val io = for {
latches1 <- forkAwait
(l1, l1End) = latches1
_ <- l1.await
children1 <- ZIO.children
latchEnds <- ZIO.daemonMask { restore =>
for {
latches1 <- ZIO.sequence(
List(
forkAwait.flatMap(handleLatch),
forkAwait.flatMap(handleLatch),
forkAwait.flatMap(handleLatch)
)
)
latches2 <- restore(
ZIO.sequence(
List(
forkAwait.flatMap(handleLatch),
forkAwait.flatMap(handleLatch)
)
)
)
} yield latches1 ++ latches2
}
children2 <- ZIO.children
_ <- l1End.succeed(())
_ <- ZIO.traverse_(latchEnds)(_.succeed(()))
} yield assert(children1.size, equalTo(1)) && assert(children2.size, equalTo(3))

io.nonDaemon
},
testM("nonDaemon mask") {
def forkAwait =
for {
latch <- Promise.make[Nothing, Unit]
latchEnd <- Promise.make[Nothing, Unit]
_ <- latchEnd.await.fork *> latch.succeed(())
} yield (latch, latchEnd)

def handleLatch(latches: (Promise[Nothing, Unit], Promise[Nothing, Unit])) =
latches._1.await.as(latches._2)

val io =
for {
latches <- forkAwait
(l1, l1End) = latches
_ <- l1.await
children1 <- ZIO.children
childrenWithLatches <- ZIO.nonDaemonMask { restore =>
for {
latches1 <- ZIO.sequence(
List(
forkAwait.flatMap(handleLatch),
forkAwait.flatMap(handleLatch),
forkAwait.flatMap(handleLatch)
)
)
latches2 <- restore(
ZIO.sequence(
List(
forkAwait.flatMap(handleLatch),
forkAwait.flatMap(handleLatch)
)
)
)
children2 <- ZIO.children
} yield (children2, latches1 ++ latches2)
}
(children2, latchEnds) = childrenWithLatches
latches2 <- forkAwait
(l2, l2End) = latches2
_ <- l2.await
children3 <- ZIO.children
_ <- l1End.succeed(())
_ <- l2End.succeed(())
_ <- ZIO.traverse_(latchEnds)(_.succeed(()))
} yield assert(children1.size, equalTo(0)) && assert(children2.size, equalTo(3)) && assert(
children3.size,
equalTo(3)
)

io.daemon
},
testM("race in daemon is executed") {
for {
latch1 <- Promise.make[Nothing, Unit]
latch2 <- Promise.make[Nothing, Unit]
p1 <- Promise.make[Nothing, Unit]
p2 <- Promise.make[Nothing, Unit]
loser1 = ZIO.bracket(latch1.succeed(()))(_ => p1.succeed(()))(_ => ZIO.never)
loser2 = ZIO.bracket(latch2.succeed(()))(_ => p2.succeed(()))(_ => ZIO.never)
fiber <- (loser1 race loser2).fork.daemon
_ <- latch1.await
_ <- latch2.await
_ <- fiber.interrupt
res1 <- p1.await
res2 <- p2.await
} yield assert(res1, isUnit) && assert(res2, isUnit)
},
testM("supervise fibers") {
def makeChild(n: Int): URIO[Clock, Fiber[Nothing, Unit]] =
(clock.sleep(20.millis * n.toDouble) *> IO.never).fork
Expand Down
12 changes: 12 additions & 0 deletions core/shared/src/main/scala/zio/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ object IO {
final def collectAllWithParN[E, A, B](n: Int)(as: Iterable[IO[E, A]])(f: PartialFunction[A, B]): IO[E, List[B]] =
ZIO.collectAllWithParN(n)(as)(f)

/**
* @see See [[zio.ZIO.daemonMask]]
*/
final def daemonMask[E, A](k: ZIO.DaemonStatusRestore => IO[E, A]): IO[E, A] =
ZIO.daemonMask(k)

/**
* @see See [[zio.ZIO.descriptor]]
*/
Expand Down Expand Up @@ -409,6 +415,12 @@ object IO {
*/
final val never: UIO[Nothing] = ZIO.never

/**
* @see See [[zio.ZIO.nonDaemonMask]]
*/
final def nonDaemonMask[E, A](k: ZIO.DaemonStatusRestore => IO[E, A]): IO[E, A] =
ZIO.nonDaemonMask(k)

/**
* @see See [[zio.ZIO.none]]
*/
Expand Down
12 changes: 12 additions & 0 deletions core/shared/src/main/scala/zio/RIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ object RIO {
final def collectAllWithParN[R, A, B](n: Int)(as: Iterable[RIO[R, A]])(f: PartialFunction[A, B]): RIO[R, List[B]] =
ZIO.collectAllWithParN(n)(as)(f)

/**
* @see See [[zio.ZIO.daemonMask]]
*/
final def daemonMask[R, A](k: ZIO.DaemonStatusRestore => RIO[R, A]): RIO[R, A] =
ZIO.daemonMask(k)

/**
* @see See [[zio.ZIO.descriptor]]
*/
Expand Down Expand Up @@ -432,6 +438,12 @@ object RIO {
*/
final val never: UIO[Nothing] = ZIO.never

/**
* @see See [[zio.ZIO.nonDaemonMask]]
*/
final def nonDaemonMask[R, A](k: ZIO.DaemonStatusRestore => RIO[R, A]): RIO[R, A] =
ZIO.nonDaemonMask(k)

/**
* @see See [[zio.ZIO.none]]
*/
Expand Down
12 changes: 12 additions & 0 deletions core/shared/src/main/scala/zio/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ object Task {
final def collectAllWithParN[A, B](n: Int)(as: Iterable[Task[A]])(f: PartialFunction[A, B]): Task[List[B]] =
ZIO.collectAllWithParN(n)(as)(f)

/**
* @see See [[zio.ZIO.daemonMask]]
*/
final def daemonMask[A](k: ZIO.DaemonStatusRestore => Task[A]): Task[A] =
ZIO.daemonMask(k)

/**
* @see See [[zio.ZIO.die]]
*/
Expand Down Expand Up @@ -405,6 +411,12 @@ object Task {
*/
final val never: UIO[Nothing] = ZIO.never

/**
* @see See [[zio.ZIO.nonDaemonMask]]
*/
final def nonDaemonMask[A](k: ZIO.DaemonStatusRestore => Task[A]): Task[A] =
ZIO.nonDaemonMask(k)

/**
* @see See [[zio.ZIO.none]]
*/
Expand Down
12 changes: 12 additions & 0 deletions core/shared/src/main/scala/zio/UIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ object UIO {
final def collectAllWithParN[A, B](n: Int)(as: Iterable[UIO[A]])(f: PartialFunction[A, B]): UIO[List[B]] =
ZIO.collectAllWithParN(n)(as)(f)

/**
* @see See [[zio.ZIO.daemonMask]]
*/
final def daemonMask[A](k: ZIO.DaemonStatusRestore => UIO[A]): UIO[A] =
ZIO.daemonMask(k)

/**
* @see See [[zio.ZIO.descriptor]]
*/
Expand Down Expand Up @@ -356,6 +362,12 @@ object UIO {
*/
final val none: UIO[Option[Nothing]] = ZIO.none

/**
* @see See [[zio.ZIO.nonDaemonMask]]
*/
final def nonDaemonMask[A](k: ZIO.DaemonStatusRestore => UIO[A]): UIO[A] =
ZIO.nonDaemonMask(k)

/**
* @see See [[zio.ZIO.never]]
*/
Expand Down
12 changes: 12 additions & 0 deletions core/shared/src/main/scala/zio/URIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ object URIO {
final def collectAllWithParN[R, A, B](n: Int)(as: Iterable[URIO[R, A]])(f: PartialFunction[A, B]): URIO[R, List[B]] =
ZIO.collectAllWithParN(n)(as)(f)

/**
* @see See [[zio.ZIO.daemonMask]]
*/
final def daemonMask[R, A](k: ZIO.DaemonStatusRestore => URIO[R, A]): URIO[R, A] =
ZIO.daemonMask(k)

/**
* @see [[zio.ZIO.descriptor]]
*/
Expand Down Expand Up @@ -384,6 +390,12 @@ object URIO {
*/
final val never: UIO[Nothing] = ZIO.never

/**
* @see See [[zio.ZIO.nonDaemonMask]]
*/
final def nonDaemonMask[R, A](k: ZIO.DaemonStatusRestore => URIO[R, A]): URIO[R, A] =
ZIO.nonDaemonMask(k)

/**
* @see [[zio.ZIO.none]]
*/
Expand Down
35 changes: 29 additions & 6 deletions core/shared/src/main/scala/zio/ZIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1001,12 +1001,14 @@ sealed trait ZIO[-R, +E, +A] extends Serializable { self =>
leftDone: (Exit[E, A], Fiber[E1, B]) => ZIO[R1, E2, C],
rightDone: (Exit[E1, B], Fiber[E, A]) => ZIO[R1, E2, C]
): ZIO[R1, E2, C] =
new ZIO.RaceWith[R1, E, E1, E2, A, B, C](
self,
that,
leftDone,
rightDone
)
ZIO.nonDaemonMask { restore =>
new ZIO.RaceWith[R1, E, E1, E2, A, B, C](
self,
that,
(exit, fiber) => restore(leftDone(exit, fiber)),
(exit, fiber) => restore(rightDone(exit, fiber))
)
}

/**
* Attach a wrapping trace pointing to this location in case of error.
Expand Down Expand Up @@ -1852,6 +1854,14 @@ private[zio] trait ZIOFunctions extends Serializable {
)(f: PartialFunction[A, U]): ZIO[R, E, List[U]] =
ZIO.collectAllParN(n)(in).map(_.collect(f))

/**
* Makes the effect daemon, but passes it a restore function that
* can be used to restore the inherited daemon status from whatever region
* the effect is composed into.
*/
final def daemonMask[R, E, A](k: ZIO.DaemonStatusRestore => ZIO[R, E, A]): ZIO[R, E, A] =
checkDaemon(status => k(new ZIO.DaemonStatusRestore(status)).daemon)

/**
* Returns information about the current fiber, such as its identity.
*/
Expand Down Expand Up @@ -2332,6 +2342,14 @@ private[zio] trait ZIOFunctions extends Serializable {
)(zero: B)(f: (B, A) => B): ZIO[R, E, B] =
in.foldLeft[ZIO[R, E, B]](succeed[B](zero))((acc, a) => acc.zipPar(a).map(f.tupled)).refailWithTrace

/**
* Makes the effect non-daemon, but passes it a restore function that
* can be used to restore the inherited daemon status from whatever region
* the effect is composed into.
*/
final def nonDaemonMask[R, E, A](k: ZIO.DaemonStatusRestore => ZIO[R, E, A]): ZIO[R, E, A] =
checkDaemon(status => k(new ZIO.DaemonStatusRestore(status)).nonDaemon)

/**
* Returns an effect with the empty value.
*/
Expand Down Expand Up @@ -2624,6 +2642,11 @@ object ZIO extends ZIOFunctions {
zio.interruptStatus(flag)
}

final class DaemonStatusRestore(private val status: zio.DaemonStatus) extends AnyVal {
def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] =
zio.daemonStatus(status)
}

final class TimeoutTo[R, E, A, B](self: ZIO[R, E, A], b: B) {
def apply[B1 >: B](f: A => B1)(duration: Duration): ZIO[R with Clock, E, B1] =
self
Expand Down