- Compare evolution of stream processing techniques
- Java in Scala
- Iterators
- Play Iteratees
- scalaz-stream
- Assert that the `z` is here to help, not confuse.
- scalaz.concurrent.Task (November, 2014)
- scalaz-stream (December, 2014)
- http4s (???)
Fear not, I'll explain Task again when it comes up.
- Make you an expert in scalaz-stream
- Make you intermediate in scalaz-stream
- Parse a CSV file.
- Discard the header and any malformed lines.
- Geocode the IP from the CSV.
- Filter for US.
- Emit 20 records.
- Run in constant space.
!scala
val in = new BufferedReader(new FileReader(inF))
try {
  val out = new PrintWriter(new FileWriter(outF), true)
  try {
    var line: String = in.readLine() // Skip the header
    var i = 20
    while ({ line = in.readLine(); line != null && !line.contains('"') && i > 0 }) {
      val mockData = MockData.parse(line)
      val country = Await.result(Geocoder(mockData.ipAddress), 5.seconds)
      if (country == Country("United States")) {
        out.println(mockData.email)
        i = i - 1
      }
    }
  } finally out.close()
} finally in.close()
- Manual, verbose resource management of input and output
- `null`. Boo, hiss.
- Poorly factored
  - Termination condition entwined with mutable state.
  - Filtering pushes surviving lines into deeper nesting.
  - Repeats `readLine()` call to discard header.
- Blocks threads
  - On geocoder
  - On input
  - On output
- Arbitrary timeout on geocoder.
- Not parallel
- Flow of code hard to trace back to requirements.
!scala
using(new FileInputStream("mockaroo.csv")) { in =>
  using(new PrintWriter(new FileOutputStream("emails.txt"), true)) { out =>
    Source.fromInputStream(in).getLines
      .drop(1)
      .filterNot(_.contains('"'))
      .map(MockData.parse)
      .map(datum => datum -> Await.result(Geocoder(datum.ipAddress), 5.seconds))
      .collect { case (datum, Country(c)) if c == "United States" => datum.email }
      .take(20)
      .foreach(out.println)
  }
}
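The `using` helper in the snippet above is not defined on the slide. A minimal sketch of what such a loan-pattern helper could look like, assuming the resource is any `AutoCloseable` (the `Loan` object and the `Tracked` class are hypothetical names, added only for illustration):

```scala
object Loan {
  // Hypothetical helper: lend `resource` to `body`, then always close it,
  // whether `body` returns normally or throws.
  def using[R <: AutoCloseable, A](resource: R)(body: R => A): A =
    try body(resource)
    finally resource.close()

  // Tiny resource used only to observe that close() was called.
  final class Tracked extends AutoCloseable {
    var closed = false
    def close(): Unit = { closed = true }
  }
}
```

This keeps the try/finally in one place, which is why the iterator version is still manual but no longer verbose.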
- Manual, ~~verbose~~ resource management of input and output
- ~~`null`. Boo, hiss.~~
- ~~Poorly factored~~
  - ~~Termination condition entwined with mutable state.~~
  - ~~Filtering pushes surviving lines into deeper nesting.~~
  - ~~Repeats `readLine()` call to discard header.~~
- Blocks threads
  - On geocoder
  - On input
  - On output
- Arbitrary timeout on geocoder.
- Not parallel
- ~~Flow of code hard to trace back to requirements.~~
- Scalaz has an alternate, mostly forgotten implementation.
- Should be familiar to you Play folks and old faces here.
- In ten words or less: an iterator pulls, an iteratee is pushed onto.
- Let's port our code to play-iteratee.
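Before the port, the pull/push distinction can be pictured with a toy model that uses only the standard library. This is not Play's API: `Step`, `Cont`, `Done`, `pushAll` and `finish` are hypothetical names for a much-simplified iteratee.

```scala
object PullVsPush {
  // Pull: the consumer drives, asking the iterator for each element.
  def pullSum(it: Iterator[Int]): Int = {
    var total = 0
    while (it.hasNext) total += it.next()
    total
  }

  // Push: a toy iteratee is either waiting for input or finished.
  sealed trait Step[I, O]
  final case class Cont[I, O](feed: Option[I] => Step[I, O]) extends Step[I, O]
  final case class Done[I, O](result: O) extends Step[I, O]

  // An iteratee that sums its input; None signals end of input.
  def sumStep(acc: Int): Step[Int, Int] =
    Cont[Int, Int] {
      case Some(i) => sumStep(acc + i)
      case None    => Done[Int, Int](acc)
    }

  // A toy enumerator: the producer drives, pushing each element in.
  def pushAll[I, O](items: Seq[I], step: Step[I, O]): Step[I, O] =
    items.foldLeft(step) {
      case (Cont(feed), item) => feed(Some(item))
      case (done, _)          => done // already finished; remaining input is ignored
    }

  // Signal end of input and read the result, if the iteratee produces one.
  def finish[I, O](step: Step[I, O]): Option[O] = step match {
    case Done(result) => Some(result)
    case Cont(feed) =>
      feed(None) match {
        case Done(result) => Some(result)
        case Cont(_)      => None
      }
  }
}
```

Both styles compute the same sum; what differs is who is in control of the loop.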
!scala
using(new PrintWriter(new FileWriter(outF), true)) { out =>
  val f = Enumerator.fromFile(inF)
    .through(decode())
    .map(new String(_))
    .through(Enumeratee.grouped(upToNewLine))
    .through(filter(_.nonEmpty))
    .through(drop(1))
    .through(filterNot[String](_.contains('"')))
    .through(map(MockData.parse))
    .through(mapM(datum => Geocoder(datum.ipAddress).map(datum -> _)))
    .through(collect { case (datum, Country(c)) if c == "United States" => datum.email })
    .through(take(20))
    .run(Iteratee.foreach(out.println))
  Await.result(f, 1.minute)
}
- Manual, ~~verbose~~ resource management of ~~input~~ and output
- Blocks threads
  - ~~On geocoder~~
  - On input
  - On output
- ~~Arbitrary timeout on geocoder.~~
- Not parallel
- Needs an "extras" package from a personal GitHub just to decode a binary stream to text in constant space.
- My attempts to parallelize led to buffer overflows and stack overflows
  - Possible mitigating factor: I'm not very smart.
!scala
file.linesR(inF.getName)
  .drop(1)
  .filter(!_.contains('"'))
  .map(MockData.parse)
  .gatherMap(8)(datum => Geocoder.task(datum.ipAddress).map(datum -> _))
  .collect { case (datum, Country(c)) if c == "United States" => datum.email }
  .take(20)
  .to(file.chunkW(outF.getName).contramap(s => ByteVector(s.getBytes)))
  .run
  .run
- ~~Manual, verbose resource management of input and output~~
- ~~Blocks threads~~
  - ~~On geocoder~~
  - ~~On input~~
  - ~~On output~~
- ~~Arbitrary timeout on geocoder.~~
- ~~Not parallel~~
- ~~Needs an "extras" package from a personal GitHub just to decode a binary stream to text in constant space.~~
- ~~My attempts to parallelize led to buffer overflows and stack overflows~~
- scalaz-stream isn't in Maven Central
- You're betting your project on 0.6 of something.
- But I didn't change a line of code from 0.5 to 0.6.
- Task, rather than Future, is the first class citizen.
- In time, you will view this as a selling point.
!scala
Task.async { f =>
  future.onComplete {
    case Success(a) => f(right(a))
    case Failure(t) => f(left(t))
  }
}
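The fragment above can be wrapped into a reusable function. A sketch, assuming scalaz 7.1 (`scalaz-concurrent`) is on the classpath; `FutureInterop` and `futureToTask` are hypothetical names, and the disjunction constructors `\/-` and `-\/` are used in place of the slide's `right` and `left`:

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scalaz.{-\/, \/-}
import scalaz.concurrent.Task

object FutureInterop {
  // The Future is taken by name, so it is not created (and therefore not
  // started) until the resulting Task is actually run.
  def futureToTask[A](future: => Future[A])(implicit ec: ExecutionContext): Task[A] =
    Task.async { callback =>
      future.onComplete {
        case Success(a) => callback(\/-(a)) // right: the successful value
        case Failure(t) => callback(-\/(t)) // left: the failure
      }
    }
}
```

A `Geocoder.task` like the one used in the scalaz-stream pipeline could plausibly be built this way, though the slides do not show its definition.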
- A "Scalaz stream" is a `scalaz.stream.Process[F[_], O]`.
- The `F[_]` is the context in which an action runs.
  - It can be anything with one type parameter.
  - But it has to be a Monad to run.
  - It has to be `Catchable` to run.
- `O` is the type returned by the action.
- A `Process` is a sequence of `F` actions returning `O`.
- A source of `O` is typically a `Process[Task, O]`.
  - `O`s are requested on demand.
- Lots of ways to create them:
  - `Process.apply`, `Process.emitAll`, etc. to source simple collections.
  - `scalaz.stream.io` and `scalaz.stream.nio` for I/O, blocking and not.
  - `scalaz.stream.tcp` for remoting.
  - `scalaz.stream.async` can generate a source from a queue, bounded or not.
  - `Process.eval` makes a one-element `Process` from a `Task`.
  - `Process.repeatEval` makes a source from repeatedly invoking a `Task`.
- This is like an `Enumerator[O]`.
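A few of the constructors listed above, sketched against scalaz-stream 0.6 as I understand its API. `SourceSketch` and the counter are hypothetical; the counter only stands in for some effectful data source.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scalaz.concurrent.Task
import scalaz.stream.Process

object SourceSketch {
  // A pure source built from a collection; no effects are involved.
  val pure: List[Int] = Process.emitAll(Seq(1, 2, 3)).toList

  // Hypothetical effect: each run of this Task bumps a counter.
  private val counter = new AtomicInteger(0)
  val next: Task[Int] = Task.delay(counter.incrementAndGet())

  // One-element source: the Task is evaluated when the process is run.
  val once: Process[Task, Int] = Process.eval(next)

  // Unbounded source: the Task is evaluated again whenever more is requested.
  val forever: Process[Task, Int] = Process.repeatEval(next)

  // Downstream asks for three elements, so the unbounded source terminates.
  def firstThree: List[Int] = forever.take(3).runLog.run.toList
}
```

Defining `once` and `forever` performs no effects; only the `runLog.run` in `firstThree` does.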
- `Process1[I, O]` is an alias for `Process[Env[I,Any]#Is, O]`
  - That `Is` is type system voodoo to ensure that an `I` is requested.
- It translates inputs to zero-to-many outputs.
- `Process[F, A].pipe(Process1[A, B]) => Process[F, B]`
- `filter`, `drop`, `take` are all simple examples.
- This is like an `Enumeratee[I, O]`.
- The common ones come with suffix notation to preserve the feel of a collection.
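One way to picture an input-to-output transducer and `pipe` without the library: treat a stage as a function from an iterator of inputs to an iterator of outputs, and piping as function composition. This is only an analogy using the standard library; `Stage`, `lift` and `explode` are hypothetical names, not scalaz-stream's implementation.

```scala
object TransducerSketch {
  // Stand-in for a transducer: turns a stream of I into a stream of O.
  type Stage[I, O] = Iterator[I] => Iterator[O]

  def filter[I](p: I => Boolean): Stage[I, I] = _.filter(p)
  def drop[I](n: Int): Stage[I, I] = _.drop(n)
  def take[I](n: Int): Stage[I, I] = _.take(n)
  def lift[I, O](f: I => O): Stage[I, O] = _.map(f)

  // Zero-to-many outputs for each input.
  def explode[I, O](f: I => Seq[O]): Stage[I, O] = _.flatMap(f)

  // Stand-in for `pipe`: feed one stage's output into the next.
  val pipeline: Stage[String, Int] =
    drop[String](1)
      .andThen(filter[String](_.nonEmpty))
      .andThen(lift[String, Int](_.length))
      .andThen(take[Int](2))
}
```

For example, `TransducerSketch.pipeline(Iterator("header", "a", "", "bcd", "ef")).toList` skips the header and the empty line, maps to lengths, and stops after two elements.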
- `Sink[Task, I]` is an alias for `Channel[Task, I, Unit]`
  - Which is an alias for `Process[Task, I => Task[Unit]]`
  - It's a source of functions to be run on each input
- `Process[Task, A].to(Sink[Task, A]) => Process[Task, Unit]`
- `scalaz.stream.io` and `scalaz.stream.nio` for I/O, blocking and not.
- This is like an `Iteratee[O, Unit]`
  - But more controlled, because Tasks don't run until you tell them.
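The "don't run until you tell them" point can be seen with a toy stand-in for `Task`, contrasted with a standard-library `Future`. `Deferred` is a hypothetical class for illustration only; the real `Task` does much more.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LazyVsEager {
  // Toy stand-in for Task: a description of work that only runs on `run()`.
  final class Deferred[A](thunk: () => A) {
    def map[B](f: A => B): Deferred[B] = new Deferred[B](() => f(thunk()))
    def run(): A = thunk()
  }
  object Deferred {
    def delay[A](a: => A): Deferred[A] = new Deferred[A](() => a)
  }

  // Returns the effect count after defining, after two runs, and after a Future.
  def demo(): (Int, Int, Int) = {
    val effects = new AtomicInteger(0)

    val deferred = Deferred.delay(effects.incrementAndGet())
    val afterDefinition = effects.get() // nothing has run yet

    deferred.run()
    deferred.run() // each run repeats the effect
    val afterTwoRuns = effects.get()

    val future = Future(effects.incrementAndGet()) // scheduled as soon as it is created
    Await.result(future, 5.seconds)
    Await.result(future, 5.seconds) // awaiting again does not repeat the effect

    (afterDefinition, afterTwoRuns, effects.get())
  }
}
```

`LazyVsEager.demo()` returns `(0, 2, 3)`: the deferred value did nothing until run and repeated its effect on each run, while the future ran once without being asked.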
- `def run: F[Unit]`: Run for a side effect.
- `def runLast: F[Option[A]]`: Discard all values but the last.
  - `def runLastOr(a: A): F[A]`: In case the process emits nothing
- `def runLog: F[Vector[A]]`: Get all the intermediate results
- `def runFoldMap(f: A => B): F[B]`: Map the outputs to `B`, and add them up.
  - `B` must have a monoid instance.
  - All the others are built on this.
- If `F` is a Task, nothing happens until you `run` the result of your `run`.
  - Banish the side effects to the outermost edge of your program.
  - Easy to compose `Process` results into larger execution flows.
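How the other runners can fall out of a fold-map is easier to see in a toy model. The sketch below folds over an already materialised `List` rather than a `Process`, and defines its own minimal `Monoid`; all names are hypothetical and this is not the library's code.

```scala
object FoldMapSketch {
  trait Monoid[B] {
    def zero: B
    def append(x: B, y: B): B
  }

  // Stand-in for runFoldMap: map each output to B, then add them up.
  def foldMap[A, B](outputs: List[A])(f: A => B)(implicit M: Monoid[B]): B =
    outputs.foldLeft(M.zero)((acc, a) => M.append(acc, f(a)))

  implicit val unitMonoid: Monoid[Unit] = new Monoid[Unit] {
    def zero: Unit = ()
    def append(x: Unit, y: Unit): Unit = ()
  }

  implicit def vectorMonoid[A]: Monoid[Vector[A]] = new Monoid[Vector[A]] {
    def zero: Vector[A] = Vector.empty
    def append(x: Vector[A], y: Vector[A]): Vector[A] = x ++ y
  }

  // "Last" monoid: keep the right-most defined value.
  final case class Last[A](value: Option[A])
  implicit def lastMonoid[A]: Monoid[Last[A]] = new Monoid[Last[A]] {
    def zero: Last[A] = Last(None)
    def append(x: Last[A], y: Last[A]): Last[A] = if (y.value.isDefined) y else x
  }

  // The other runners, each expressed as a foldMap with a different monoid.
  def run[A](outputs: List[A]): Unit =
    foldMap[A, Unit](outputs)(_ => ())
  def runLog[A](outputs: List[A]): Vector[A] =
    foldMap[A, Vector[A]](outputs)(a => Vector(a))
  def runLast[A](outputs: List[A]): Option[A] =
    foldMap[A, Last[A]](outputs)(a => Last(Some(a))).value
  def runLastOr[A](outputs: List[A])(default: A): A =
    runLast(outputs).getOrElse(default)
}
```

Choosing the monoid chooses the behaviour: `Unit` discards everything, `Vector` keeps everything, and `Last` keeps only the final value.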
- Jawn is a fast JSON parser, supporting a variety of JSON ASTs:
- Argonaut
- json4s
- play-json
- rojoma
- spray-json
- It has an asynchronous mode, to return results from partial input
- I wrote a simple scalaz-stream adapter
- https://github.com/rossabaker/jawn-streamz
  - Is the backbone of http4s' JSON support
- Coming eventually: serialization.
!scala
Process.awakeEvery(1.second)                            // every 1 second
  .map(_ => 64)                                         // ask for 64 bytes
  .through(file.chunkR("mockaroo.json"))                // from mockaroo.json
  .unwrapJsonArray[JValue]                              // emit one JSON array element at a time
  .collect { case jObj: JObject => jObj.get("email") }  // get the e-mail
  .map(_.toString)                                      // convert to string
  .to(scalaz.stream.io.stdOutLines)                     // write to stdout
  .run                                                  // convert to a task
  .run                                                  // and execute said task
- We process hundreds of millions of messages a day through scalaz-stream
- Great fit for consuming from and producing to Kafka topics.
- Manages our connections, which is more complicated than try-finally-close.
- Very robust under bursty load
- Open source wrappers on the way.
- @rossabaker
- Co-organizer, IndyScala
- Principal Cloud Engineer, CrowdStrike
- We're hiring