forked from politrons/reactiveScala
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathZIOFiber.scala
45 lines (37 loc) · 1.28 KB
/
ZIOFiber.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package app.impl.zio
import org.apache.commons.lang.exception.ExceptionUtils
import zio.{Queue, Semaphore, Task, ZIO}
/**
  * Demo of running queued programs on fibers while a [[zio.Semaphore]] limits
  * how many of them execute at the same time.
  *
  * A background consumer takes `Task[Unit]` programs from a bounded queue,
  * forks one fiber per program and runs each program inside
  * `semaphore.withPermit`, so at most two programs (the number of permits)
  * run concurrently. Failures of a program are logged and swallowed so the
  * consumer keeps processing.
  */
object ZIOFiber extends App {

  /** Runtime used to execute every effect of this demo. */
  val main: zio.Runtime[zio.ZEnv] = zio.Runtime.default

  /**
    * Queue of programs to execute. Building it also starts the consumer loop,
    * which is forked as a daemon fiber before the queue is returned.
    */
  val queue: Queue[Task[Unit]] = main.unsafeRun {
    for {
      semaphore <- Semaphore.make(permits = 2)
      jobs      <- Queue.bounded[Task[Unit]](1)
      _ <- jobs.take.flatMap { job =>
        val guarded = for {
          // Read before the permit is acquired, so this is only a snapshot.
          spots <- semaphore.available
          _     <- ZIO.effect(println(s"let's run this program! we have $spots spots"))
          _     <- semaphore.withPermit(job)
        } yield ()

        // Log the failure inside the effect system (instead of a raw println in
        // the handler) and recover, so a failing program never stops the loop.
        guarded.catchAll { t =>
          ZIO.effectTotal(
            println(s"Error in program in Thread ${Thread.currentThread().getName}. Caused by ${ExceptionUtils.getStackTrace(t)}")
          )
        }.fork // one fiber per program, so the loop can take the next one right away
      }.forever.forkDaemon
    } yield jobs
  }

  /** Sample program: blocks its thread for one second, then fails with a NullPointerException. */
  val program: Task[Nothing] = Task.effect {
    Thread.sleep(1000)
    throw new NullPointerException()
  }

  // Submit two programs 100 ms apart, then a burst of six more.
  main.unsafeRun(queue.offer(program))
  Thread.sleep(100)
  main.unsafeRun(queue.offer(program))
  Thread.sleep(100)
  (1 to 6).foreach(_ => main.unsafeRun(queue.offer(program)))

  // Give the forked fibers time to run before the application body ends.
  // NOTE(review): eight 1-second programs with two permits need about 4 s in
  // total, so some may still be pending after this wait - confirm whether
  // that is intended for the demo.
  Thread.sleep(2000)
}