Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove parallel errors from ZPure #1432

Merged
merged 5 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 2 additions & 16 deletions core-tests/shared/src/test/scala/zio/prelude/fx/ZPureSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ object ZPureSpec extends ZIOBaseSpec {
},
test("providing environment should preserve errors") {
val zPure: ZPure[Nothing, Unit, Unit, (Int, Int), Int, Int] =
ZPure.fail(1).zipPar(ZPure.fail(2)).as(0)
ZPure.fail(1).as(0)
val actual = zPure.provideEnvironment(ZEnvironment((1, 2))).runValidation
val expected = Validation.Failure(Chunk.empty, NonEmptyChunk(1, 2))
val expected = Validation.Failure(Chunk.empty, NonEmptyChunk(1))
assert(actual)(equalTo(expected))
},
test("provideSome") {
Expand Down Expand Up @@ -821,20 +821,6 @@ object ZPureSpec extends ZIOBaseSpec {
}
)
),
test("parallel errors example") {
def validateName(s: String): ZPure[Nothing, Unit, Unit, Any, String, String] =
if (s == "John Doe") ZPure.succeed(s) else ZPure.fail("Wrong name!")
def validateAge(age: Int): ZPure[Nothing, Unit, Unit, Any, String, Int] =
if (age >= 18) ZPure.succeed(age) else ZPure.fail("Under age")
def validateAuthorized(authorized: Boolean): ZPure[Nothing, Unit, Unit, Any, String, Unit] =
if (authorized) ZPure.unit else ZPure.fail("Not authorized")
val validation =
validateName("Jane Doe") zipPar validateAge(17) zipPar validateAuthorized(false)
val result = validation.sandbox.either.run
assert(result)(
isLeft(equalTo(Cause("Wrong name!") && Cause("Under age") && Cause("Not authorized")))
)
},
test("state is restored after failure") {
val foo: ZPure[Nothing, String, Int, Any, Nothing, Unit] = ZPure.set(3)
val bar: ZPure[Nothing, Int, String, Any, Nothing, Unit] = ZPure.set("bar")
Expand Down
126 changes: 23 additions & 103 deletions core/shared/src/main/scala/zio/prelude/fx/ZPure.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package zio.prelude.fx

import zio._
import zio.prelude._
import zio.prelude.coherent.CovariantIdentityBoth
import zio.{Cause => _, _}

import java.util.concurrent.atomic.AtomicBoolean
import scala.reflect.ClassTag
Expand All @@ -36,36 +36,12 @@ import scala.util.Try
sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
import ZPure._

/**
* A symbolic alias for `zipParRight`.
*/
final def &>[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](
that: ZPure[W1, S3, S3, R1, E1, B]
): ZPure[W1, S3, S3, R1, E1, B] =
self zipParRight that

/**
* A symbolic alias for `zipRight`.
*/
final def *>[W1 >: W, S3, R1 <: R, E1 >: E, B](that: ZPure[W1, S2, S3, R1, E1, B]): ZPure[W1, S1, S3, R1, E1, B] =
self zipRight that

/**
* A symbolic alias for `zipParLeft`.
*/
final def <&[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](
that: ZPure[W1, S3, S3, R1, E1, B]
): ZPure[W1, S3, S3, R1, E1, A] =
self zipParLeft that

/**
* A symbolic alias for `zipPar`.
*/
final def <&>[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](that: ZPure[W1, S3, S3, R1, E1, B])(implicit
zippable: Zippable[A, B]
): ZPure[W1, S3, S3, R1, E1, zippable.Out] =
self zipPar that

/**
* A symbolic alias for `zipLeft`.
*/
Expand Down Expand Up @@ -263,12 +239,6 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
): ZPure[W, S3, S3, R, Nothing, B] =
self.foldM(e => ZPure.succeed(failure(e)), a => ZPure.succeed(success(a)))

final def foldCauseM[W1 >: W, S0 <: S1, S3, R1 <: R, E1, B](
failure: Cause[E] => ZPure[W1, S0, S3, R1, E1, B],
success: A => ZPure[W1, S2, S3, R1, E1, B]
)(implicit ev: CanFail[E]): ZPure[W1, S0, S3, R1, E1, B] =
Fold(self, failure, success)

/**
* Recovers from errors by accepting one computation to execute for the case
* of an error, and one computation to execute for the case of success.
Expand All @@ -277,7 +247,7 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
failure: E => ZPure[W1, S0, S3, R1, E1, B],
success: A => ZPure[W1, S2, S3, R1, E1, B]
)(implicit ev: CanFail[E]): ZPure[W1, S0, S3, R1, E1, B] =
foldCauseM((cause: Cause[E]) => failure(cause.first), success)
Fold(self, failure, success)

/**
* Exposes the output state into the value channel.
Expand Down Expand Up @@ -361,14 +331,6 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
final def mapError[E1](f: E => E1)(implicit ev: CanFail[E]): ZPure[W, S1, S2, R, E1, A] =
catchAll(e => fail(f(e)))

/**
* Returns a computation with its full cause of failure mapped using the
* specified function. This can be users to transform errors while
* preserving the original structure of the `Cause`.
*/
final def mapErrorCause[E2](f: Cause[E] => Cause[E2]): ZPure[W, S1, S2, R, E2, A] =
foldCauseM(cause => ZPure.failCause(f(cause)), ZPure.succeed)

/**
* Transforms the updated state of this computation with the specified
* function.
Expand Down Expand Up @@ -578,29 +540,29 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
* the updated state and the result.
*/
final def run(s: S1)(implicit ev1: Any <:< R, ev2: E <:< Nothing): (S2, A) =
runAll(s)._2.fold(cause => ev2(cause.first), identity)
runAll(s)._2.fold(ev2, identity)

/**
* Runs this computation with the specified initial state, returning both the
* log and either all the failures that occurred or the updated state and the
* result.
*/
final def runAll(s: S1)(implicit ev: Any <:< R): (Chunk[W], Either[Cause[E], (S2, A)]) =
final def runAll(s: S1)(implicit ev: Any <:< R): (Chunk[W], Either[E, (S2, A)]) =
Runner(s, self)

/**
* Runs this computation to produce its result or the first failure to
* occur.
*/
final def runEither(implicit ev1: Unit <:< S1, ev2: Any <:< R): Either[E, A] =
runAll(())._2.fold(cause => Left(cause.first), { case (_, a) => Right(a) })
runAll(())._2.fold(error => Left(error), { case (_, a) => Right(a) })

/**
* Runs this computation to produce its result and the log.
*/
final def runLog(implicit ev1: Unit <:< S1, ev2: Any <:< R, ev3: E <:< Nothing): (Chunk[W], A) = {
val (log, either) = runAll(())
(log, either.fold(cause => ev3(cause.first), { case (_, a) => a }))
(log, either.fold(error => ev3(error), { case (_, a) => a }))
}

/**
Expand All @@ -622,16 +584,10 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
*/
final def runValidation(implicit ev1: Unit <:< S1, ev2: Any <:< R): ZValidation[W, E, A] =
runAll(()) match {
case (log, Left(cause)) => ZValidation.Failure(log, NonEmptyChunk.fromChunk(cause.toChunk).get)
case (log, Left(error)) => ZValidation.Failure(log, NonEmptyChunk.single(error))
case (log, Right((_, a))) => ZValidation.Success(log, a)
}

/**
* Exposes the full cause of failures of this computation.
*/
final def sandbox: ZPure[W, S1, S2, R, Cause[E], A] =
foldCauseM(ZPure.fail, ZPure.succeed)

/**
* Converts an option on values into an option on errors leaving the state unchanged.
*/
Expand Down Expand Up @@ -684,7 +640,7 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
def toZIO(implicit ev: Unit <:< S1): zio.ZIO[R, E, A] =
ZIO.environmentWithZIO[R] { r =>
provideEnvironment(r).runAll(())._2 match {
case Left(cause) => ZIO.failCause(cause.toCause)
case Left(error) => ZIO.fail(error)
case Right((_, a)) => ZIO.succeed(a)
}
}
Expand All @@ -696,7 +652,7 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
ZIO.environmentWithZIO[R] { r =>
val result = provideEnvironment(r).runAll(s1)
result._2 match {
case Left(cause) => ZIO.failCause(cause.toCause)
case Left(error) => ZIO.fail(error)
case Right((_, a)) => ZIO.succeed(a)
}
}
Expand All @@ -708,7 +664,7 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
ZIO.environmentWithZIO[R] { r =>
val result = provideEnvironment(r).runAll(s1)
result._2 match {
case Left(cause) => ZIO.failCause(cause.toCause)
case Left(error) => ZIO.fail(error)
case Right(result) => ZIO.succeed(result)
}
}
Expand All @@ -720,17 +676,11 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
ZIO.environmentWithZIO[R] { r =>
val (log, result) = provideEnvironment(r).runAll(s1)
result match {
case Left(cause) => ZIO.failCause(cause.toCause)
case Left(error) => ZIO.fail(error)
case Right((s2, a)) => ZIO.succeed((log, s2, a))
}
}

/**
* Submerges the full cause of failures of this computation.
*/
def unsandbox[E1](implicit ev: E <:< Cause[E1]): ZPure[W, S1, S2, R, E1, A] =
foldM(e => ZPure.failCause(ev(e)), a => ZPure.succeed(a))

/**
* Combines this computation with the specified computation, passing the
* updated state from this computation to that computation and combining the
Expand Down Expand Up @@ -771,33 +721,6 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
)(f: (A, B) => C): ZPure[W1, S1, S3, R1, E1, C] =
self.flatMap(a => that.map(b => f(a, b)))

final def zipWithPar[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](
that: ZPure[W1, S3, S3, R1, E1, B]
)(f: (A, B) => C): ZPure[W1, S3, S3, R1, E1, C] =
self.foldCauseM(
c1 =>
that.foldCauseM(
c2 => ZPure.failCause(c1 && c2),
_ => ZPure.failCause(c1)
),
a => that.map(b => f(a, b))
)

final def zipPar[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](that: ZPure[W1, S3, S3, R1, E1, B])(implicit
zippable: Zippable[A, B]
): ZPure[W1, S3, S3, R1, E1, zippable.Out] =
self.zipWithPar(that)(zippable.zip(_, _))

final def zipParLeft[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](
that: ZPure[W1, S3, S3, R1, E1, B]
): ZPure[W1, S3, S3, R1, E1, A] =
self.zipWithPar(that)((a, _) => a)

final def zipParRight[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](
that: ZPure[W1, S3, S3, R1, E1, B]
): ZPure[W1, S3, S3, R1, E1, B] =
self.zipWithPar(that)((_, b) => b)

/**
* Returns a successful computation if the value is `Right`, or fails with error `None`.
*/
Expand Down Expand Up @@ -915,10 +838,7 @@ object ZPure {
new EnvironmentWithPurePartiallyApplied

def fail[E](e: E): ZPure[Nothing, Any, Nothing, Any, E, Nothing] =
failCause(Cause(e))

def failCause[E](cause: Cause[E]): ZPure[Nothing, Any, Nothing, Any, E, Nothing] =
ZPure.Fail(cause)
ZPure.Fail(e)

/**
* Constructs a computation from an `Either`.
Expand Down Expand Up @@ -1183,15 +1103,15 @@ object ZPure {
}

private final case class Succeed[+A](value: A) extends ZPure[Nothing, Any, Nothing, Any, Nothing, A]
private final case class Fail[+E](error: Cause[E]) extends ZPure[Nothing, Any, Nothing, Any, E, Nothing]
private final case class Fail[+E](error: E) extends ZPure[Nothing, Any, Nothing, Any, E, Nothing]
private final case class Modify[-S1, +S2, +A](run0: S1 => (A, S2)) extends ZPure[Nothing, S1, S2, Any, Nothing, A]
private final case class FlatMap[+W, -S1, S2, +S3, -R, +E, A, +B](
value: ZPure[W, S1, S2, R, E, A],
continue: A => ZPure[W, S2, S3, R, E, B]
) extends ZPure[W, S1, S3, R, E, B]
private final case class Fold[+W, -S1, S2, +S3, -R, E1, +E2, A, +B](
value: ZPure[W, S1, S2, R, E1, A],
failure: Cause[E1] => ZPure[W, S1, S3, R, E2, B],
failure: E1 => ZPure[W, S1, S3, R, E2, B],
success: A => ZPure[W, S2, S3, R, E2, B]
) extends ZPure[W, S1, S3, R, E2, B]
with Function[A, ZPure[W, S2, S3, R, E2, B]] {
Expand Down Expand Up @@ -1222,7 +1142,7 @@ object ZPure {
def apply[W, S1, S2, R, E, A](
state: S1,
zPure: ZPure[W, S1, S2, R, E, A]
): (Chunk[W], Either[Cause[E], (S2, A)]) = {
): (Chunk[W], Either[E, (S2, A)]) = {
val (runner, running) = pool.get()

if (running.compareAndSet(false, true)) {
Expand All @@ -1237,7 +1157,7 @@ object ZPure {
}
}

final private case class Err(cause: Cause[Any]) extends Exception {
final private case class Err(cause: Any) extends Exception {
override def fillInStackTrace(): Throwable = this
}
}
Expand All @@ -1261,12 +1181,12 @@ object ZPure {
private def run[W, S1, S2, R, E, A](
state: S1,
zPure: ZPure[W, S1, S2, R, E, A]
): (Chunk[W], Either[Cause[E], (S2, A)]) = {
): (Chunk[W], Either[E, (S2, A)]) = {
val result =
try
Right(loop(state, zPure.asInstanceOf[Erased]))
catch {
case Runner.Err(c) => Left(c.asInstanceOf[Cause[E]])
case Runner.Err(c) => Left(c.asInstanceOf[E])
}

(_logs.result().asInstanceOf[Chunk[W]], result)
Expand Down Expand Up @@ -1326,9 +1246,9 @@ object ZPure {

ZPure.Fold(
zPure.value,
(cause: Cause[Any]) => {
(error: Any) => {
_logs = previousLogs
ZPure.set(state) *> zPure.failure(cause)
ZPure.set(state) *> zPure.failure(error)
},
(a: Any) => {
val logs0 = _logs.result()
Expand All @@ -1340,7 +1260,7 @@ object ZPure {
} else {
ZPure.Fold(
zPure.value,
ZPure.set(state) *> zPure.failure(_: Cause[Any]),
ZPure.set(state) *> zPure.failure(_: Any),
zPure.success
)
}
Expand All @@ -1359,8 +1279,8 @@ object ZPure {
val zPure = provide0.asInstanceOf[Provide[Any, Any, Any, Any, Any, Any]]
val previousEnv = _environment
_environment = zPure.r
curZPure = zPure.continue.foldCauseM(
e => { _environment = previousEnv; ZPure.failCause(e) },
curZPure = zPure.continue.foldM(
e => { _environment = previousEnv; ZPure.fail(e) },
a => { _environment = previousEnv; ZPure.succeed(a) }
)

Expand Down
40 changes: 0 additions & 40 deletions docs/zpure/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,46 +153,6 @@ There are many other error handling operators defined on `ZPure`. The `catchSome

The `fold` and `foldM` operators allow us to handle both the failure and success cases at the same time. And the `orElse` operator allows us to specify a fallback computation that will be run if the original computation fails.

One other concept from `ZIO` that carries over to `ZPure` is the ability to accumulate multiple errors.

Normally when we use operators like `zip` or `flatMap` if an error occurs we will not go on to evaluate further parts of the computation until we get to an error handler that can potentially recover from it. This is typically what we want because if we have already failed there is no point in doing more work.

However, `ZPure` also allows us to accumulate errors when we use the `zipWithPar` operator . This operator does not do actual parallelism, but is "parallel" in the sense that it will run both computations even if the first one fails and return any failures that occurred.

You can use this to obtain behavior similar to the `Validation` data type in ZIO Prelude. All of the errors will be captured in a `Cause` data structure similar to the one from `ZIO`.

For example, we could model validating some data using `ZPure` like this.

```scala mdoc
case class Person(name: String, age: Int)

def validateName(name: String): ZPure[Nothing, Unit, Unit, Any, String, String] =
if (name.isEmpty) ZPure.fail("name was empty")
else ZPure.succeed(name)

def validateAge(age: Int): ZPure[Nothing, Unit, Unit, Any, String, Int] =
if (age < 0) ZPure.fail(s"Age $age was less than zero")
else ZPure.succeed(age)

def validatePerson(name: String, age: Int): ZPure[Nothing, Unit, Unit, Any, String, Person] =
validateName(name).zipWithPar(validateAge(age))(Person)
```

To expose the full cause of failure we can use the `sandbox` operator.

```scala mdoc
import zio.prelude.fx.Cause

def validatePersonCause(name: String, age: Int): ZPure[Nothing, Unit, Unit, Any, Cause[String], Person] =
validatePerson(name, age).sandbox
```

We can now see all the failures that occurred and handle them using our normal error handling operators. If we want to submerge the full cause again and just see the error type we can undo this with the `unsandbox` operator.

Once again, a variety of other operators for dealing with the full cause of failure are available on `ZPure` analogous to the ones on `ZIO` but we will not cover them all here. With `sandbox` and `unsandbox` you should be able to handle any problems involving working with the full cause of failure and you can always look up more specialized operators later.

Of course, if all we want to do is validate data the `Validation` type is more specialized than this and is what we should use. But it is very nice to be able to accumulate errors when you need to when we are already working in the context of a `ZPure` computation.

## Working With Context

The environment type `R` is also analogous to the environment type of ZIO.
Expand Down