KAFKA-17411: Add new StateStore interfaces #20955
A label of 'needs-attention' was automatically added to this PR in order to raise the
bbejeck
left a comment
Thanks for the PR @nicktelford - I made a pass and it looks good overall. Since this has been around for some time, can you rebase?
These are the new interfaces detailed in KIP-1035: "StateStore managed changelog offsets".

This PR introduces the interface changes, but otherwise makes no consequential behavioural changes.

Outside of `StateStore.java`, _all_ changes are essentially just a rename of all invocations of `StateStore#flush` to instead call `StateStore#commit`.

The `changelogOffsets` being passed in these invocations is currently unused: the behaviour of `StateStore#commit` remains identical to `StateStore#flush` before these changes.

A new implementation of `StateStore#commit` that actually uses these offsets, along with changes to the use-site (in `ProcessorStateManager` and `GlobalStateManager`), will come in a later PR.

Many strings, including documentation, and some variable names have also been renamed (from "flush" to "commit") to maintain consistency with the method they relate to.

One exception is the `flush-rate` metric, which has not been renamed because it will instead be deprecated in favour of a new `commit-rate` metric, which will be introduced in another PR.

---

The only change in behaviour is as follows: calling `StateStore#flush` from within a `Processor` is now a guaranteed no-op. In the future, this will throw an `UnsupportedOperationException`, but to ensure no changes to end-user experience, we currently make it a no-op.

Previously, any call to `StateStore#flush` from a `Processor` would have made no difference to program semantics, but would likely have introduced performance problems for RocksDB. This is because it would force a flush of RocksDB memtables to disk on every invocation, which, if used naively, could happen on _every_ `Record`.

Consequently, making this a no-op should not make a difference for end-users, except potentially improving performance if they were incorrectly calling this method.
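To make the shape of the change concrete, here is a minimal, self-contained sketch. It does not use the Kafka Streams API: `SketchStore`, `ProcessorFacingStore`, `persistCalls`, and the use of `String` partition names in place of `TopicPartition` are all assumptions made for illustration, and the real interfaces in `StateStore.java` may look different. It only models the two points described above: a `commit` that accepts changelog offsets but ignores them, and a processor-facing `flush` that does nothing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; none of these types are Kafka Streams classes.
class CommitRenameSketch {

    /** Engine-facing store: commit(...) takes over the role flush() used to play. */
    static class SketchStore {
        private final Map<String, String> data = new HashMap<>();
        private int persistCalls = 0;

        public void put(final String key, final String value) {
            data.put(key, value);
        }

        public String get(final String key) {
            return data.get(key);
        }

        /**
         * Offsets are keyed by a partition name (a stand-in for TopicPartition).
         * They are accepted but not used, so this behaves like the old flush.
         */
        public void commit(final Map<String, Long> changelogOffsets) {
            persistCalls++; // pretend to persist pending writes
        }

        public int persistCalls() {
            return persistCalls;
        }
    }

    /** Processor-facing view: flush() is kept for compatibility but does nothing. */
    static class ProcessorFacingStore {
        private final SketchStore inner;

        public ProcessorFacingStore(final SketchStore inner) {
            this.inner = inner;
        }

        public void put(final String key, final String value) {
            inner.put(key, value);
        }

        public String get(final String key) {
            return inner.get(key);
        }

        public void flush() {
            // Intentionally empty: a call from user code no longer forces a persist.
        }
    }

    public static void main(final String[] args) {
        final SketchStore store = new SketchStore();
        final ProcessorFacingStore view = new ProcessorFacingStore(store);

        view.put("k", "v");
        view.flush();                               // no-op
        System.out.println(store.persistCalls());   // prints 0

        store.commit(Map.of("changelog-0", 42L));   // engine-side commit
        System.out.println(store.persistCalls());   // prints 1
    }
}
```

In this sketch the processor-facing view deliberately exposes no `commit` method at all; what the real decorator does when user code calls `commit` is not modelled here.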
ee2323a to 429d085
@bbejeck @eduwercamacaro Thanks for the reviews. Rebased as requested.
...s/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
The `Processor` in each of these tests was incorrectly updated to call `store.commit(Map.of())`. This method cannot be called by user processors, because doing so would put the system into an inconsistent state. Removing these calls solves the problem, and has no impact on the correct running of these tests, because the writes to these stores are visible immediately to the current transaction anyway.
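The point about visibility can be illustrated with a small model. This is a sketch under assumptions, not how any Kafka Streams store is implemented: `StagingStore`, its `staged` and `committed` maps, and the argument-free `commit()` are hypothetical. It shows why a processor that writes and then reads back does not need to call `commit` itself: reads consult uncommitted writes first.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model for illustration; not how any Kafka Streams store is implemented.
class StagedWritesSketch {

    static class StagingStore {
        private final Map<String, String> committed = new HashMap<>();
        private final Map<String, String> staged = new HashMap<>();

        public void put(final String key, final String value) {
            staged.put(key, value);
        }

        /** Reads see staged (uncommitted) writes first, then fall back to committed data. */
        public String get(final String key) {
            final String pending = staged.get(key);
            return pending != null ? pending : committed.get(key);
        }

        /** Called by the (hypothetical) runtime, never by the processor. */
        public void commit() {
            committed.putAll(staged);
            staged.clear();
        }

        public int stagedSize() {
            return staged.size();
        }
    }

    /** A processor-like function: writes, then reads back, without committing. */
    static String process(final StagingStore store, final String key, final String value) {
        store.put(key, value);
        return store.get(key);
    }

    public static void main(final String[] args) {
        final StagingStore store = new StagingStore();
        System.out.println(process(store, "a", "1")); // prints 1, although nothing is committed yet
        store.commit();                               // runtime-side commit
        System.out.println(store.get("a"));           // prints 1
    }
}
```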
bbejeck
left a comment
Thanks for the PR @nicktelford, this LGTM, just 2 nits.
streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
Co-authored-by: Bill Bejeck <bbejeck@gmail.com>
merged #20955 into trunk
-    public void flush(final DBAccessor accessor) throws RocksDBException {
+    public void commit(final DBAccessor accessor,
+                       final Map<TopicPartition, Long> changelogOffsets) throws RocksDBException {
         accessor.flush(columnFamily);
Wondering why this flush(columnFamily) call is not updated to commit(changelogOffsets) or similar?
-    verify(inner).flush();
+    verify(inner).commit(Map.of());
     assertThat((Double) getMetric("flush-rate").metricValue(), greaterThan(0.0));
@nicktelford @bbejeck -- just came across this code on some other PR review, and wondering if we need to update this, and also check if the new commit-rate metric was bumped?
Same question for other store types?
We are just adding a new metered store and corresponding test -- if this will be changed in a follow-up PR, please make sure to also cover the newly added `MeteredTimestampedKeyValueStoreWithHeadersTest`.
Cf #21451
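For readers following the metric question, the behaviour the quoted assertions check can be sketched as follows. This is a simplified model with hypothetical names (`MeteredStore`, `CountingInner`, `recordingsFor`), it counts recordings rather than computing a rate, and it assumes, as the assertion on `flush-rate` suggests, that a `commit` call is still recorded under the legacy metric name. A `commit-rate` metric is not modelled because it is to be introduced in another PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; not the Kafka Streams metered store classes.
class MeteredCommitSketch {

    interface InnerStore {
        void commit(Map<String, Long> changelogOffsets);
    }

    /** Inner store that only counts how often commit was delegated to it. */
    static class CountingInner implements InnerStore {
        private int commits = 0;

        @Override
        public void commit(final Map<String, Long> changelogOffsets) {
            commits++;
        }

        public int commits() {
            return commits;
        }
    }

    /** Wrapper that delegates commit and records it under the legacy metric name. */
    static class MeteredStore {
        private final InnerStore inner;
        private final Map<String, Long> recordings = new HashMap<>();

        public MeteredStore(final InnerStore inner) {
            this.inner = inner;
        }

        public void commit(final Map<String, Long> changelogOffsets) {
            inner.commit(changelogOffsets);
            // Assumption: the metric keeps its old name even though the method was renamed.
            recordings.merge("flush-rate", 1L, Long::sum);
        }

        public long recordingsFor(final String metricName) {
            return recordings.getOrDefault(metricName, 0L);
        }
    }

    public static void main(final String[] args) {
        final CountingInner inner = new CountingInner();
        final MeteredStore metered = new MeteredStore(inner);

        metered.commit(Map.of());
        System.out.println(inner.commits());                     // prints 1
        System.out.println(metered.recordingsFor("flush-rate")); // prints 1
    }
}
```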
Reviewers: Eduwer Camacaro <eduwerc@gmail.com>, Bill Bejeck <bbejeck@apache.org>