Skip to content
This repository has been archived by the owner on May 25, 2023. It is now read-only.

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
debasishg committed Apr 4, 2018
2 parents 64d0688 + 8e3b56b commit 8f132c7
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 6 deletions.
3 changes: 3 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Kafka Streams Scala
Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
Copyright 2017-2018 Alexis Seigneurin.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ The design of the library was inspired by the work started by Alexis Seigneurin
`kafka-streams-scala` is published and cross-built for Scala `2.11`, and `2.12`, so you can just add the following to your build:

```scala
val kafka_streams_scala_version = "0.2.0"
val kafka_streams_scala_version = "0.2.1"

libraryDependencies ++= Seq("com.lightbend" %%
"kafka-streams-scala" % kafka_streams_scala_version)
```

> Note: `kafka-streams-scala` supports onwards Kafka Streams `1.0.0`.
The API docs for `kafka-streams-scala` is available [here](https://developer.lightbend.com/docs/api/kafka-streams-scala/0.2.0/com/lightbend/kafka/scala/streams) for Scala 2.12 and [here](https://developer.lightbend.com/docs/api/kafka-streams-scala_2.11/0.2.0/#package) for Scala 2.11.
The API docs for `kafka-streams-scala` is available [here](https://developer.lightbend.com/docs/api/kafka-streams-scala/0.2.1/com/lightbend/kafka/scala/streams) for Scala 2.12 and [here](https://developer.lightbend.com/docs/api/kafka-streams-scala_2.11/0.2.1/#package) for Scala 2.11.

## Running the Tests

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ name := "kafka-streams-scala"

organization := "com.lightbend"

version := "0.2.0"
version := "0.2.1"

scalaVersion := Versions.Scala_2_12_Version

Expand Down
2 changes: 1 addition & 1 deletion project/Versions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ object Versions {
val CuratorVersion = "4.0.0"
val MinitestVersion = "2.0.0"
val JDKVersion = "1.8"
val Scala_2_12_Version = "2.12.4"
val Scala_2_12_Version = "2.12.5"
val Scala_2_11_Version = "2.11.11"
val Avro4sVersion = "1.8.3"
val CrossScalaVersions = Seq(Scala_2_12_Version, Scala_2_11_Version )
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
* Copyright 2017-2018 Alexis Seigneurin.
*/

package com.lightbend.kafka.scala.streams
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
* Copyright 2017-2018 Alexis Seigneurin.
*/

package com.lightbend.kafka.scala.streams
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
* Copyright 2017-2018 Alexis Seigneurin.
*/

package com.lightbend.kafka.scala.streams
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
* Copyright 2017-2018 Alexis Seigneurin.
*/

package com.lightbend.kafka.scala.streams
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
* Copyright 2017-2018 Alexis Seigneurin.
*/

package com.lightbend.kafka.scala.streams
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
* Copyright 2017-2018 Alexis Seigneurin.
*/

package com.lightbend.kafka.scala.streams
Expand Down Expand Up @@ -137,7 +138,7 @@ class KStreamS[K, V](val inner: KStream[K, V]) {

def join[VT, VR](table: KTableS[K, VT],
joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStreamS[K, VR] =
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
inner.join[VT, VR](table.inner, joiner.asValueJoiner, joined)

def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
keyValueMapper: (K, V) => GK,
Expand Down Expand Up @@ -165,7 +166,7 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStreamS[K, VR] =
inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)

def merge(stream: KStreamS[K, V]): KStreamS[K, V] = inner.merge(stream)
def merge(stream: KStreamS[K, V]): KStreamS[K, V] = inner.merge(stream.inner)

def peek(action: (K, V) => Unit): KStreamS[K, V] = {
inner.peek(action(_,_))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
* Copyright 2017-2018 Alexis Seigneurin.
*/

package com.lightbend.kafka.scala.streams
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
* Copyright 2017-2018 Alexis Seigneurin.
*/

package com.lightbend.kafka.scala.streams
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
* Copyright 2017-2018 Alexis Seigneurin.
*/

package com.lightbend.kafka.scala.streams
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
* Copyright 2017-2018 Alexis Seigneurin.
*/

package com.lightbend.kafka.scala.streams
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/

package com.lightbend.kafka.scala.streams

import java.util.Properties
import java.util.regex.Pattern

import com.lightbend.kafka.scala.server.{KafkaLocalServer, MessageListener, MessageSender, RecordProcessorTrait}
import minitest.TestSuite
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
import ImplicitConversions._
import com.typesafe.scalalogging.LazyLogging

object KafkaStreamsMergeTest extends TestSuite[KafkaLocalServer] with WordCountMergeTestData with LazyLogging {

override def setup(): KafkaLocalServer = {
val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
s.start()
s
}

override def tearDown(server: KafkaLocalServer): Unit = {
server.stop()
}

test("should count words") { server =>

server.createTopic(inputTopic1)
server.createTopic(inputTopic2)
server.createTopic(outputTopic)

//
// Step 1: Configure and start the processor topology.
//
import DefaultSerdes._

val streamsConfiguration = new Properties()
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, s"wordcount-${scala.util.Random.nextInt(100)}")
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcountgroup")

streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, localStateDir)

val builder = new StreamsBuilderS()

val textLines1 = builder.stream[String, String](inputTopic1)
val textLines2 = builder.stream[String, String](inputTopic2)

val textLines = textLines1.merge(textLines2)

val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

val wordCounts: KTableS[String, Long] =
textLines.flatMapValues(v => pattern.split(v.toLowerCase))
.groupBy((k, v) => v)
.count()

wordCounts.toStream.to(outputTopic)

val streams = new KafkaStreams(builder.build(), streamsConfiguration)
streams.start()

//
// Step 2: Produce some input data to the input topics.
//
val sender = MessageSender[String, String](brokers, classOf[StringSerializer].getName, classOf[StringSerializer].getName)
val mvals1 = sender.batchWriteValue(inputTopic1, inputValues)
val mvals2 = sender.batchWriteValue(inputTopic2, inputValues)

//
// Step 3: Verify the application's output data.
//
val listener = MessageListener(brokers, outputTopic, "wordcountgroup",
classOf[StringDeserializer].getName,
classOf[LongDeserializer].getName,
new RecordProcessor
)

val l = listener.waitUntilMinKeyValueRecordsReceived(expectedWordCounts.size, 30000)

assertEquals(l.sortBy(_.key), expectedWordCounts.sortBy(_.key))

streams.close()
}

class RecordProcessor extends RecordProcessorTrait[String, Long] {
override def processRecord(record: ConsumerRecord[String, Long]): Unit = {
// logger.info(s"Get Message $record")
}
}

}

trait WordCountMergeTestData {
val inputTopic1 = s"inputTopic1.${scala.util.Random.nextInt(100)}"
val inputTopic2 = s"inputTopic2.${scala.util.Random.nextInt(100)}"
val outputTopic = s"outputTpic.${scala.util.Random.nextInt(100)}"
val brokers = "localhost:9092"
val localStateDir = "local_state_data"

val inputValues = List(
"Hello Kafka Streams",
"All streams lead to Kafka",
"Join Kafka Summit",
"И теперь пошли русские слова"
)

val expectedWordCounts: List[KeyValue[String, Long]] = List(
new KeyValue("hello", 2L),
new KeyValue("all", 2L),
new KeyValue("streams", 4L),
new KeyValue("lead", 2L),
new KeyValue("to", 2L),
new KeyValue("join", 2L),
new KeyValue("kafka", 6L),
new KeyValue("summit", 2L),
new KeyValue("и", 2L),
new KeyValue("теперь", 2L),
new KeyValue("пошли", 2L),
new KeyValue("русские", 2L),
new KeyValue("слова", 2L)
)
}


0 comments on commit 8f132c7

Please sign in to comment.