KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (3/N) #21451
aliehsaeedii wants to merge 7 commits into apache:trunk
Conversation
...ava/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
```java
        final ValueTimestampHeaders<V> value) {
    Objects.requireNonNull(key, "key cannot be null");
    try {
        Headers headers = value.headers();
    }
```

```java
protected ValueTimestampHeaders<V> outerValue(final byte[] value) {
    Headers headers = ValueTimestampHeadersDeserializer.headers(value);
```
```java
final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.range(KEY, KEY);
assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value);
assertFalse(iterator.hasNext());
iterator.close();
```
Suggested change (use try-with-resources so the iterator is closed even if an assertion fails):

```java
try (final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.range(KEY, KEY)) {
    assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value);
    assertFalse(iterator.hasNext());
}
```
```java
final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.all();
assertEquals(VALUE_TIMESTAMP_HEADERS, iterator.next().value);
assertFalse(iterator.hasNext());
iterator.close();
```
frankvicky
left a comment
Hi @aliehsaeedii,
I have a question about the test shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled to make sure I understand its purpose correctly.
My understanding is that this test verifies the following behavior:
When changelog logging is disabled (i.e., context.changelogFor(STORE_NAME) returns null), the MeteredTimestampedKeyValueStoreWithHeaders should still pass a default changelog topic name to the serde's serializer/deserializer methods, rather than passing null.
The default topic name is generated using:

```java
ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, taskId.topologyName())
```

This ensures that the serde can still be properly configured even when changelog logging is not enabled for the state store.
Is my understanding correct?
Thanks!
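To make the behavior under discussion concrete, here is a minimal, hypothetical sketch (not the actual Kafka Streams source; `topicForSerde` and `defaultChangelogTopic` are illustrative names): when `context.changelogFor(storeName)` returns null because logging is disabled, the store falls back to the default changelog topic name rather than handing null to the serde.

```java
// Hypothetical sketch of the fallback behavior the test verifies.
public class ChangelogTopicFallback {

    // Mirrors the "<applicationId>-<storeName>-changelog" naming convention
    // used by ProcessorStateManager.storeChangelogTopic (topology name omitted
    // here for simplicity).
    static String defaultChangelogTopic(final String applicationId, final String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // changelogFor(...) may return null when changelog logging is disabled;
    // in that case the default topic name is used instead of null.
    static String topicForSerde(final String changelogFromContext,
                                final String applicationId,
                                final String storeName) {
        return changelogFromContext != null
                ? changelogFromContext
                : defaultChangelogTopic(applicationId, storeName);
    }

    public static void main(final String[] args) {
        // Logging disabled: context returns null, so the default name is used.
        System.out.println(topicForSerde(null, "app-id", "my-store"));
        // prints "app-id-my-store-changelog"

        // Logging enabled: the context-provided topic wins.
        System.out.println(topicForSerde("custom-changelog", "app-id", "my-store"));
        // prints "custom-changelog"
    }
}
```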
@frankvicky
mjsax
left a comment
I am still trying to wrap my head around the details -- left one more comment for now, to hear what you think.
We need to avoid too much back and forth -- in my last comment, I also only said we don't need to overload get(), but I think the overloads of put() and putIfAbsent() were actually correct, as we need to change the code to extract the headers from the given value and pass them into the key serializer.
For the value serializer, I think it's not necessary to pass the headers as a separate parameter, because the value itself is already of type ValueTimestampHeaders and contains the headers, so there seems to be no reason to pass the same headers a second time.
```java
    return value != null ? serdes.valueFrom(value, new RecordHeaders()) : null;
}

protected Bytes keyBytes(final K key) {
```
Looking into the POC PR, we actually added a Headers parameter here, and I think it would be the right thing to do: just pass in new RecordHeaders() when used inside MeteredKeyValueStore, but pass in the extracted Headers object from the value in the MeteredTimestampedKeyValueWithHeadersStore case.
This implies we need to overload put() in MeteredTimestampedKeyValueWithHeadersStore.
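A hedged sketch of the overload shape discussed here (all types below are simplified stand-ins, not the real Kafka Streams classes, and the method bodies are illustrative): the base store serializes the key with empty headers, while the headers-aware store overloads put() to extract the Headers from the value and forward them to keyBytes().

```java
import java.util.Objects;

// Hypothetical illustration of keyBytes(key, headers) plus an overloaded put().
public class KeyBytesOverloadSketch {

    interface Headers { }
    static final class RecordHeaders implements Headers {
        @Override public String toString() { return "empty"; }
    }
    static final class CustomHeaders implements Headers {
        @Override public String toString() { return "custom"; }
    }

    // Simplified stand-in for the value wrapper carrying headers.
    static final class ValueTimestampHeaders<V> {
        private final V value;
        private final Headers headers;
        ValueTimestampHeaders(final V value, final Headers headers) {
            this.value = value;
            this.headers = headers;
        }
        Headers headers() { return headers; }
    }

    // Stand-in for keyBytes(key, headers): a real implementation would invoke
    // the key serializer with the given headers; here we just tag the output.
    static <K> String keyBytes(final K key, final Headers headers) {
        Objects.requireNonNull(key, "key cannot be null");
        return key + "|" + headers;
    }

    // MeteredKeyValueStore case: the value carries no headers, so pass empty ones.
    static <K, V> String putPlain(final K key, final V value) {
        return keyBytes(key, new RecordHeaders());
    }

    // MeteredTimestampedKeyValueWithHeadersStore case: overload put() to pull
    // the headers out of the ValueTimestampHeaders value.
    static <K, V> String putWithHeaders(final K key, final ValueTimestampHeaders<V> value) {
        return keyBytes(key, value.headers());
    }

    public static void main(final String[] args) {
        System.out.println(putPlain("k", "v"));
        // prints "k|empty"
        System.out.println(putWithHeaders("k", new ValueTimestampHeaders<>("v", new CustomHeaders())));
        // prints "k|custom"
    }
}
```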
This PR implements the metered layer of the TimestampedKeyValueStoreWithHeaders introduced in KIP-1271. It includes unit tests as well.
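For readers unfamiliar with the store layering, here is a minimal, hypothetical sketch of the "metered layer" pattern the PR follows (illustrative names only; the real class also handles serdes and records latency via StreamsMetrics sensors): a thin wrapper that delegates every call to an inner store and records a metric around it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of a metered wrapper around an inner store.
public class MeteredLayerSketch {

    interface KeyValueStore<K, V> {
        void put(K key, V value);
        V get(K key);
    }

    static final class InMemoryStore<K, V> implements KeyValueStore<K, V> {
        private final Map<K, V> map = new HashMap<>();
        @Override public void put(final K key, final V value) { map.put(key, value); }
        @Override public V get(final K key) { return map.get(key); }
    }

    static final class MeteredStore<K, V> implements KeyValueStore<K, V> {
        private final KeyValueStore<K, V> inner;
        long putCount;  // stand-in for a put-latency/rate sensor
        long getCount;  // stand-in for a get-latency/rate sensor

        MeteredStore(final KeyValueStore<K, V> inner) { this.inner = inner; }

        @Override public void put(final K key, final V value) {
            putCount++;             // record the metric ...
            inner.put(key, value);  // ... then delegate to the wrapped store
        }

        @Override public V get(final K key) {
            getCount++;
            return inner.get(key);
        }
    }

    public static void main(final String[] args) {
        final MeteredStore<String, String> store = new MeteredStore<>(new InMemoryStore<>());
        store.put("k", "v");
        System.out.println(store.get("k") + " puts=" + store.putCount + " gets=" + store.getCount);
        // prints "v puts=1 gets=1"
    }
}
```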