This repository has been archived by the owner on May 25, 2023. It is now read-only.

Merge branch 'develop'
debasishg committed Jan 15, 2018
2 parents f8bdc6b + 689d661 commit 684b7cf
Showing 11 changed files with 111 additions and 95 deletions.
16 changes: 16 additions & 0 deletions .travis.yml
@@ -0,0 +1,16 @@
+language: scala
+sudo: false
+jdk: oraclejdk8
+scala:
+  - 2.11.11
+  - 2.12.4
+sbt_args: -mem 1500
+script:
+  - sbt "++ ${TRAVIS_SCALA_VERSION}!" test
+cache:
+  directories:
+    - "$HOME/.ivy2/cache"
+    - "$HOME/.sbt/launchers"
+before_cache:
+  - find $HOME/.sbt -name "*.lock" | xargs rm
+  - find $HOME/.ivy2 -name "ivydata-*.properties" | xargs rm
8 changes: 5 additions & 3 deletions README.md
@@ -1,5 +1,7 @@
# A Thin Scala Wrapper Around the Kafka Streams Java API

+[![Build Status](https://secure.travis-ci.org/lightbend/kafka-streams-scala.png)](http://travis-ci.org/lightbend/kafka-streams-scala)
+
The library wraps Java APIs in Scala thereby providing:

1. much better type inference in Scala
@@ -13,15 +15,15 @@ The design of the library was inspired by the work started by Alexis Seigneurin
`kafka-streams-scala` is published and cross-built for Scala `2.11` and `2.12`, so you can just add the following to your build:

```scala
val kafka_streams_scala_version = "0.1.0"
val kafka_streams_scala_version = "0.1.1"

libraryDependencies ++= Seq("com.lightbend" %%
"kafka-streams-scala" % kafka_streams_scala_version)
```

-> Note: `kafka-streams-scala` supports Kafka Streams `1.0.0`.
+> Note: `kafka-streams-scala` supports Kafka Streams `1.0.0` onwards.
-The API docs for `kafka-streams-scala` are available [here](https://developer.lightbend.com/docs/api/kafka-streams-scala/0.1.0/com/lightbend/kafka/scala/streams) for Scala 2.12 and [here](https://developer.lightbend.com/docs/api/kafka-streams-scala_2.11/0.1.0/#package) for Scala 2.11.
+The API docs for `kafka-streams-scala` are available [here](https://developer.lightbend.com/docs/api/kafka-streams-scala/0.1.1/com/lightbend/kafka/scala/streams) for Scala 2.12 and [here](https://developer.lightbend.com/docs/api/kafka-streams-scala_2.11/0.1.1/#package) for Scala 2.11.
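To make the wrapper's value concrete, here is a minimal word-count sketch written against the `0.1.1` API. It is not part of this diff; the topic names and broker address are illustrative, and it assumes default string serdes are configured:

```scala
import java.util.Properties

import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

object WordCountSketch extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-sketch")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)

  val builder = new StreamsBuilderS()

  // Scala lambdas in place of Java functional interfaces; key/value types are inferred.
  val lines: KStreamS[String, String] = builder.stream[String, String]("input-topic")
  lines
    .flatMapValues(line => line.toLowerCase.split("\\W+").toIterable)
    .filter((_, word) => word.nonEmpty)
    .to("words-topic")

  new KafkaStreams(builder.build(), props).start()
}
```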

## Running the Tests

4 changes: 2 additions & 2 deletions build.sbt
@@ -4,13 +4,13 @@ name := "kafka-streams-scala"

organization := "com.lightbend"

version := "0.1.0"
version := "0.1.1"

scalaVersion := Versions.Scala_2_12_Version

crossScalaVersions := Versions.CrossScalaVersions

scalacOptions := Seq("-Xexperimental", "-unchecked", "-deprecation")
scalacOptions := Seq("-Xexperimental", "-unchecked", "-deprecation", "-Ywarn-unused-import")

parallelExecution in Test := false

1 change: 1 addition & 0 deletions project/build.properties
@@ -0,0 +1 @@
+sbt.version=1.1.0
22 changes: 13 additions & 9 deletions src/main/scala/com/lightbend/kafka/scala/streams/KStreamS.scala
@@ -14,7 +14,7 @@ import scala.collection.JavaConverters._

/**
* Wraps the Java class KStream and delegates method calls to the underlying Java object.
  */
class KStreamS[K, V](val inner: KStream[K, V]) {

def filter(predicate: (K, V) => Boolean): KStreamS[K, V] = {
@@ -47,7 +47,7 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
inner.flatMapValues[VR]((v) => processor(v).asJava)
}

-  def print(printed: Printed[K, V]) = inner.print(printed)
+  def print(printed: Printed[K, V]): Unit = inner.print(printed)

def foreach(action: (K, V) => Unit): Unit = {
inner.foreach((k, v) => action(k, v))
@@ -74,15 +74,19 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
val transformerS: Transformer[K, V, (K1, V1)] = transformerSupplier()
new Transformer[K, V, KeyValue[K1, V1]] {
override def transform(key: K, value: V): KeyValue[K1, V1] = {
-        val (k1,v1) = transformerS.transform(key, value)
-        KeyValue.pair(k1, v1)
+        transformerS.transform(key, value) match {
+          case (k1, v1) => KeyValue.pair(k1, v1)
+          case _ => null
+        }
}

override def init(context: ProcessorContext): Unit = transformerS.init(context)

override def punctuate(timestamp: Long): KeyValue[K1, V1] = {
-        val (k1,v1) = transformerS.punctuate(timestamp)
-        KeyValue.pair[K1, V1](k1, v1)
+        transformerS.punctuate(timestamp) match {
+          case (k1, v1) => KeyValue.pair[K1, V1](k1, v1)
+          case _ => null
+        }
}

override def close(): Unit = transformerS.close()
@@ -99,7 +103,7 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
}

   def process(processorSupplier: () => Processor[K, V],
-              stateStoreNames: String*) = {
+              stateStoreNames: String*): Unit = {

val processorSupplierJ: ProcessorSupplier[K, V] = () => processorSupplier()
inner.process(processorSupplierJ, stateStoreNames: _*)
@@ -206,13 +210,13 @@ class KStreamS[K, V](val inner: KStream[K, V]) {

def merge(stream: KStreamS[K, V]): KStreamS[K, V] = inner.merge(stream)

-  def peek(action: (K, V) => Unit): KStream[K, V] = {
+  def peek(action: (K, V) => Unit): KStreamS[K, V] = {
inner.peek(action(_,_))
}

// -- EXTENSIONS TO KAFKA STREAMS --

-  // applies the predicate to know what messages shuold go to the left stream (predicate == true)
+  // applies the predicate to know what messages should go to the left stream (predicate == true)
// or to the right stream (predicate == false)
def split(predicate: (K, V) => Boolean): (KStreamS[K, V], KStreamS[K, V]) = {
(this.filter(predicate), this.filterNot(predicate))
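For reference, a small sketch of the `split` extension in use. This is illustrative, not part of the diff: topic names are made up, and it assumes default string/long serdes are configured:

```scala
import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}

val builder = new StreamsBuilderS()
val numbers: KStreamS[String, Long] = builder.stream[String, Long]("numbers")

// predicate == true routes to the left stream, false to the right
val (evens, odds) = numbers.split((_, v) => v % 2 == 0)
evens.to("even-numbers")
odds.to("odd-numbers")
```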
src/main/scala/com/lightbend/kafka/scala/streams/StreamsBuilderS.scala
@@ -1,67 +1,74 @@
 /**
- * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
- */
+  * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+  */

package com.lightbend.kafka.scala.streams

import java.util.regex.Pattern

-import ImplicitConversions._
-import org.apache.kafka.streams.kstream.{ GlobalKTable, Materialized }
+import com.lightbend.kafka.scala.streams.ImplicitConversions._
+import org.apache.kafka.common.utils.Bytes
+import org.apache.kafka.streams.kstream.{GlobalKTable, Materialized}
 import org.apache.kafka.streams.processor.{ProcessorSupplier, StateStore}
-import org.apache.kafka.streams.state.{ StoreBuilder, KeyValueStore }
+import org.apache.kafka.streams.state.{KeyValueStore, StoreBuilder}
 import org.apache.kafka.streams.{Consumed, StreamsBuilder, Topology}
-import org.apache.kafka.common.utils.Bytes

import scala.collection.JavaConverters._

/**
- * Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
- */
-class StreamsBuilderS {
+  * Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
+  */
+class StreamsBuilderS(inner: StreamsBuilder = new StreamsBuilder) {

-  val inner = new StreamsBuilder
-
-  def stream[K, V](topic: String) : KStreamS[K, V] =
+  def stream[K, V](topic: String): KStreamS[K, V] =
     inner.stream[K, V](topic)
 
-  def stream[K, V](topic: String, consumed: Consumed[K, V]) : KStreamS[K, V] =
+  def stream[K, V](topic: String, consumed: Consumed[K, V]): KStreamS[K, V] =
     inner.stream[K, V](topic, consumed)
 
   def stream[K, V](topics: List[String]): KStreamS[K, V] =
     inner.stream[K, V](topics.asJava)
 
   def stream[K, V](topics: List[String], consumed: Consumed[K, V]): KStreamS[K, V] =
     inner.stream[K, V](topics.asJava, consumed)
 
-  def stream[K, V](topicPattern: Pattern) : KStreamS[K, V] =
+  def stream[K, V](topicPattern: Pattern): KStreamS[K, V] =
     inner.stream[K, V](topicPattern)
 
-  def stream[K, V](topicPattern: Pattern, consumed: Consumed[K, V]) : KStreamS[K, V] =
+  def stream[K, V](topicPattern: Pattern, consumed: Consumed[K, V]): KStreamS[K, V] =
     inner.stream[K, V](topicPattern, consumed)
 
-  def table[K, V](topic: String) : KTableS[K, V] = inner.table[K, V](topic)
+  def table[K, V](topic: String): KTableS[K, V] = inner.table[K, V](topic)
 
-  def table[K, V](topic: String, consumed: Consumed[K, V]) : KTableS[K, V] =
+  def table[K, V](topic: String, consumed: Consumed[K, V]): KTableS[K, V] =
     inner.table[K, V](topic, consumed)
 
   def table[K, V](topic: String, consumed: Consumed[K, V],
     materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] =
     inner.table[K, V](topic, consumed, materialized)
 
   def table[K, V](topic: String,
     materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] =
     inner.table[K, V](topic, materialized)
 
   def globalTable[K, V](topic: String): GlobalKTable[K, V] =
     inner.globalTable(topic)
 
+  def globalTable[K, V](topic: String, consumed: Consumed[K, V]): GlobalKTable[K, V] =
+    inner.globalTable(topic, consumed)
+
+  def globalTable[K, V](topic: String, consumed: Consumed[K, V],
+    materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): GlobalKTable[K, V] =
+    inner.globalTable(topic, consumed, materialized)
+
+  def globalTable[K, V](topic: String,
+    materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): GlobalKTable[K, V] =
+    inner.globalTable(topic, materialized)
+
   def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilder = inner.addStateStore(builder)
 
   def addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore], topic: String, sourceName: String, consumed: Consumed[_, _], processorName: String, stateUpdateSupplier: ProcessorSupplier[_, _]): StreamsBuilder =
-    inner.addGlobalStore(storeBuilder,topic,sourceName,consumed,processorName,stateUpdateSupplier)
+    inner.addGlobalStore(storeBuilder, topic, sourceName, consumed, processorName, stateUpdateSupplier)
 
-  def build() : Topology = inner.build()
-}
+  def build(): Topology = inner.build()
+}
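One consequence of the constructor change above: `StreamsBuilderS` can now wrap a `StreamsBuilder` created elsewhere (for example by existing Java code) instead of always allocating its own. A brief sketch, with illustrative names only:

```scala
import com.lightbend.kafka.scala.streams.StreamsBuilderS
import org.apache.kafka.streams.StreamsBuilder

// Default: the wrapper allocates a fresh underlying StreamsBuilder.
val fresh = new StreamsBuilderS()

// Alternatively, wrap a builder that Java-side code already holds.
val javaBuilder = new StreamsBuilder()
val wrapped = new StreamsBuilderS(javaBuilder)
val stream = wrapped.stream[String, String]("some-topic")
```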
KafkaLocalServer.scala
@@ -8,17 +8,14 @@ package com.lightbend.kafka.scala.server
// https://github.com/lagom/lagom/blob/master/dev/kafka-server/src/main/scala/com/lightbend/lagom/internal/kafka/KafkaLocalServer.scala

 import java.io.{ IOException, File }
-import java.nio.file.{ FileVisitOption, Files, Paths }
 import java.util.Properties
 
 import org.apache.curator.test.TestingServer
 import com.typesafe.scalalogging.LazyLogging
 
 import kafka.server.{KafkaConfig, KafkaServerStartable}
 
 import scala.collection.JavaConverters._
-import scala.util.{ Try, Success, Failure }
-import java.util.Comparator

import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.utils.ZkUtils
Utils.scala
@@ -9,7 +9,7 @@ import java.io.File
import java.nio.file.{ FileVisitOption, Files, Paths }
import java.util.Comparator

-import scala.util.{ Try, Success, Failure }
+import scala.util.Try
import scala.collection.JavaConverters._

object Utils {
KafkaStreamsTest.scala
@@ -1,23 +1,18 @@
 /**
- * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
- */
+  * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+  */

package com.lightbend.kafka.scala.streams

-import minitest.TestSuite
-import com.lightbend.kafka.scala.server.{ KafkaLocalServer, MessageSender, MessageListener, RecordProcessorTrait }
-
-import java.util.{ Properties, Locale }
+import java.util.Properties
 import java.util.regex.Pattern
 
-import org.apache.kafka.streams.{ KeyValue, StreamsConfig, KafkaStreams, Consumed }
-import org.apache.kafka.streams.kstream.{ Materialized, Produced, KeyValueMapper, Printed }
-import org.apache.kafka.common.serialization.{ Serdes, StringSerializer, StringDeserializer, Serde, LongDeserializer }
+import com.lightbend.kafka.scala.server.{KafkaLocalServer, MessageListener, MessageSender, RecordProcessorTrait}
+import minitest.TestSuite
 import org.apache.kafka.clients.consumer.ConsumerRecord
 
 import scala.concurrent.duration._
 
 import ImplicitConversions._
+import org.apache.kafka.common.serialization._
+import org.apache.kafka.streams.kstream.Produced
+import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}

object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData {

@@ -45,19 +40,19 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData {
val streamsConfiguration = new Properties()
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, s"wordcount-${scala.util.Random.nextInt(100)}")
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcountgroup")

streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, localStateDir)

-    val builder = new StreamsBuilderS
+    val builder = new StreamsBuilderS()

val textLines = builder.stream[String, String](inputTopic)

val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

    val wordCounts: KTableS[String, Long] =
textLines.flatMapValues(v => pattern.split(v.toLowerCase))
.groupBy((k, v) => v)
.count()
@@ -70,15 +65,15 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData {
//
// Step 2: Produce some input data to the input topic.
//
    val sender = MessageSender[String, String](brokers, classOf[StringSerializer].getName, classOf[StringSerializer].getName)
val mvals = sender.batchWriteValue(inputTopic, inputValues)

//
// Step 3: Verify the application's output data.
//
    val listener = MessageListener(brokers, outputTopic, "wordcountgroup",
      classOf[StringDeserializer].getName,
      classOf[LongDeserializer].getName,
new RecordProcessor
)

@@ -90,10 +85,11 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData {
}

class RecordProcessor extends RecordProcessorTrait[String, Long] {
  override def processRecord(record: ConsumerRecord[String, Long]): Unit = {
// println(s"Get Message $record")
}
}

}

trait WordCountTestData {

