<h1 id="zeromqakka-streams-activator-template">ZeroMQ/Akka streams activator template</h1>
<h2 id="dependencies">Dependencies</h2>
<p>Dependencies for using rx_zmq_streams in your code:</p>
<pre><code>resolvers += "zenaptix at bintray" at "http://dl.bintray.com/zenaptix/rx_zmq_streams"
libraryDependencies += "com.zenaptix" %% "rx_zmq_streams" % "0.3.1"</code></pre>
<h2 id="overview">Overview</h2>
<p>This application is a somewhat heuristic implementation of Reactive Streams that uses zeroMQ as the transport layer.<br />
It creates a ReactiveZeroMQ Akka Extension that provides:</p>
<ol style="list-style-type: decimal">
<li>a pubSink that transmits data from an Akka Stream onto zeroMQ</li>
<li>a subSource that receives data over zeroMQ and streams it into a subsequent Akka Stream</li>
<li>back-pressure on the zeroMQ transmission, because both the pubSink and the subSource implement the Reactive Streams protocol</li>
</ol>
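<p>To make the back-pressure point concrete, here is a minimal sketch of demand-driven delivery in plain Scala. It does not use Akka Streams or zeroMQ, and <code>DemandPublisher</code> is a hypothetical name that is not part of rx_zmq_streams; it only illustrates the Reactive Streams idea that a publisher may emit no more elements than the subscriber has requested.</p>

```scala
import scala.collection.mutable

// Hypothetical illustration only: DemandPublisher is not part of rx_zmq_streams.
final class DemandPublisher[A](items: Iterator[A]) {
  private var demand = 0L
  // Elements handed to the subscriber so far.
  val delivered = mutable.Buffer.empty[A]

  // Subscriber side: signal that n more elements can be handled.
  def request(n: Long): Unit = {
    require(n > 0, "demand must be positive")
    demand += n
    drain()
  }

  // Publisher side: emit only while there is outstanding demand.
  private def drain(): Unit =
    while (demand > 0 && items.hasNext) {
      delivered += items.next()
      demand -= 1
    }
}

object BackPressureSketch {
  def main(args: Array[String]): Unit = {
    val pub = new DemandPublisher(Iterator("a", "b", "c", "d", "e"))
    pub.request(2)                        // only two elements may flow
    println(pub.delivered.mkString(","))  // prints "a,b"
    pub.request(10)                       // demand now exceeds the remaining supply
    println(pub.delivered.mkString(","))  // prints "a,b,c,d,e"
  }
}
```

<p>A slow subscriber simply requests less, and the publisher holds the remaining elements back instead of flooding the transport.</p>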
<p>It uses a zeroMQ Pub/Sub socket as the transport for a Reactive Streams implementation built on Akka Streams.<br />
Akka Streams is an implementation of <a href="http://www.reactive-streams.org/">Reactive Streams</a>.<br />
zeroMQ is a socket library disguised as a concurrency framework: "sockets on steroids".<br />
The project depends on either jeromq or jzmq:</p>
<pre><code>"org.zeromq" % "jzmq" % "3.1.0"
"org.zeromq" % "jeromq" % "0.3.5" </code></pre>
<h2 id="pubsink">pubSink</h2>
<p>A pubSink is acquired from the RxZMQExtension and is used as a sink at the end of an Akka Stream. All messages are forwarded over zeroMQ to a subscriber (subSource).</p>
<h2 id="subsource">subSource</h2>
<p>A subSource is acquired from the RxZMQExtension and is used as a source at the start of an Akka Stream. All the messages received from the zeroMQ socket are pushed onto the Akka Stream.</p>
<h2 id="example">Example</h2>
<p>Below is a trivial example that runs a publisher and a subscriber in one process.</p>
<p>Connecting over zeroMQ:</p>
<pre><code>import java.io.File
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.io.SynchronousFileSource
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import com.zenaptix.reactive.RxZMQExtension
import zeromq.Message
import scala.concurrent.duration._
class RxZMQExtensionExample {
  // the actor system name is arbitrary
  implicit val system = ActorSystem("example")
  implicit val timeout: Timeout = Timeout(5.seconds)
  implicit val materializer = ActorFlowMaterializer()
  val conf = ConfigFactory.load()
  def rxZmq = RxZMQExtension(system)
  val sourcePath = conf.getString("gateway.source_path")
  val file = new File(sourcePath)
  // publish the messages from the file, 13 bytes at a time
  SynchronousFileSource(file,13).map(b => {
    println(s"[SOURCE] -> ${b.decodeString("UTF8")}")
    Message(b)
  }).runWith(rxZmq.pubSink())
  // receive the published messages and count them
  var i = 0
  val subSource = rxZmq.subSource("0.0.0.0:5556")
  subSource.map(
    m => {
      i += 1
      println(s"[SINK] <- ${m.part.utf8String}")
    }).runWith(Sink.ignore)
}</code></pre>
<h2 id="testing-using-rxzmqextensionspec">Testing using RxZMQExtensionSpec</h2>
<p>To run the above code, cd to the directory where the project was cloned and execute:</p>
<pre><code>export RX_ZMQ_INSTALL_DIR=`pwd`
sbt test</code></pre>
<h2 id="testing-using-com.zenaptix.reactivepublisher-and-reactivesubscriber">Testing using com.zenaptix.ReactivePublisher and ReactiveSubscriber</h2>
<p>The Reactive Publisher:</p>
<pre><code>package com.zenaptix.reactive
import java.io.File
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.io.SynchronousFileSource
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
import zeromq.Message
object ReactivePublisher extends App with LazyLogging {
  implicit val system = ActorSystem("publisher")
  implicit val materializer = ActorFlowMaterializer()
  lazy val log = system.log
  def rxZmq = RxZMQExtension(system)
  val conf = ConfigFactory.load()
  val sourcePath = conf.getString("gateway.source_path")
  val file = new File(sourcePath)
  // read the file in 13-byte chunks and publish each chunk as a message
  SynchronousFileSource(file,13).map(b => {
    logger.debug(s"[SOURCE] -> ${b.decodeString("UTF8")}")
    Message(b)
  }).runWith(rxZmq.pubSink())
  system.awaitTermination()
}</code></pre>
<p>Running:</p>
<pre><code>sbt
> project publisher
> run</code></pre>
<p>The Reactive Subscriber:</p>
<pre><code>package com.zenaptix.reactive
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Sink
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
object ReactiveSubscriber extends App with LazyLogging {
  implicit val system = ActorSystem("subscriber")
  implicit val materializer = ActorFlowMaterializer()
  def rxZmq = RxZMQExtension(system)
  val conf = ConfigFactory.load()
  // build the "host:port" address of the publisher from the configuration
  val conn = s"${conf.getString("zeromq.host")}:${conf.getInt("zeromq.port")}"
  rxZmq.subSource(conn).map(m =>
    logger.debug(s"SINK <- ${m.part.decodeString("UTF8")}")).
    runWith(Sink.ignore)
}</code></pre>
<p>Running:</p>
<pre><code>sbt
> project subscriber
> run</code></pre>
<h2 id="using-docker">Using Docker</h2>
<p>The Publisher</p>
<p>On OSX first install and run boot2docker:</p>
<pre><code>export RX_ZMQ_INSTALL_DIR=`pwd`
sbt
> project publisher
> docker:publishLocal
> exit
docker run -v $RX_ZMQ_INSTALL_DIR/data:/data -p 5556:5556 -p 5557:5557 \
-d --name rx_zmq_publisher com.zenaptix/rx_zmq_publisher:v0.3.1</code></pre>
<p>The Subscriber</p>
<p>Then repeat the steps for the subscriber:</p>
<pre><code>sbt
> project subscriber
> docker:publishLocal
> exit
docker run -i -t --name rx_zmq_subscriber com.zenaptix/rx_zmq_subscriber:v0.3.1 -Dzeromq.host=$RXZMQ_PUBLISHER_IP</code></pre>
<p>The Logs</p>
<p>Then view the publisher log:</p>
<pre><code>docker logs rx_zmq_publisher</code></pre>
<h2 id="message-exchange">Message Exchange</h2>
<p>rx_zmq_streams is implemented using a Req/Rep channel for exchanging control messages and a Pub/Sub channel for carrying the actual data.</p>
<div class="figure">
<img src="tutorial/rxzmq.png" alt="Image" />
<p class="caption">Image</p>
</div>
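<p>The split between a control channel and a data channel can be sketched with a small in-memory model. Everything named here (<code>Request</code>, <code>Cancel</code>, <code>Ack</code>, <code>PublisherModel</code>) is hypothetical: the text above only states that control messages travel over Req/Rep and data over Pub/Sub, so the actual wire messages used by rx_zmq_streams may well differ. No zeroMQ sockets are opened; method calls stand in for the two channels.</p>

```scala
import scala.collection.mutable

// Hypothetical control messages; the real rx_zmq_streams protocol is not shown in this document.
sealed trait Control
final case class Request(n: Int) extends Control // subscriber asks for n more messages
case object Cancel extends Control               // subscriber stops the stream

sealed trait Reply
case object Ack extends Reply

final class PublisherModel(data: Iterator[String]) {
  private var demand = 0
  private var cancelled = false

  // Rep side of the control channel: every request receives exactly one reply.
  def handle(msg: Control): Reply = {
    msg match {
      case Request(n) => demand += n
      case Cancel     => cancelled = true
    }
    Ack
  }

  // Pub side of the data channel: publish only what has been requested.
  def publish(): List[String] = {
    val out = mutable.ListBuffer.empty[String]
    while (!cancelled && demand > 0 && data.hasNext) {
      out += data.next()
      demand -= 1
    }
    out.toList
  }
}

object MessageExchangeSketch {
  def main(args: Array[String]): Unit = {
    val publisher = new PublisherModel(Iterator("m1", "m2", "m3"))
    publisher.handle(Request(2))  // control channel: ask for two messages
    println(publisher.publish())  // data channel: prints List(m1, m2)
    publisher.handle(Cancel)      // control channel: stop the stream
    println(publisher.publish())  // prints List()
  }
}
```

<p>Keeping control traffic on its own request/reply channel means demand signals are acknowledged individually, while the data channel stays a one-way broadcast.</p>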