Teknek stream processing on Kafka data
In hello teknek we set up a simple plan that read data from a Feed that produced fixed data. While that is great for a demo, it is not very useful. In this example we create a plan that reads from Apache Kafka using teknek's out-of-the-box Kafka Feed support.
Teknek has multiple ways to load Feed and Operator classes. In this example we load up the teknek-kafka feed from a URL. We previously uploaded this jar using the web-ui of teknek.
create plan k
configure feed k
set class as io.teknek.kafka.SimpleKafkaFeed
set feedspec as url
set script as http://localhost:8080//teknek-web/Serve/teknek-kafka-0.0.1-SNAPSHOT-jar-with-dependencies.jar
set property simple.kafka.feed.consumer.group as 'group1'
set property simple.kafka.feed.reset.offset as 'yes'
set property simple.kafka.feed.partitions as 1
set property simple.kafka.feed.topic as 'clickstream'
set property simple.kafka.feed.zookeeper.connect as 'localhost:2181'
exit
import https://raw.github.com/edwardcapriolo/teknek/master/teknek-core/src/test/resources/bundle_io.teknek_itests1.0.0.json
load io.teknek groovy_identity operator as groovy_identity
exit
set root groovy_identity
set maxworkers 1
set tupleRetry 1
set offsetCommitInterval 1
save
We can use Kafka's tools to send data to the topic:
[edward@jackintosh kafka_2.8.0-0.8.0]$ bin/kafka-console-producer.sh --topic clickstream --broker-list localhost:9092
soemthing
We have teknek running in the foreground in debug mode. You can see the message being processed:
{message=[B@5c26a3bc, key=null}
DEBUG 15:27:22,372 No children operators for this operator. Tuple not being passed on {message=[B@59a7e6cd, key=null}
{message=[B@59a7e6cd, key=null}
The message from Kafka is a byte[], and byte arrays do not print well. Let's add an operator after the root one that prints each message as a string so we can read it back.
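To see why the raw tuple shows up as [B@5c26a3bc, here is a minimal Java sketch of the same effect outside teknek. The class name ByteArrayPrinting and the decode helper are illustrative names, not part of teknek or Kafka, and the sketch assumes the payload is UTF-8 text (the Groovy closure below uses new String(bytes), which relies on the platform default charset).

```java
import java.nio.charset.StandardCharsets;

public class ByteArrayPrinting {

    // Hypothetical helper: turn a message payload into readable text.
    // Assumption: the payload holds UTF-8 encoded text.
    static String decode(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] message = "come on".getBytes(StandardCharsets.UTF_8);

        // Arrays inherit Object.toString(), so this prints a type tag
        // and an identity hash such as [B@5c26a3bc, not the contents.
        System.out.println(message.toString());

        // Decoding the bytes gives back the text that was sent.
        System.out.println(decode(message));
    }
}
```

The hash after [B@ changes from run to run, which is why each tuple in the debug log carries a different value.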
configure operator better_print
set operatorspec as groovyclosure
set script as { tuple, collector -> collector.emit(tuple) ; println(new String(tuple.getField("message"))) }
set class as better_print
exit
plan> for groovy_identity add child better_print
We can see that the new operator better_print is now a child of groovy_identity. This means that a tuple emitted by the parent will be forwarded to the child.
plan> show formatted plan
"rootOperator" : {
  "spec" : "groovyclosure",
  "script" : "{ tuple, collector -> collector.emit(tuple) ; println(tuple) }",
  "theClass" : "groovy_identity",
  "name" : "groovy_identity",
  "parameters" : { },
  "children" : [ {
    "spec" : "groovyclosure",
    "script" : "{ tuple, collector -> collector.emit(tuple) ; println(new String(tuple.getField(\"message\"))) } ",
    "theClass" : "better_print",
    "name" : "better_print",
    "parameters" : { },
    "children" : [ ]
  } ]
},
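The parent/child relationship in this plan can be sketched in plain Java to show what forwarding means. This is only an illustration under stated assumptions: Node, handle, and run are hypothetical names and do not reflect teknek's actual Operator or collector classes, and the sketch forwards tuples synchronously, so the order of the recorded lines is not meant to mirror how teknek schedules operators.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class OperatorChainSketch {

    // Hypothetical stand-in for an operator; this is not teknek's Operator class.
    static class Node {
        final String name;
        final BiConsumer<Map<String, Object>, Consumer<Map<String, Object>>> body;
        final List<Node> children = new ArrayList<>();

        Node(String name,
             BiConsumer<Map<String, Object>, Consumer<Map<String, Object>>> body) {
            this.name = name;
            this.body = body;
        }

        // Run the body; anything it emits is handed to every child.
        void handle(Map<String, Object> tuple) {
            body.accept(tuple, emitted -> {
                for (Node child : children) {
                    child.handle(emitted);
                }
            });
        }
    }

    // Builds groovy_identity -> better_print and pushes one message through.
    // Returns what each operator "printed", tagged with the operator name.
    static List<String> run(String text) {
        List<String> printed = new ArrayList<>();

        // Mirrors the root script: emit the tuple, then print it raw.
        Node root = new Node("groovy_identity", (tuple, collector) -> {
            collector.accept(tuple);
            printed.add("groovy_identity: " + tuple);
        });

        // Mirrors better_print: emit the tuple, then print the decoded message.
        Node child = new Node("better_print", (tuple, collector) -> {
            collector.accept(tuple);
            byte[] message = (byte[]) tuple.get("message");
            printed.add("better_print: " + new String(message, StandardCharsets.UTF_8));
        });
        root.children.add(child);

        Map<String, Object> tuple = new HashMap<>();
        tuple.put("message", text.getBytes(StandardCharsets.UTF_8));
        tuple.put("key", null);
        root.handle(tuple);
        return printed;
    }

    public static void main(String[] args) {
        for (String line : run("come on")) {
            System.out.println(line);
        }
    }
}
```

Because better_print has no children, its own emit goes nowhere, which corresponds to the "No children operators for this operator" debug line in teknek's log.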
Now we can send messages to the topic:
[edward@jackintosh kafka_2.8.0-0.8.0]$ bin/kafka-console-producer.sh --topic clickstream --broker-list localhost:9092
now write this
come on
The tuples are output twice as they pass between the operators: once in raw form by groovy_identity and once as readable text by better_print.
DEBUG 16:29:48,753 No children operators for this operator. Tuple not being passed on {message=[B@27174693, key=null}
{message=[B@27174693, key=null}
{message=[B@79906abc, key=null}
now write this
come on
Cool! We have now seen how Feeds (drivers) can be loaded and configured at development time. We have also seen how operators can be chained together to achieve desired effects.