Skip to content
This repository has been archived by the owner on May 25, 2023. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
…scala into develop
  • Loading branch information
debasishg committed Mar 14, 2018
2 parents 604555e + 05229e1 commit 2bb39d1
Showing 1 changed file with 6 additions and 36 deletions.
42 changes: 6 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The library wraps Java APIs in Scala thereby providing:
1. much better type inference in Scala
2. less boilerplate in application code
3. the usual builder-style composition that developers get with the original Java API
4. complete compile time type safety

The design of the library was inspired by the work started by Alexis Seigneurin in [this repository](https://github.com/aseigneurin/kafka-streams-scala).

Expand Down Expand Up @@ -52,48 +53,17 @@ val clicksPerRegion: KTableS[String, Long] = userClicksStream
.map((_, regionWithClicks) => regionWithClicks)

// Compute the total per region by summing the individual click counts per region.
.groupByKey(Serialized.`with`(stringSerde, longSerde))
.groupByKey
.reduce(_ + _)
```

> **Notes:**
>
> 1. The left quotes around "with" are there because `with` is a Scala keyword. This is the mechanism you use to "escape" a Scala keyword when it's used as a normal identifier in a Java library.
> 2. Note that some methods, like `map`, take a two-argument function, for key-value pairs, rather than the more typical single argument.
## Better Abstraction

The wrapped Scala APIs also incur less boilerplate by taking advantage of Scala function literals that get converted to Java objects in the implementation of the API. Hence the surface syntax of the client API looks simpler and less noisy.

Here's an example of a snippet built using the Java API from Scala ..

```scala
val approximateWordCounts: KStream[String, Long] = textLines
.flatMapValues(value => value.toLowerCase.split("\\W+").toIterable.asJava)
.transform(
new TransformerSupplier[Array[Byte], String, KeyValue[String, Long]] {
override def get() = new ProbabilisticCounter
},
cmsStoreName)
approximateWordCounts.to(outputTopic, Produced.`with`(Serdes.String(), longSerde))
```

And here's the corresponding snippet using the Scala library. Note how the noise of `TransformerSupplier` has been abstracted out by the function literal syntax of Scala.

```scala
textLines
.flatMapValues(value => value.toLowerCase.split("\\W+").toIterable)
.transform(() => new ProbabilisticCounter, cmsStoreName)
.to(outputTopic, Produced.`with`(Serdes.String(), longSerde))
```

Also, the explicit conversion `asJava` from a Scala `Iterable` to a Java `Iterable` is done for you by the Scala library.

## Implicit Serdes

One of the areas where the Java APIs' verbosity can be reduced is through a succinct way to pass serializers and de-serializers to the various functions. The library implementation offers type safe implicit serdes to provide the serializers and de-serializers. In doing so, the Scala library **does not use configuration based default serdes** which is not type safe and prone to runtime errors.
One of the areas where the Java APIs' verbosity can be reduced is through a succinct way to pass serializers and de-serializers to the various functions. The library uses the power of Scala implicits towards this end. The library makes some decisions that help implement more succinct serdes in a type safe manner:

1. No use of configuration based default serdes. Java APIs allow the user to define default key and value serdes as part of the configuration. This configuration, being implemented as `java.util.Properties` is type-unsafe and hence can result in runtime errors in case the user misses any of the serdes to be specified or plugs in an incorrect serde. `kafka-streams-scala` makes this completely type-safe by allowing all serdes to be specified through Scala implicits.
2. The libraty offers implicit conversions from serdes to `Serialized`, `Produced`, `Consumed` or `Joined`. Hence as a user you just have to pass in the implicit serde and all conversions to `Serialized`, `Produced`, `Consumed` or `Joined` will be taken care of automatically.

The implementation allows implicits for the `Serde`s or for `Serialized`, `Consumed`, `Produced` and `Joined`. The test examples demonstrate both, though the implicits for Serdes make a cleaner implementation.

### Default Serdes

Expand Down

0 comments on commit 2bb39d1

Please sign in to comment.