forked from politrons/reactiveScala
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTaskConsumers.scala
161 lines (146 loc) · 5.2 KB
/
TaskConsumers.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package com.politrons.monix
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream}
import java.util.concurrent.TimeUnit
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import org.junit.Test
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.concurrent.duration._
/**
* Task is another monadic type, in the same family as Observable, IO or the Task from Scalaz.
* The idea is simple: separate the consumer from the producer. Here we create producers, called [Task],
* whose logic is only executed and consumed when someone applies one of the consumer operators shown below.
*/
class TaskConsumers {
/**
 * A Task is a producer that has not started producing anything yet: it only describes what will be
 * produced and how the value is transformed along the pipeline.
 * Execution starts when a consumer operator is applied, in this case [runOnComplete], which hands the
 * outcome to the callback as either a Success or a Failure.
 *
 * The callback may complete on another thread, so the test blocks on a latch that the callback releases
 * (instead of sleeping for a fixed time) and fails if no outcome arrives within the timeout.
 */
@Test
def runOnCompleteOperator(): Unit = {
  val completed = new java.util.concurrent.CountDownLatch(1)
  Task("Hello Monix Task world!")
    .map(value => value.toUpperCase)
    .runOnComplete { outcome =>
      outcome match {
        case Success(value) => println(value)
        case Failure(ex) => System.out.println(s"ERROR: ${ex.getMessage}")
      }
      // Signal the waiting test thread once the outcome has been reported.
      completed.countDown()
    }
  assert(completed.await(10, TimeUnit.SECONDS), "Task did not complete within 10 seconds")
}
/**
 * [runOnComplete] returns a cancelable handle. Calling cancel on it asks the running process to stop
 * and release its resources. Here the result is delayed by one second and the handle is cancelled
 * immediately after the task is started; the test then sleeps for a second before returning.
 */
@Test
def cancelOnCompleteOperator(): Unit = {
  val delayedTask = Task("Hello Monix Task world!")
    .map(_.toUpperCase)
    .delayResult(1.second)
  val cancelable = delayedTask.runOnComplete { outcome =>
    outcome match {
      case Failure(ex) => System.out.println(s"ERROR: ${ex.getMessage}")
      case Success(value) => println(value)
    }
  }
  cancelable.cancel()
  Thread.sleep(1000)
}
/**
 * When we want to start the task and later join its result in the calling thread we use the operator
 * [runAsync]. Its result is passed to Await.result below, which blocks until the value is available
 * (the original notes describe it as a CancelableFuture, an extension of the Scala Future).
 */
@Test
def runAsyncOperator(): Unit = {
  val cancelableFuture = Task("Hello Monix Task world!")
    .map(value => value.toUpperCase)
    .runAsync
  // `10.seconds` rather than postfix `10 seconds`: postfix notation needs scala.language.postfixOps.
  val result = Await.result(cancelableFuture, 10.seconds)
  println(result)
  // Compare against the same transformation so the check does not depend on the default locale.
  assert(result == "Hello Monix Task world!".toUpperCase, s"Unexpected task result: $result")
}
/**
 * [runSyncMaybe] returns an Either[CancelableFuture[T], T]: a Right carries a value that is already
 * available, so there is nothing to wait for, while a Left carries a future that has to be awaited
 * to obtain the value. Both cases end up printing the resulting sentence.
 */
@Test
def runSyncMaybeOperator(): Unit = {
  val either = Task(getSentence(0.5))
    .map(value => value.toUpperCase)
    .runSyncMaybe
  val sentence = either match {
    case Right(value) => value
    // `10.seconds` rather than postfix `10 seconds`: postfix notation needs scala.language.postfixOps.
    case Left(future) => Await.result(future, 10.seconds)
  }
  println(sentence)
}
/**
 * [coeval] turns the task into a value whose `run` yields an Either, matched below:
 *  - Left holds the Throwable raised while evaluating the task,
 *  - Right(Right(value)) holds a value that was available immediately,
 *  - Right(Left(future)) holds a future that must be awaited to obtain the value.
 * The source sentence comes from [getSentenceWithMaybeError], so any of the three cases can occur.
 */
@Test
def coevalOperator(): Unit = {
  val coeval = Task(getSentenceWithMaybeError(0.5))
    .map(value => value.toUpperCase)
    .coeval
  coeval.run match {
    case Right(Right(value)) =>
      println(s"Right value $value")
    case Right(Left(future)) =>
      // `10.seconds` rather than postfix `10 second`: postfix notation needs scala.language.postfixOps.
      println(s"We need to run this in a future ${Await.result(future, 10.seconds)}")
    case Left(throwable) =>
      println(s"An error just happen $throwable")
  }
}
/**
 * Produces a demo sentence, randomly failing or taking a slow path.
 *
 * A first `math.random` draw below `perc` throws a NullPointerException. Otherwise a second draw
 * below `perc` sleeps for one second before returning the "async" sentence; in the remaining case
 * the "sync" sentence is returned straight away.
 *
 * @param perc threshold each random draw is compared against
 * @return one of the two demo sentences
 * @throws NullPointerException when the first draw is below `perc`
 */
def getSentenceWithMaybeError(perc: Double): String = {
  // Guard clause: simulated failure.
  if (math.random < perc) throw new NullPointerException()
  val slowPath = math.random < perc
  if (!slowPath) "Plain sync process"
  else {
    Thread.sleep(1000)
    "This test should be executed async"
  }
}
/**
 * Produces a demo sentence, randomly taking a slow path.
 *
 * When a `math.random` draw is below `perc` the method sleeps for one second and returns the
 * "async" sentence; otherwise it returns the "sync" sentence straight away.
 *
 * @param perc threshold the random draw is compared against
 * @return one of the two demo sentences
 */
def getSentence(perc: Double): String = {
  val slowPath = math.random < perc
  if (!slowPath) "Plain sync process"
  else {
    Thread.sleep(1000)
    "This test should be executed async"
  }
}
import java.io.ObjectOutputStream
/**
 * Round-trips a Task through Java serialization: a Task built from a Future is written to an in-memory
 * byte array, read back, and the deserialized copy is run and awaited.
 */
@Test
def serializeTask(): Unit = {
  val eventualString = Future {
    Thread.sleep(1000)
    "A future is serializable as a task"
  }
  val task = Task.fromFuture(eventualString)

  // Serialize; closing the object stream also flushes anything it still buffers into `bos`.
  val bos = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bos)
  try out.writeObject(task)
  finally out.close()

  // Deserialize from the captured bytes.
  val in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  val taskLater =
    try in.readObject().asInstanceOf[Task[String]] // element type is erased, so this cast is unchecked
    finally in.close()

  // `10.seconds` rather than postfix `10 seconds`: postfix notation needs scala.language.postfixOps.
  val result = Await.result(taskLater.runAsync, 10.seconds)
  println(s"Result:$result")
}
}