-
Notifications
You must be signed in to change notification settings - Fork 138
/
Copy pathBackend.java
93 lines (78 loc) · 3.63 KB
/
Backend.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package brave.example;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
import com.linecorp.armeria.server.logging.LoggingService;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
public final class Backend {
public static void main(String[] args) {
TracingConfiguration tracing = TracingConfiguration.create("backend", false);
Properties streamsConfig = new Properties();
String kafkaBootstrapServers = System.getProperty("kafka.bootstrap-servers", "localhost:19092");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "processor-service");
streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
"earliest");
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
.processValues(tracing.kafkaStreamsTracing.processValues("mapping",
() -> r -> r.withValue(r.value() + ". Thanks!")));
KafkaStreams kafkaStreams =
tracing.kafkaStreamsTracing.kafkaStreams(builder.build(), streamsConfig);
kafkaStreams.start();
Properties consumerConfigs = new Properties();
consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-service");
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StringDeserializer keyDeserializer = new StringDeserializer();
StringDeserializer valueDeserializer = new StringDeserializer();
Consumer<String, String> consumer = tracing.kafkaTracing
.consumer(new KafkaConsumer<>(consumerConfigs, keyDeserializer, valueDeserializer));
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(new ConsumerLoop(consumer, Collections.singleton("output")));
Server server = Server.builder()
.http(9000)
.service("/health", HealthCheckService.of())
.decorator(LoggingService.newDecorator())
.build();
server.start().join();
}
static final class ConsumerLoop implements Runnable {
final Consumer<String, String> consumer;
final Collection<String> topics;
ConsumerLoop(Consumer<String, String> consumer, Collection<String> topics) {
this.consumer = consumer;
this.topics = topics;
}
@Override public void run() {
try {
consumer.subscribe(topics);
while (!Thread.interrupted()) {
for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMinutes(1))) {
System.out.println("Record: " + record.key() + " " + record.value());
}
consumer.commitAsync();
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
}
}
}
}