KAFKA-16339: [2/4 KStream#flatTransform] Remove Deprecated "transformer" methods and classes #17245

Draft: wants to merge 13 commits into base trunk
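This PR is part 2 of 4 for KAFKA-16339: it removes the deprecated KStream#transform and KStream#flatTransform methods together with their Transformer/TransformerSupplier support types, leaving KStream#process with the org.apache.kafka.streams.processor.api types as the replacement. As orientation only, the migration looks roughly like the sketch below; the class name and logic are hypothetical and not code from this PR.

// Old (removed by this series): at most one output record per input,
// returned from Transformer#transform; `return null` meant "emit nothing".
// stream.transform(UppercaseTransformer::new);
//
// New: output records are forwarded explicitly; returning from process()
// without calling forward() is the equivalent of the old `return null`.
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Record;

public class UppercaseProcessor extends ContextualProcessor<String, String, String, String> {
    @Override
    public void process(final Record<String, String> record) {
        context().forward(record.withValue(record.value().toUpperCase()));
    }
}

// stream.process(UppercaseProcessor::new);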
640 changes: 27 additions & 613 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java


@@ -43,7 +43,6 @@
* @param <R> {@link KeyValue} return type (both key and value type can be set
* arbitrarily)
* @see TransformerSupplier
* @see KStream#transform(TransformerSupplier, String...)
* @see ValueTransformer
* @see KStream#map(KeyValueMapper)
* @see KStream#flatMap(KeyValueMapper)
@@ -71,9 +71,6 @@ public interface Transformer<K, V, R> {

/**
* Transform the record with the given key and value.
* Additionally, any {@link StateStore state} that is {@link KStream#transform(TransformerSupplier, String...)
* attached} to this operator can be accessed and modified
* arbitrarily (cf. {@link ProcessorContext#getStateStore(String)}).
* <p>
* If only one record should be forwarded downstream, {@code transform} can return a new {@link KeyValue}. If
* more than one output record should be forwarded downstream, {@link ProcessorContext#forward(Object, Object)}
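To make the removed wording above concrete, here is a hedged illustration of that contract against the old, deprecated API (hypothetical logic, not from this PR): one record can go downstream via the return value, and any additional records via ProcessorContext#forward.

// Deprecated old-API sketch: `transform` returns one KeyValue and may
// forward extra records through the (old) ProcessorContext.
new org.apache.kafka.streams.kstream.Transformer<String, String, KeyValue<String, String>>() {
    private org.apache.kafka.streams.processor.ProcessorContext context;

    @Override
    public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        context.forward(key, value + "-extra"); // additional record downstream
        return KeyValue.pair(key, value);       // one more via the return value
    }

    @Override
    public void close() { }
};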
@@ -33,7 +33,6 @@
* @param <R> {@link org.apache.kafka.streams.KeyValue KeyValue} return type (both key and value type can be set
* arbitrarily)
* @see Transformer
* @see KStream#transform(TransformerSupplier, String...)
* @see ValueTransformer
* @see ValueTransformerSupplier
* @see KStream#transformValues(ValueTransformerSupplier, String...)
@@ -35,7 +35,6 @@
* @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see Transformer
* @see TransformerSupplier
* @see KStream#transform(TransformerSupplier, String...)
* @deprecated Since 4.0. Use {@link FixedKeyProcessorSupplier} instead.
*/
@Deprecated
@@ -36,7 +36,6 @@
* @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see Transformer
* @see TransformerSupplier
* @see KStream#transform(TransformerSupplier, String...)
*/
public interface ValueTransformerWithKeySupplier<K, V, VR> extends ConnectedStoreProvider, Supplier<ValueTransformerWithKey<K, V, VR>> {

@@ -1243,66 +1243,6 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
builder);
}

@Override
@Deprecated
public <KR, VR> KStream<KR, VR> transform(final org.apache.kafka.streams.kstream.TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
final String name = builder.newProcessorName(TRANSFORM_NAME);
return flatTransform(new TransformerSupplierAdapter<>(transformerSupplier), Named.as(name), stateStoreNames);
}

@Override
@Deprecated
public <KR, VR> KStream<KR, VR> transform(final org.apache.kafka.streams.kstream.TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
return flatTransform(new TransformerSupplierAdapter<>(transformerSupplier), named, stateStoreNames);
}

@Override
@Deprecated
public <K1, V1> KStream<K1, V1> flatTransform(final org.apache.kafka.streams.kstream.TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
final String name = builder.newProcessorName(TRANSFORM_NAME);
return flatTransform(transformerSupplier, Named.as(name), stateStoreNames);
}

@Override
@Deprecated
public <K1, V1> KStream<K1, V1> flatTransform(final org.apache.kafka.streams.kstream.TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
Objects.requireNonNull(named, "named can't be null");
Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
ApiUtils.checkSupplier(transformerSupplier);
for (final String stateStoreName : stateStoreNames) {
Objects.requireNonNull(stateStoreName, "stateStoreNames can't contain `null` as store name");
}

final String name = new NamedInternal(named).name();
final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
name,
new ProcessorParameters<>(new KStreamFlatTransform<>(transformerSupplier), name),
stateStoreNames);
transformNode.keyChangingOperation(true);

builder.addGraphNode(graphNode, transformNode);

// cannot inherit key and value serde
return new KStreamImpl<>(
name,
null,
null,
subTopologySourceNodes,
true,
transformNode,
builder);
}

@Override
@Deprecated
public <VR> KStream<K, VR> transformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
@@ -91,14 +91,10 @@
* @see Topology#addProcessor(String, org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
* @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
* @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, Named, String...)
* @see KStream#transform(org.apache.kafka.streams.kstream.TransformerSupplier, String...)
* @see KStream#transform(org.apache.kafka.streams.kstream.TransformerSupplier, Named, String...)
* @see KStream#transformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, String...)
* @see KStream#transformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, Named, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, Named, String...)
* @see KStream#flatTransform(org.apache.kafka.streams.kstream.TransformerSupplier, String...)
* @see KStream#flatTransform(org.apache.kafka.streams.kstream.TransformerSupplier, Named, String...)
* @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, String...)
* @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, Named, String...)
* @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, String...)
@@ -867,15 +867,6 @@ public void shouldUseSpecifiedNameForForEachOperation() {
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
}

@Test
@SuppressWarnings("deprecation")
public void shouldUseSpecifiedNameForTransform() {
builder.stream(STREAM_TOPIC).transform(() -> null, Named.as(STREAM_OPERATION_NAME));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
}

@Test
@SuppressWarnings("deprecation")
public void shouldUseSpecifiedNameForTransformValues() {
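If coverage of the Named variant is still wanted after this removal, a migrated version of the deleted test might look as follows; this is a sketch against the same fixtures (STREAM_TOPIC, STREAM_OPERATION_NAME, props), not part of this PR.

@Test
public void shouldUseSpecifiedNameForProcess() {
    // no-op processor; an anonymous class so the supplier returns a fresh instance per call
    builder.stream(STREAM_TOPIC).process(
        () -> new Processor<Object, Object, Object, Object>() {
            @Override
            public void process(final Record<Object, Object> record) { }
        },
        Named.as(STREAM_OPERATION_NAME));
    builder.build();
    final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
    assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
}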
@@ -87,72 +87,6 @@ private void verifyResult(final List<KeyValue<Integer, Integer>> expected) {
assertThat(results, equalTo(expected));
}

private class TestTransformer implements org.apache.kafka.streams.kstream.Transformer<Integer, Integer, KeyValue<Integer, Integer>> {
private KeyValueStore<Integer, Integer> state;

@Override
public void init(final ProcessorContext context) {
state = context.getStateStore(stateStoreName);
}

@Override
public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
state.putIfAbsent(key, 0);
Integer storedValue = state.get(key);
final KeyValue<Integer, Integer> result = new KeyValue<>(key + 1, value + storedValue++);
state.put(key, storedValue);
return result;
}

@Override
public void close() {
}
}

@Test
public void shouldTransform() {
builder.addStateStore(storeBuilder());

stream
.transform(TestTransformer::new, stateStoreName)
.foreach(accumulateExpected);

final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
KeyValue.pair(2, 1),
KeyValue.pair(3, 2),
KeyValue.pair(4, 3),
KeyValue.pair(3, 2),
KeyValue.pair(3, 5),
KeyValue.pair(2, 4));
verifyResult(expected);
}

@Test
public void shouldTransformWithConnectedStoreProvider() {
stream
.transform(new org.apache.kafka.streams.kstream.TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>>() {
@Override
public org.apache.kafka.streams.kstream.Transformer<Integer, Integer, KeyValue<Integer, Integer>> get() {
return new TestTransformer();
}

@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(storeBuilder());
}
})
.foreach(accumulateExpected);

final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
KeyValue.pair(2, 1),
KeyValue.pair(3, 2),
KeyValue.pair(4, 3),
KeyValue.pair(3, 2),
KeyValue.pair(3, 5),
KeyValue.pair(2, 4));
verifyResult(expected);
}

private class TestFlatTransformer implements org.apache.kafka.streams.kstream.Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>> {
private KeyValueStore<Integer, Integer> state;

Expand All @@ -178,73 +112,6 @@ public void close() {
}
}

@Test
public void shouldFlatTransform() {
builder.addStateStore(storeBuilder());

stream
.flatTransform(TestFlatTransformer::new, stateStoreName)
.foreach(accumulateExpected);

final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
KeyValue.pair(1, 1),
KeyValue.pair(2, 2),
KeyValue.pair(3, 3),
KeyValue.pair(2, 2),
KeyValue.pair(3, 3),
KeyValue.pair(4, 4),
KeyValue.pair(3, 3),
KeyValue.pair(4, 4),
KeyValue.pair(5, 5),
KeyValue.pair(2, 4),
KeyValue.pair(3, 5),
KeyValue.pair(4, 6),
KeyValue.pair(2, 9),
KeyValue.pair(3, 10),
KeyValue.pair(4, 11),
KeyValue.pair(1, 6),
KeyValue.pair(2, 7),
KeyValue.pair(3, 8));
verifyResult(expected);
}

@Test
public void shouldFlatTransformWithConnectedStoreProvider() {
stream
.flatTransform(new org.apache.kafka.streams.kstream.TransformerSupplier<Integer, Integer, Iterable<KeyValue<Integer, Integer>>>() {
@Override
public org.apache.kafka.streams.kstream.Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>> get() {
return new TestFlatTransformer();
}

@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(storeBuilder());
}
})
.foreach(accumulateExpected);

final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
KeyValue.pair(1, 1),
KeyValue.pair(2, 2),
KeyValue.pair(3, 3),
KeyValue.pair(2, 2),
KeyValue.pair(3, 3),
KeyValue.pair(4, 4),
KeyValue.pair(3, 3),
KeyValue.pair(4, 4),
KeyValue.pair(5, 5),
KeyValue.pair(2, 4),
KeyValue.pair(3, 5),
KeyValue.pair(4, 6),
KeyValue.pair(2, 9),
KeyValue.pair(3, 10),
KeyValue.pair(4, 11),
KeyValue.pair(1, 6),
KeyValue.pair(2, 7),
KeyValue.pair(3, 8));
verifyResult(expected);
}
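For what it's worth, the connected-store pattern these removed tests exercised carries over unchanged, because the new org.apache.kafka.streams.processor.api.ProcessorSupplier also extends ConnectedStoreProvider. A sketch against the same fixtures (storeBuilder(), stateStoreName, accumulateExpected), with a deliberately trivial processor body:

stream
    .process(new ProcessorSupplier<Integer, Integer, Integer, Integer>() {
        @Override
        public Processor<Integer, Integer, Integer, Integer> get() {
            return new ContextualProcessor<Integer, Integer, Integer, Integer>() {
                private KeyValueStore<Integer, Integer> state;

                @Override
                public void init(final ProcessorContext<Integer, Integer> context) {
                    super.init(context);
                    state = context.getStateStore(stateStoreName); // store wired via stores() below
                }

                @Override
                public void process(final Record<Integer, Integer> record) {
                    context().forward(record); // hypothetical pass-through, not the removed test logic
                }
            };
        }

        @Override
        public Set<StoreBuilder<?>> stores() {
            // the supplier provides its own store; no builder.addStateStore() needed
            return Collections.singleton(storeBuilder());
        }
    })
    .foreach(accumulateExpected);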

private class TestValueTransformerWithKey implements ValueTransformerWithKey<Integer, Integer, Integer> {
private KeyValueStore<Integer, Integer> state;
@@ -30,8 +30,10 @@
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
@@ -329,26 +331,32 @@ private KafkaStreams buildWithDeduplicationTopology(final String stateDirPath) {
Serdes.Integer())
);
builder.<Integer, Integer>stream(inputTopic)
.transform(
() -> new org.apache.kafka.streams.kstream.Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
.process(
() -> new Processor<Integer, Integer, Integer, Integer>() {
private ProcessorContext<Integer, Integer> context;
private KeyValueStore<Integer, Integer> store;

@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
store = (KeyValueStore<Integer, Integer>) context.getStateStore(storeName);
public void init(final ProcessorContext<Integer, Integer> context) {
this.context = context;
store = context.getStateStore(storeName);
}

@Override
public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
public void process(final Record<Integer, Integer> record) {
final Integer key = record.key();
final Integer value = record.value();

if (skipRecord.get()) {
// we only forward so we can verify the skipping by reading the output topic
// the goal of skipping is to not modify the state store
return KeyValue.pair(key, value);
context.forward(record);
return;
}

if (store.get(key) != null) {
return null;
store.delete(key);
return;
}

store.put(key, value);
Expand All @@ -360,11 +368,9 @@ public KeyValue<Integer, Integer> transform(final Integer key, final Integer val
throw new RuntimeException("Injected test error");
}

return KeyValue.pair(key, value);
store.put(key, value);
context.forward(record);
}

@Override
public void close() { }
},
storeName
)