// Transforming.scala (forked from politrons/reactiveScala)
package app.impl.rx
import java.util.concurrent.Executors
import app.impl.Generic
import org.junit.Test
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.ExecutionContextScheduler
import scala.concurrent.ExecutionContext
/**
* Transforming operators allow us to transform our pipeline and evolve the values passed through the pipeline.
* Remember that everything that happens inside a pipeline is immutable, so every time you "modify" an item
* inside a map or flatMap function you are actually creating and emitting a new item.
*/
class Transforming extends Generic[String, Long] {
@Test def operators(): Unit = {
Observable.from(Observable.getClass.getMethods)
.filter(m => Observable.getClass.isAssignableFrom(m.getReturnType))
.map(m => m.getName)
.distinct
.foreach(m => println(m))
}
/**
* The map operator allows us to evolve the pipeline by passing new items through it.
* To do that we use a function which receives an item and returns another item.
* Remember that everything here is immutable, so every item emitted through the pipeline is ALWAYS a new one.
*/
@Test def map(): Unit = {
addHeader("Map observable")
val text = "number mutated to String:"
val list = getList
Observable.from(list)
.map(n => text.concat(String.valueOf(n)))
.map(s => s.toUpperCase())
.subscribe(s => println(s))
}
/**
* In case you want a new pipeline inside your current pipeline you can use flatMap,
* where you create another Observable which emits its own items; once it finishes,
* those items are passed on to the outer pipeline.
* A small extra sketch follows the test below, showing how one source item can expand into several emissions.
*/
@Test def flatMap(): Unit = {
addHeader("Flat Map")
val text = "number mutated in flatMap::"
val list = getList
Observable.from(list)
.flatMap(n => Observable.just(n) //--> New pipeline
.map(n => text.concat(String.valueOf(n))))
.map(s => s.toUpperCase())
.subscribe(s => println(s))
}
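/**
* A minimal extra sketch (not part of the original file): flatMap merges the items of every
* inner Observable into the outer pipeline, so a single source item can become several emissions,
* whereas map is always one to one. It reuses getList from this class.
*/
@Test def flatMapExpanding(): Unit = {
addHeader("Flat Map expanding")
Observable.from(getList)
.flatMap(n => Observable.just(n, n * 10)) //--> each number expands into itself and itself * 10
.subscribe(n => println(n))
}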
/**
* This operator allows us to emit a default item in case the source Observable completes without emitting anything.
* A second sketch after the test below shows that the default is ignored when the source does emit items.
*/
@Test def orElse(): Unit = {
addHeader("or Else")
Observable.empty
.orElse("hello scala world")
.map(n => n.toUpperCase())
.subscribe(n => println(n))
}
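/**
* A minimal extra sketch (not in the original): when the source does emit items, orElse leaves
* them untouched and the default value is never used.
*/
@Test def orElseWithItems(): Unit = {
addHeader("or Else with items")
Observable.just("hello again scala world")
.orElse("never used")
.map(n => n.toUpperCase())
.subscribe(n => println(n))
}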
/**
* The buffer operator collects all the items emitted in the pipeline into an ArrayBuffer.
* Once we have the ArrayBuffer we can use its operators to remove some of the items inside it.
*/
@Test def buffer(): Unit = {
addHeader("buffer operator")
Observable.from(getList)
.toBuffer
.doOnNext(a => println(a.-(1)))
.doOnNext(a => println(a.-(2)))
.doOnNext(a => println(a.-(3)))
.doOnNext(a => println(a.-(4)))
.subscribe(n => println(n))
}
/**
* The compose operator passes the whole Observable to a Transformer function and returns a new Observable.
* The call is commented out below; a sketch after the test applies the transformer as a plain function instead.
*/
@Test def composeToString(): Unit = {
addHeader("compose operator")
Observable.just(1)
// .compose(transformerToString)
.subscribe(n => println(n))
}
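/**
* A sketch added here (not in the original): since the compose call above is commented out,
* the same transformation can be applied by invoking the Transformer function directly on the
* Observable, which is essentially what compose does in RxJava.
*/
@Test def composeToStringApplied(): Unit = {
addHeader("compose applied as a plain function")
transformerToString(Observable.just(1))
.subscribe(n => println(n))
}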
/**
* The compose operator passes the whole Observable to a Transformer function and returns a new Observable
* that will run on another thread.
* The call is commented out below; a sketch after the test applies the transformer as a plain function instead.
*/
@Test def composeToAsync(): Unit = {
addHeader("compose async operator")
val executor = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
val scheduler = ExecutionContextScheduler(executor)
println("Thread before pipeline:" + Thread.currentThread().getName)
Observable.just(1)
// .compose(transformerToAsync(scheduler))
.doOnNext(i => println("Thread inside pipeline:" + Thread.currentThread().getName))
.subscribe(n => println(n))
}
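/**
* A sketch added here (not in the original): transformerToAsync wraps the source into a nested
* Observable observed on the given scheduler, so we apply it directly and flatten the result;
* the doOnNext should then report the scheduler's thread instead of the test thread.
*/
@Test def composeToAsyncApplied(): Unit = {
addHeader("compose async applied as a plain function")
val executor = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
val scheduler = ExecutionContextScheduler(executor)
transformerToAsync(scheduler)(Observable.just(1))
.flatten
.doOnNext(i => println("Thread inside pipeline:" + Thread.currentThread().getName))
.subscribe(n => println(n))
}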
/**
* Using the scan operator we can accumulate the items emitted in the pipeline.
* We pass the operator an initial (seed) value as the first argument, and as the second argument a function
* that receives the previous accumulation and the newly emitted item.
* A smaller running-sum sketch follows the test below.
*/
@Test def scanInRevertMap(): Unit = {
// Note: the duplicate key "1" means this Map literal collapses to Map("1" -> 2, "3" -> 1)
val map = Map[String, Int]("1" -> 1, "1" -> 2, "3" -> 1)
Observable.from(map.toIterable)
.map(entry => Map[Int, String](entry._2 -> entry._1))
.scan(Map[Int, String]())((m, m1) => m ++ m1)
.last
.subscribe(m => println(m))
}
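/**
* A smaller sketch of scan (added for illustration, not in the original): starting from the
* seed 0, scan emits the seed and then every intermediate accumulation, so for getList the
* running sums 1, 3, 6, 10 and 15 come out of the pipeline.
*/
@Test def scanRunningSum(): Unit = {
addHeader("scan running sum")
Observable.from(getList)
.scan(0)((acc, n) => acc + n)
.subscribe(n => println(n))
}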
/**
* This example compares two lists and emits a new list containing only the values that exist in both collections.
* A shorter filter-based sketch follows the test below.
*/
@Test def listEqualThanList(): Unit = {
val listA = List(1, 3, 5)
Observable.from(List(1, 4, 6, 5, 3))
.flatMap(n => Observable.from(listA)
.filter(n1 => n1 == n)
.toList)
.scan(List[Int]())((l, l1) => l ++ l1)
.last
.subscribe(list => println(list))
}
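/**
* A shorter sketch of the same intersection (added for illustration, not in the original):
* filtering the source directly with List.contains keeps only the values present in both
* collections, and toList collects them into a single emission.
*/
@Test def listIntersectionWithFilter(): Unit = {
addHeader("intersection with filter")
val listA = List(1, 3, 5)
Observable.from(List(1, 4, 6, 5, 3))
.filter(n => listA.contains(n))
.toList
.subscribe(list => println(list))
}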
/**
* Transformer that wraps the source Observable into a nested Observable observed on the given scheduler.
*/
def transformerToAsync(scheduler: ExecutionContextScheduler): (Observable[Int]) => Observable[Observable[Int]] = {
i => Observable.just(i).observeOn(scheduler)
}
/**
* Transformer that replaces the source Observable with one that emits a single String.
*/
def transformerToString: (Observable[Int]) => Observable[String] = {
o => Observable.just("Hello Scala world")
}
def getList: List[Int] = {
List(1, 2, 3, 4, 5)
}
}