Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16260: Deprecate window.size.ms and window.inner.class.serde in StreamsConfig #18297

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/upgrade.html
Original file line number Diff line number Diff line change
@@ -253,6 +253,9 @@ <h5><a id="upgrade_400_notable" href="#upgrade_400_notable">Notable changes in 4
been updated to enable the replication of topics that appear to be internal but aren't truly internal to Kafka and Mirror Maker 2.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1074%3A+Allow+the+replication+of+user+internal+topics">KIP-1074</a> for more details.
</li>
<li>
The <code>window.size.ms</code> and <code>window.inner.serde.class</code> in stream config are deprecated.
</li>
</ul>
</li>
</ul>
Original file line number Diff line number Diff line change
@@ -1061,9 +1061,12 @@ private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(final Deserializer<
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 500L);
if (keyDeserializer instanceof TimeWindowedDeserializer) {
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
} else if (keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
@@ -1085,9 +1088,12 @@ private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(final
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 500L);
if (keyDeserializer instanceof TimeWindowedDeserializer) {
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
} else if (keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
@@ -1119,7 +1125,7 @@ private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final Deserial
"--property", "key.deserializer=" + keyDeserializer.getClass().getName(),
"--property", "value.deserializer=" + valueDeserializer.getClass().getName(),
"--property", "key.separator=" + keySeparator,
"--property", "key.deserializer." + StreamsConfig.WINDOWED_INNER_CLASS_SERDE + "=" + Serdes.serdeFrom(innerClass).getClass().getName(),
"--property", "key.deserializer." + TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS + "=" + Serdes.serdeFrom(innerClass).getClass().getName(),
"--property", "key.deserializer.window.size.ms=500",
};

Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.TestUtils;

@@ -262,7 +263,7 @@ private <K, V> boolean processKeyValueAndVerifyCount(
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 500L);


final List<KeyValueTimestamp<K, V>> actual =
Original file line number Diff line number Diff line change
@@ -444,9 +444,12 @@ private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(final
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, windowSize);
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, windowSize);
if (keyDeserializer instanceof TimeWindowedDeserializer) {
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
} else if (keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
Original file line number Diff line number Diff line change
@@ -478,9 +478,12 @@ private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(final
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, windowSize);
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, windowSize);
if (keyDeserializer instanceof TimeWindowedDeserializer) {
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
} else if (keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
23 changes: 21 additions & 2 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
@@ -45,6 +45,10 @@
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.SessionWindowedSerializer;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.assignment.TaskAssignor;
@@ -815,13 +819,28 @@ public class StreamsConfig extends AbstractConfig {
+ CONFIG_ERROR_MSG
+ "\"NO_OPTIMIZATION\" by default.";

/** {@code windowed.inner.class.serde} */
/**
* {@code windowed.inner.class.serde}
*
* @deprecated since 4.0.0.
* Use {@link TimeWindowedSerializer#WINDOWED_INNER_SERIALIZER_CLASS} for {@link TimeWindowedSerializer}.
* Use {@link TimeWindowedDeserializer#WINDOWED_INNER_DESERIALIZER_CLASS} for {@link TimeWindowedDeserializer}.
* Use {@link SessionWindowedSerializer#WINDOWED_INNER_SERIALIZER_CLASS} for {@link SessionWindowedSerializer}.
* Use {@link SessionWindowedDeserializer#WINDOWED_INNER_DESERIALIZER_CLASS} for {@link SessionWindowedDeserializer}.
*/
@Deprecated
public static final String WINDOWED_INNER_CLASS_SERDE = "windowed.inner.class.serde";
private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the " +
"<code>org.apache.kafka.common.serialization.Serde</code> interface. Note that setting this config in KafkaStreams application would result " +
"in an error as it is meant to be used only from Plain consumer client.";

/** {@code window.size.ms} */
/**
* {@code window.size.ms}
*
* @deprecated since 4.0.0.
* Use {@link TimeWindowedDeserializer#WINDOW_SIZE_MS_CONFIG} for {@link TimeWindowedDeserializer}.
*/
@Deprecated
public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
private static final String WINDOW_SIZE_MS_DOC = "Sets window size for the deserializer in order to calculate window end times.";

Original file line number Diff line number Diff line change
@@ -23,10 +23,20 @@
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.internals.SessionKeySchema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class SessionWindowedDeserializer<T> implements Deserializer<Windowed<T>> {

/**
* Configuration key for the windowed inner deserializer class.
*/
public static final String WINDOWED_INNER_DESERIALIZER_CLASS = "windowed.inner.deserializer.class";

private final Logger log = LoggerFactory.getLogger(SessionWindowedDeserializer.class);

private Deserializer<T> inner;

// Default constructor needed by Kafka
@@ -36,34 +46,43 @@ public SessionWindowedDeserializer(final Deserializer<T> inner) {
this.inner = inner;
}

@SuppressWarnings("unchecked")
@SuppressWarnings({"deprecation", "unchecked"})
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);

Serde<T> windowInnerClassSerde = null;
String deserializerConfigFrom = WINDOWED_INNER_DESERIALIZER_CLASS;
String windowedInnerDeserializerClassConfig = (String) configs.get(WINDOWED_INNER_DESERIALIZER_CLASS);
if (windowedInnerDeserializerClassConfig == null) {
final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
if (windowedInnerClassSerdeConfig != null) {
deserializerConfigFrom = StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
windowedInnerDeserializerClassConfig = windowedInnerClassSerdeConfig;
log.warn("Config {} is deprecated. Please use {} instead.",
StreamsConfig.WINDOWED_INNER_CLASS_SERDE, WINDOWED_INNER_DESERIALIZER_CLASS);
}
}

if (windowedInnerClassSerdeConfig != null) {
Serde<T> windowedInnerDeserializerClass = null;
if (windowedInnerDeserializerClassConfig != null) {
try {
windowInnerClassSerde = Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
windowedInnerDeserializerClass = Utils.newInstance(windowedInnerDeserializerClassConfig, Serde.class);
} catch (final ClassNotFoundException e) {
throw new ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, windowedInnerClassSerdeConfig,
"Serde class " + windowedInnerClassSerdeConfig + " could not be found.");
throw new ConfigException(deserializerConfigFrom, windowedInnerDeserializerClassConfig,
"Serde class " + windowedInnerDeserializerClassConfig + " could not be found.");
}
}

if (inner != null && windowedInnerClassSerdeConfig != null) {
if (!inner.getClass().getName().equals(windowInnerClassSerde.deserializer().getClass().getName())) {
if (inner != null && windowedInnerDeserializerClassConfig != null) {
if (!inner.getClass().getName().equals(windowedInnerDeserializerClass.deserializer().getClass().getName())) {
throw new IllegalArgumentException("Inner class deserializer set using constructor "
+ "(" + inner.getClass().getName() + ")" +
" is different from the one set in windowed.inner.class.serde config " +
"(" + windowInnerClassSerde.deserializer().getClass().getName() + ").");
" is different from the one set in " + deserializerConfigFrom + " config " +
"(" + windowedInnerDeserializerClass.deserializer().getClass().getName() + ").");
}
} else if (inner == null && windowedInnerClassSerdeConfig == null) {
} else if (inner == null && windowedInnerDeserializerClassConfig == null) {
throw new IllegalArgumentException("Inner class deserializer should be set either via constructor " +
"or via the windowed.inner.class.serde config");
"or via the " + WINDOWED_INNER_DESERIALIZER_CLASS + " config");
} else if (inner == null)
inner = windowInnerClassSerde.deserializer();
inner = windowedInnerDeserializerClass.deserializer();
}

@Override
Original file line number Diff line number Diff line change
@@ -24,10 +24,20 @@
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.state.internals.SessionKeySchema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class SessionWindowedSerializer<T> implements WindowedSerializer<T> {

/**
* Configuration key for the windowed inner serializer class.
*/
public static final String WINDOWED_INNER_SERIALIZER_CLASS = "windowed.inner.serializer.class";

private final Logger log = LoggerFactory.getLogger(SessionWindowedSerializer.class);

private Serializer<T> inner;

// Default constructor needed by Kafka
@@ -37,32 +47,42 @@ public SessionWindowedSerializer(final Serializer<T> inner) {
this.inner = inner;
}

@SuppressWarnings("unchecked")
@SuppressWarnings({"deprecation", "unchecked"})
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
Serde<T> windowInnerClassSerde = null;
if (windowedInnerClassSerdeConfig != null) {
String serializerConfigFrom = WINDOWED_INNER_SERIALIZER_CLASS;
String windowedInnerSerializerClassConfig = (String) configs.get(WINDOWED_INNER_SERIALIZER_CLASS);
if (windowedInnerSerializerClassConfig == null) {
final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
if (windowedInnerClassSerdeConfig != null) {
serializerConfigFrom = StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
windowedInnerSerializerClassConfig = windowedInnerClassSerdeConfig;
log.warn("Config {} is deprecated. Please use {} instead.",
StreamsConfig.WINDOWED_INNER_CLASS_SERDE, WINDOWED_INNER_SERIALIZER_CLASS);
}
}
Serde<T> windowedInnerSerializerClass = null;
if (windowedInnerSerializerClassConfig != null) {
try {
windowInnerClassSerde = Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
windowedInnerSerializerClass = Utils.newInstance(windowedInnerSerializerClassConfig, Serde.class);
} catch (final ClassNotFoundException e) {
throw new ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, windowedInnerClassSerdeConfig,
"Serde class " + windowedInnerClassSerdeConfig + " could not be found.");
throw new ConfigException(serializerConfigFrom, windowedInnerSerializerClassConfig,
"Serde class " + windowedInnerSerializerClassConfig + " could not be found.");
}
}

if (inner != null && windowedInnerClassSerdeConfig != null) {
if (!inner.getClass().getName().equals(windowInnerClassSerde.serializer().getClass().getName())) {
if (inner != null && windowedInnerSerializerClassConfig != null) {
if (!inner.getClass().getName().equals(windowedInnerSerializerClass.serializer().getClass().getName())) {
throw new IllegalArgumentException("Inner class serializer set using constructor "
+ "(" + inner.getClass().getName() + ")" +
" is different from the one set in windowed.inner.class.serde config " +
"(" + windowInnerClassSerde.serializer().getClass().getName() + ").");
" is different from the one set in " + serializerConfigFrom + " config " +
"(" + windowedInnerSerializerClass.serializer().getClass().getName() + ").");
}
} else if (inner == null && windowedInnerClassSerdeConfig == null) {
} else if (inner == null && windowedInnerSerializerClassConfig == null) {
throw new IllegalArgumentException("Inner class serializer should be set either via constructor " +
"or via the windowed.inner.class.serde config");
"or via the " + WINDOWED_INNER_SERIALIZER_CLASS + " config");
} else if (inner == null)
inner = windowInnerClassSerde.serializer();
inner = windowedInnerSerializerClass.serializer();
}

@Override
Original file line number Diff line number Diff line change
@@ -23,10 +23,25 @@
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.internals.WindowKeySchema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> {

/**
* Configuration key for the window size in milliseconds.
*/
public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";

/**
* Configuration key for the windowed inner deserializer class.
*/
public static final String WINDOWED_INNER_DESERIALIZER_CLASS = "windowed.inner.deserializer.class";

private final Logger log = LoggerFactory.getLogger(TimeWindowedDeserializer.class);

private Long windowSize;
private boolean isChangelogTopic;

@@ -47,50 +62,10 @@ public Long getWindowSize() {
return this.windowSize;
}

@SuppressWarnings("unchecked")
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
//check to see if the window size config is set and the window size is already set from the constructor
final Long configWindowSize;
if (configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG) instanceof String) {
configWindowSize = Long.parseLong((String) configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG));
} else {
configWindowSize = (Long) configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG);
}
if (windowSize != null && configWindowSize != null) {
throw new IllegalArgumentException("Window size should not be set in both the time windowed deserializer constructor and the window.size.ms config");
} else if (windowSize == null && configWindowSize == null) {
throw new IllegalArgumentException("Window size needs to be set either through the time windowed deserializer " +
"constructor or the window.size.ms config but not both");
} else {
windowSize = windowSize == null ? configWindowSize : windowSize;
}

final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);

Serde<T> windowInnerClassSerde = null;

if (windowedInnerClassSerdeConfig != null) {
try {
windowInnerClassSerde = Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
} catch (final ClassNotFoundException e) {
throw new ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, windowedInnerClassSerdeConfig,
"Serde class " + windowedInnerClassSerdeConfig + " could not be found.");
}
}

if (inner != null && windowedInnerClassSerdeConfig != null) {
if (!inner.getClass().getName().equals(windowInnerClassSerde.deserializer().getClass().getName())) {
throw new IllegalArgumentException("Inner class deserializer set using constructor "
+ "(" + inner.getClass().getName() + ")" +
" is different from the one set in windowed.inner.class.serde config " +
"(" + windowInnerClassSerde.deserializer().getClass().getName() + ").");
}
} else if (inner == null && windowedInnerClassSerdeConfig == null) {
throw new IllegalArgumentException("Inner class deserializer should be set either via constructor " +
"or via the windowed.inner.class.serde config");
} else if (inner == null)
inner = windowInnerClassSerde.deserializer();
configureWindowSizeMs(configs);
configureWindowInnerDeserializerClass(configs);
}

@Override
@@ -125,4 +100,60 @@ public void setIsChangelogTopic(final boolean isChangelogTopic) {
Deserializer<T> innerDeserializer() {
return inner;
}

private void configureWindowSizeMs(final Map<String, ?> configs) {
//check to see if the window size config is set and the window size is already set from the constructor
final Long configWindowSize;
if (configs.get(WINDOW_SIZE_MS_CONFIG) instanceof String) {
configWindowSize = Long.parseLong((String) configs.get(WINDOW_SIZE_MS_CONFIG));
} else {
configWindowSize = (Long) configs.get(WINDOW_SIZE_MS_CONFIG);
}
if (windowSize != null && configWindowSize != null) {
throw new IllegalArgumentException("Window size should not be set in both the time windowed deserializer constructor and the window.size.ms config");
} else if (windowSize == null && configWindowSize == null) {
throw new IllegalArgumentException("Window size needs to be set either through the time windowed deserializer " +
"constructor or the window.size.ms config but not both");
} else {
windowSize = windowSize == null ? configWindowSize : windowSize;
}
}

@SuppressWarnings({"deprecation", "unchecked"})
private void configureWindowInnerDeserializerClass(final Map<String, ?> configs) {
String deserializerConfigFrom = WINDOWED_INNER_DESERIALIZER_CLASS;
String windowedInnerDeserializerClassConfig = (String) configs.get(WINDOWED_INNER_DESERIALIZER_CLASS);
if (windowedInnerDeserializerClassConfig == null) {
final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
if (windowedInnerClassSerdeConfig != null) {
deserializerConfigFrom = StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
windowedInnerDeserializerClassConfig = windowedInnerClassSerdeConfig;
log.warn("Config {} is deprecated. Please use {} instead.",
StreamsConfig.WINDOWED_INNER_CLASS_SERDE, WINDOWED_INNER_DESERIALIZER_CLASS);
}
}

Serde<T> windowedInnerDeserializerClass = null;
if (windowedInnerDeserializerClassConfig != null) {
try {
windowedInnerDeserializerClass = Utils.newInstance(windowedInnerDeserializerClassConfig, Serde.class);
} catch (final ClassNotFoundException e) {
throw new ConfigException(deserializerConfigFrom, windowedInnerDeserializerClassConfig,
"Serde class " + windowedInnerDeserializerClassConfig + " could not be found.");
}
}

if (inner != null && windowedInnerDeserializerClassConfig != null) {
if (!inner.getClass().getName().equals(windowedInnerDeserializerClass.deserializer().getClass().getName())) {
throw new IllegalArgumentException("Inner class deserializer set using constructor "
+ "(" + inner.getClass().getName() + ")" +
" is different from the one set in " + windowedInnerDeserializerClassConfig + " config " +
"(" + windowedInnerDeserializerClass.deserializer().getClass().getName() + ").");
}
} else if (inner == null && windowedInnerDeserializerClassConfig == null) {
throw new IllegalArgumentException("Inner class deserializer should be set either via constructor " +
"or via the " + WINDOWED_INNER_DESERIALIZER_CLASS + " config");
} else if (inner == null)
inner = windowedInnerDeserializerClass.deserializer();
}
}
Original file line number Diff line number Diff line change
@@ -24,10 +24,20 @@
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.state.internals.WindowKeySchema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class TimeWindowedSerializer<T> implements WindowedSerializer<T> {

/**
* Configuration key for the windowed inner serializer class.
*/
public static final String WINDOWED_INNER_SERIALIZER_CLASS = "windowed.inner.serializer.class";

private final Logger log = LoggerFactory.getLogger(TimeWindowedSerializer.class);

private Serializer<T> inner;

// Default constructor needed by Kafka
@@ -38,32 +48,42 @@ public TimeWindowedSerializer(final Serializer<T> inner) {
this.inner = inner;
}

@SuppressWarnings("unchecked")
@SuppressWarnings({"deprecation", "unchecked"})
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
Serde<T> windowInnerClassSerde = null;
if (windowedInnerClassSerdeConfig != null) {
String serializerConfigFrom = WINDOWED_INNER_SERIALIZER_CLASS;
String windowedInnerSerializerClassConfig = (String) configs.get(WINDOWED_INNER_SERIALIZER_CLASS);
if (windowedInnerSerializerClassConfig == null) {
final String windowedInnerClassSerdeConfig = (String) configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
if (windowedInnerClassSerdeConfig != null) {
serializerConfigFrom = StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
windowedInnerSerializerClassConfig = windowedInnerClassSerdeConfig;
log.warn("Config {} is deprecated. Please use {} instead.",
StreamsConfig.WINDOWED_INNER_CLASS_SERDE, WINDOWED_INNER_SERIALIZER_CLASS);
}
}
Serde<T> windowedInnerSerializerClass = null;
if (windowedInnerSerializerClassConfig != null) {
try {
windowInnerClassSerde = Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
windowedInnerSerializerClass = Utils.newInstance(windowedInnerSerializerClassConfig, Serde.class);
} catch (final ClassNotFoundException e) {
throw new ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, windowedInnerClassSerdeConfig,
"Serde class " + windowedInnerClassSerdeConfig + " could not be found.");
throw new ConfigException(serializerConfigFrom, windowedInnerSerializerClassConfig,
"Serde class " + windowedInnerSerializerClassConfig + " could not be found.");
}
}

if (inner != null && windowedInnerClassSerdeConfig != null) {
if (!inner.getClass().getName().equals(windowInnerClassSerde.serializer().getClass().getName())) {
if (inner != null && windowedInnerSerializerClassConfig != null) {
if (!inner.getClass().getName().equals(windowedInnerSerializerClass.serializer().getClass().getName())) {
throw new IllegalArgumentException("Inner class serializer set using constructor "
+ "(" + inner.getClass().getName() + ")" +
" is different from the one set in windowed.inner.class.serde config " +
"(" + windowInnerClassSerde.serializer().getClass().getName() + ").");
" is different from the one set in " + serializerConfigFrom + " config " +
"(" + windowedInnerSerializerClass.serializer().getClass().getName() + ").");
}
} else if (inner == null && windowedInnerClassSerdeConfig == null) {
} else if (inner == null && windowedInnerSerializerClassConfig == null) {
throw new IllegalArgumentException("Inner class serializer should be set either via constructor " +
"or via the windowed.inner.class.serde config");
"or via the " + WINDOWED_INNER_SERIALIZER_CLASS + " config");
} else if (inner == null)
inner = windowInnerClassSerde.serializer();
inner = windowedInnerSerializerClass.serializer();
}

@Override
Original file line number Diff line number Diff line change
@@ -45,28 +45,57 @@ public void testSessionWindowedDeserializerConstructor() {
}

@Test
public void shouldSetWindowedInnerClassDeserialiserThroughConfig() {
public void shouldSetSerializerThroughWindowedInnerClassSerdeConfig() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>();
deserializer.configure(props, false);
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
}

@Test
public void shouldThrowErrorIfWindowInnerClassDeserialiserIsNotSet() {
public void shouldSetSerializerThroughWindowedInnerDeserializerClassConfig() {
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>();
deserializer.configure(props, false);
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
}

@Test
public void shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerDeserializerClassConfigIsSet() {
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");
final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>();
deserializer.configure(props, false);
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
}

@Test
public void shouldThrowErrorIfWindowedInnerClassSerdeAndSessionWindowedDeserializerClassAreNotSet() {
final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>();
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
}

@Test
public void shouldThrowErrorIfDeserialisersConflictInConstructorAndConfig() {
public void shouldThrowErrorIfDeserializersConflictInConstructorAndWindowedInnerClassSerdeConfig() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
assertThrows(IllegalArgumentException.class, () -> sessionWindowedDeserializer.configure(props, false));
}

@Test
public void shouldThrowConfigExceptionWhenInvalidWindowInnerClassDeserialiserSupplied() {
public void shouldThrowErrorIfDeserializersConflictInConstructorAndWindowedInnerDeserializerClassConfig() {
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
assertThrows(IllegalArgumentException.class, () -> sessionWindowedDeserializer.configure(props, false));
}

@Test
public void shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");
assertThrows(ConfigException.class, () -> sessionWindowedDeserializer.configure(props, false));
}

@Test
public void shouldThrowConfigExceptionWhenInvalidWindowedInnerDeserializerClassSupplied() {
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, "some.non.existent.class");
assertThrows(ConfigException.class, () -> sessionWindowedDeserializer.configure(props, false));
}
}
Original file line number Diff line number Diff line change
@@ -45,28 +45,57 @@ public void testSessionWindowedSerializerConstructor() {
}

@Test
public void shouldSetWindowedInnerClassSerialiserThroughConfig() {
public void shouldSetSerializerThroughWindowedInnerClassSerdeConfig() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>();
serializer.configure(props, false);
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
}

@Test
public void shouldThrowErrorIfWindowInnerClassSerialiserIsNotSet() {
public void shouldSetSerializerThroughWindowedInnerSerializerClassConfig() {
props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>();
serializer.configure(props, false);
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
}

@Test
public void shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerSerializerClassConfigIsSet() {
props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");
final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>();
serializer.configure(props, false);
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
}

@Test
public void shouldThrowErrorIfWindowedInnerClassSerdeAndWindowedInnerSerializerClassAreNotSet() {
final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>();
assertThrows(IllegalArgumentException.class, () -> serializer.configure(props, false));
}

@Test
public void shouldThrowErrorIfSerialisersConflictInConstructorAndConfig() {
public void shouldThrowErrorIfSerializersConflictInConstructorAndWindowedInnerClassSerdeConfig() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
assertThrows(IllegalArgumentException.class, () -> sessionWindowedSerializer.configure(props, false));
}

@Test
public void shouldThrowConfigExceptionWhenInvalidWindowInnerClassSerialiserSupplied() {
public void shouldThrowErrorIfSerializersConflictInConstructorAndWindowedInnerSerializerClassConfig() {
props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
assertThrows(IllegalArgumentException.class, () -> sessionWindowedSerializer.configure(props, false));
}

@Test
public void shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");
assertThrows(ConfigException.class, () -> sessionWindowedSerializer.configure(props, false));
}

@Test
public void shouldThrowConfigExceptionWhenInvalidWindowedInnerSerializerClassSupplied() {
props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, "some.non.existent.class");
assertThrows(ConfigException.class, () -> sessionWindowedSerializer.configure(props, false));
}
}
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -49,7 +50,7 @@ public void testTimeWindowedDeserializerConstructor() {
}

@Test
public void shouldSetWindowSizeAndWindowedInnerDeserialiserThroughConfigs() {
public void shouldSetWindowSizeAndDeserializerThroughWindowSizeMsAndWindowedInnerClassSerdeConfigs() {
props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
@@ -59,34 +60,92 @@ public void shouldSetWindowSizeAndWindowedInnerDeserialiserThroughConfigs() {
}

@Test
public void shouldThrowErrorIfWindowSizeSetInConfigsAndConstructor() {
public void shouldSetWindowSizeAndDeserializerThroughWindowSizeMsAndWindowedInnerDeserializerClassConfigs() {
props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500");
props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
deserializer.configure(props, false);
assertThat(deserializer.getWindowSize(), is(500L));
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
}

@Test
public void shouldHaveSameConfigNameForWindowSizeMs() {
assertEquals(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, StreamsConfig.WINDOW_SIZE_MS_CONFIG);
}

@Test
public void shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerDeserializerClassConfigIsSet() {
props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500");
props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");
final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
deserializer.configure(props, false);
assertThat(deserializer.getWindowSize(), is(500L));
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
}

@Test
public void shouldThrowErrorIfWindowSizeSetInStreamsConfigAndConstructor() {
props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
assertThrows(IllegalArgumentException.class, () -> timeWindowedDeserializer.configure(props, false));
}

@Test
public void shouldThrowErrorIfWindowSizeIsNotSet() {
public void shouldThrowErrorIfWindowSizeSetInConstructorConfigAndConstructor() {
props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500");
assertThrows(IllegalArgumentException.class, () -> timeWindowedDeserializer.configure(props, false));
}

@Test
public void shouldThrowErrorIfWindowSizeIsNotSetAndWindowedInnerClassSerdeIsSet() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
}

@Test
public void shouldThrowErrorIfWindowedInnerClassDeserialiserIsNotSet() {
public void shouldThrowErrorIfWindowSizeIsNotSetAndWindowedInnerDeserializerClassIsSet() {
props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, Serdes.ByteArraySerde.class.getName());
final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
}

@Test
public void shouldThrowErrorIfWindowedInnerClassSerdeIsNotSetAndWindowSizeMsInStreamsConfigIsSet() {
props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
}

@Test
public void shouldThrowErrorIfWindowedInnerClassDeserialisersConflictInConstructorAndConfig() {
public void shouldThrowErrorIfWindowedInnerClassSerdeIsNotSetAndWindowSizeMsInConstructorConfigIsSet() {
props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500");
final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
}

@Test
public void shouldThrowErrorIfDeserializerConflictInConstructorAndWindowedInnerClassSerdeConfig() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
assertThrows(IllegalArgumentException.class, () -> timeWindowedDeserializer.configure(props, false));
}

@Test
public void shouldThrowConfigExceptionWhenInvalidWindowedInnerClassDeserialiserSupplied() {
public void shouldThrowErrorIfDeserializerConflictInConstructorAndWindowedInnerDeserializerClassConfig() {
props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
assertThrows(IllegalArgumentException.class, () -> timeWindowedDeserializer.configure(props, false));
}

@Test
public void shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");
assertThrows(ConfigException.class, () -> timeWindowedDeserializer.configure(props, false));
}

@Test
public void shouldThrowConfigExceptionWhenInvalidWindowedInnerDeserializerClassSupplied() {
props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, "some.non.existent.class");
assertThrows(ConfigException.class, () -> timeWindowedDeserializer.configure(props, false));
}
}
Original file line number Diff line number Diff line change
@@ -45,29 +45,57 @@ public void testTimeWindowedSerializerConstructor() {
}

@Test
public void shouldSetWindowedInnerClassSerialiserThroughConfig() {
public void shouldSetSerializerThroughWindowedInnerClassSerdeConfig() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>();
serializer.configure(props, false);
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
}

@Test
public void shouldThrowErrorIfWindowedInnerClassSerialiserIsNotSet() {
public void shouldSetSerializerThroughWindowedInnerSerializerClassConfig() {
props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>();
serializer.configure(props, false);
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
}

@Test
public void shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerSerializerClassConfigIsSet() {
props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");
final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>();
serializer.configure(props, false);
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
}

@Test
public void shouldThrowErrorIfWindowedInnerClassSerdeAndWindowedInnerSerializerClassAreNotSet() {
final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>();
assertThrows(IllegalArgumentException.class, () -> serializer.configure(props, false));
}

@Test
public void shouldThrowErrorIfWindowedInnerClassSerialisersConflictInConstructorAndConfig() {
public void shouldThrowErrorIfSerializerConflictInConstructorAndWindowedInnerClassSerdeConfig() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
assertThrows(IllegalArgumentException.class, () -> timeWindowedSerializer.configure(props, false));
}

@Test
public void shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerialiserSupplied() {
public void shouldThrowErrorIfSerializerConflictInConstructorAndWindowedInnerSerializerClassConfig() {
props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
assertThrows(IllegalArgumentException.class, () -> timeWindowedSerializer.configure(props, false));
}

@Test
public void shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");
assertThrows(ConfigException.class, () -> timeWindowedSerializer.configure(props, false));
}

@Test
public void shouldThrowConfigExceptionWhenInvalidWindowedInnerSerializerClassSupplied() {
props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, "some.non.existent.class");
assertThrows(ConfigException.class, () -> timeWindowedSerializer.configure(props, false));
}
}