Skip to content

Commit

Permalink
remove build with deduplication topology for now
Browse files Browse the repository at this point in the history
  • Loading branch information
fonsdant committed Sep 17, 2024
1 parent eb46262 commit bafaa10
Showing 1 changed file with 0 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,60 +324,6 @@ public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing(final String
);
}

@SuppressWarnings("deprecation")
private KafkaStreams buildWithDeduplicationTopology(final String stateDirPath, final String eosConfig) {
final StreamsBuilder builder = new StreamsBuilder();

builder.addStateStore(Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(storeName),
Serdes.Integer(),
Serdes.Integer())
);
builder.<Integer, Integer>stream(inputTopic)
.process(
() -> new org.apache.kafka.streams.kstream.Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
private KeyValueStore<Integer, Integer> store;

@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
store = (KeyValueStore<Integer, Integer>) context.getStateStore(storeName);
}

@Override
public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
if (skipRecord.get()) {
// we only forward so we can verify the skipping by reading the output topic
// the goal is skipping is to not modify the state store
return KeyValue.pair(key, value);
}

if (store.get(key) != null) {
return null;
}

store.put(key, value);
store.flush();

if (key == KEY_1) {
// after error injection, we need to avoid a consecutive error after rebalancing
skipRecord.set(true);
throw new RuntimeException("Injected test error");
}

return KeyValue.pair(key, value);
}

@Override
public void close() { }
},
storeName
)
.to(outputTopic);

return new KafkaStreams(builder.build(), props(stateDirPath, eosConfig));
}


private Properties props(final String stateDirPath, final String eosConfig) {
final Properties streamsConfiguration = new Properties();
Expand Down

0 comments on commit bafaa10

Please sign in to comment.