forked from politrons/reactiveScala
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCombining.scala
71 lines (62 loc) · 2.24 KB
/
Combining.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package app.impl.rx
import app.impl.Generic
import org.junit.Test
import rx.lang.scala.Observable
/**
* Combining operators are used in order to combine items emitted by the pipeline.
*/
class Combining extends Generic[String, Long] {
/**
* Merge operator get two observables and combine together, returning both items emitted together
*/
@Test def merge(): Unit = {
addHeader("Observable merge")
Observable.just("hello")
.merge(Observable.just(" scala"))
.merge(Observable.just(" world!"))
.reduce((s, s1) => s.concat(s1))
.map(s => s.toUpperCase).subscribe(s => println(s))
}
/**
* Zip operator allow us to chain several observables and once that all of them has been emitted,
* return all them at the same time.
*/
@Test def zip(): Unit = {
addHeader("Observable zip")
Observable.just("hello")
.zip(Observable.just(" scala"))
.zip(Observable.just(" world!"))
.map(s => s._1._1.concat(s._1._2).concat(s._2).toUpperCase)
.subscribe(s => println(s))
}
/**
* This creator operator allow us create the observable with three observables, if you want more than 3
* you can always implement your own function.
*/
@Test def zipWith3(): Unit = {
Observable.zip(Observable.just("hello"), Observable.just(" scala"), Observable.just(" world"))
.map(sentences => sentences._1.toUpperCase.concat(sentences._2.toUpperCase.concat(sentences._3.toUpperCase)))
.subscribe(s => println(s))
}
/**
* This operator concat all Observables items emitted in the pipeline
*/
@Test def concat(): Unit = {
addHeader("Observable concat")
Observable.from(List("hello", " scala", " world"))
.map(n => Observable.just(n))
.concat
.subscribe(n => println(n))
}
/**
* concatMap can be used instead of flatMap just to return a new observable where internally you can, or not concat the items
* This operator is pretty shit and does not has to much value to be honest
*/
@Test def concatMap(): Unit = {
addHeader("Observable concatMap")
Observable.just("hello")
.concatMap(s => Observable.just(s.concat(" scala")))
.concatMap(s => Observable.just(s.concat(" world")))
.subscribe(n => println(n))
}
}