-
Notifications
You must be signed in to change notification settings - Fork 0
/
Kafka02Consumer.scala
47 lines (38 loc) · 2.05 KB
/
Kafka02Consumer.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package ca.mcit.bigdata.kafka
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig._
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._
/**
 * Polls comma-separated records from the `trips` topic, wraps each one in an
 * `EnrichedTrip` (route and calendar parts are filled with placeholders here), hands every
 * non-empty batch to `Kafka03ProduceEnrichedTopic.ProduceEnrichedTopic`, and prints the
 * resulting items.
 *
 * Offsets are committed manually (auto-commit is disabled below) once a polled batch has been
 * handed off, so a restart resumes after the last processed batch instead of re-reading the
 * topic from the earliest offset.
 */
object Kafka02Consumer extends App {
  val topicName = "trips"

  // Index of the column that is compared against "1" to build the Boolean passed to `Trips`.
  // NOTE(review): the column's meaning is not visible in this file -- confirm against the
  // `Trips` definition.
  private final val FlagColumnIndex = 6

  val consumerProperties = new Properties()
  consumerProperties.setProperty(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  consumerProperties.setProperty(GROUP_ID_CONFIG, "group-id-1")
  consumerProperties.setProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")
  consumerProperties.setProperty(KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  consumerProperties.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  // Auto-commit is off: offsets are committed explicitly in the poll loop below.
  consumerProperties.setProperty(ENABLE_AUTO_COMMIT_CONFIG, "false")

  val consumer = new KafkaConsumer[String, String](consumerProperties)
  consumer.subscribe(List(topicName).asJava)

  /**
   * Converts one polled record into an `EnrichedTrip`.
   *
   * @param record the record whose value is expected to be a comma-separated line
   * @return `Some(trip)` when the value has at least `FlagColumnIndex + 1` columns;
   *         `None` (after logging to stderr) for a null value or a row that is too short
   */
  private def toEnrichedTrip(record: ConsumerRecord[String, String]): Option[EnrichedTrip] = {
    // limit = -1 keeps trailing empty columns; the default split would drop them and make the
    // flag column unreachable for rows that end in empty fields.
    val columns: Array[String] =
      Option(record.value()).map(_.split(",", -1)).getOrElse(Array.empty[String])

    if (columns.length <= FlagColumnIndex) {
      Console.err.println(
        s"Skipping malformed record at offset ${record.offset()} of $topicName: ${record.value()}")
      None
    } else {
      val trip = Trips(
        columns(0),
        columns(1),
        columns(2),
        columns(3),
        columns(FlagColumnIndex) == "1")
      // Route and calendar data are not available at this stage; placeholders are passed through.
      Some(EnrichedTrip(TripsRoute(trip, Routes(null, null, null)), CalendarDates(null, null, 0)))
    }
  }

  try {
    while (true) {
      val polledRecords: ConsumerRecords[String, String] =
        consumer.poll(Duration.ofSeconds(1))

      if (!polledRecords.isEmpty) {
        println(s"Polled ${polledRecords.count()} messages from $topicName")

        // Build the batch in one pass instead of repeatedly appending to a List.
        val enrichedList: List[EnrichedTrip] =
          polledRecords.asScala.toList.flatMap(record => toEnrichedTrip(record).toList)

        if (enrichedList.nonEmpty) {
          Kafka03ProduceEnrichedTopic.ProduceEnrichedTopic(enrichedList)
        }

        // Commit only after the batch has been handed off, so a failure above causes the batch
        // to be re-polled rather than lost.
        consumer.commitSync()

        enrichedList.foreach(println)
      }
    }
  } finally {
    // Release the consumer if the loop is left through an exception.
    consumer.close()
  }
}