KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (2/N)#21446
KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (2/N)#21446aliehsaeedii wants to merge 15 commits intoapache:trunkfrom
Conversation
| // Check if we're upgrading from RocksDBTimestampedStore (which uses keyValueWithTimestamp CF) | ||
| final List<byte[]> existingCFs; | ||
| try { | ||
| final org.rocksdb.Options options = new org.rocksdb.Options(dbOptions, new ColumnFamilyOptions()); |
There was a problem hiding this comment.
Could we replace full qualified name with import?
| existingCFs = RocksDB.listColumnFamilies(options, dbDir.getAbsolutePath()); | ||
| options.close(); | ||
| } catch (final org.rocksdb.RocksDBException e) { | ||
| throw new org.apache.kafka.streams.errors.ProcessorStateException("Error listing column families for store " + name, e); |
| } | ||
|
|
||
| final boolean upgradingFromTimestampedStore = existingCFs.stream() | ||
| .anyMatch(cf -> java.util.Arrays.equals(cf, LEGACY_TIMESTAMPED_CF_NAME)); |
| try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) { | ||
| rocksDBStore.init(context, rocksDBStore); | ||
|
|
||
| assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular headers-aware mode")); |
There was a problem hiding this comment.
I never pay close attention to this, and I am personally fine either way. Of course, if there is some overall effort to move off hamcrest in favor of JUnit, it would make sense to avoid adding hamcrest code.
frankvicky
left a comment
There was a problem hiding this comment.
Thanks for the PR.
There are lots of descriptive comments in the test.
Could you please consider to put them in the assertion message?
For example:
// approx: 7 entries on legacy timestamped CF, 0 in new headers-aware CF
assertEquals(7L, rocksDBStore.approximateNumEntries());
to
assertEquals(7L, rocksDBStore.approximateNumEntries(), "should have 7 entries on legacy timestamped");
...c/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
Outdated
Show resolved
Hide resolved
| private static final byte[] LEGACY_TIMESTAMPED_CF_NAME = | ||
| "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8); | ||
|
|
||
| // New column family for header-aware timestamped values. |
There was a problem hiding this comment.
| // New column family for header-aware timestamped values. | |
| /** | |
| * New column family for header-aware timestamped values. | |
| */ |
There was a problem hiding this comment.
This comment seems to not add any value -- I would remove it.
...c/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
Outdated
Show resolved
Hide resolved
| private static final byte[] LEGACY_TIMESTAMPED_CF_NAME = | ||
| "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8); | ||
|
|
||
| // New column family for header-aware timestamped values. |
There was a problem hiding this comment.
This comment seems to not add any value -- I would remove it.
...c/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
Outdated
Show resolved
Hide resolved
| { | ||
| final KeyValue<Bytes, byte[]> keyValue = itAll.next(); | ||
| assertArrayEquals("key4".getBytes(), keyValue.key.get()); | ||
| assertEquals(13, keyValue.value.length, "Expected header-aware format: varint(0) + empty headers(0) + timestamp(8) + value(4) = 13 bytes for key4 from legacy CF"); |
There was a problem hiding this comment.
Similar to above. putIfAbsent would not update the value, because the key exists, but still migrate the value from legacy- to headers-CF?
There was a problem hiding this comment.
iterator does the conversion on the fly but does not apply it to the physical store.
| { | ||
| final KeyValue<Bytes, byte[]> keyValue = itAll.next(); | ||
| assertArrayEquals("key5".getBytes(), keyValue.key.get()); | ||
| assertEquals(14, keyValue.value.length, "Expected header-aware format: varint(0) + empty headers(0) + timestamp(8) + value(5) = 14 bytes for key5 from legacy CF"); |
...st/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
Show resolved
Hide resolved
...st/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
Show resolved
Hide resolved
...st/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
Outdated
Show resolved
Hide resolved
|
Please rebase to |
…/RocksDBTimestampedStoreWithHeaders.java Co-authored-by: TengYao Chi <kitingiao@gmail.com>
…/RocksDBTimestampedStoreWithHeaders.java Co-authored-by: TengYao Chi <kitingiao@gmail.com>
…/RocksDBTimestampedStoreWithHeaders.java Co-authored-by: TengYao Chi <kitingiao@gmail.com>
This PR add
RocksDBTimestampedStoreWithHeadersand the correspondingunit test (
RocksDBTimestampedStoreWithHeadersTest) for theTimestampedKeyValueStoreWithHeadersintroduced in KIP-1271.Reviewers: TengYao Chi frankvicky@apache.org, Matthias J. Sax
matthias@confluent.io