Add zero-copy key serialization to RocksDB state backend#1

Open
nateab wants to merge 1 commit into feature/rocksdb-bytebuffer-optimization from feature/rocksdb-zerocopy-key-serialization

Conversation

nateab (Owner) commented Feb 5, 2026

This extends the ByteBuffer optimization from PR apache#27524 to key serialization, avoiding Arrays.copyOf() allocation for composite keys in RocksDB state operations.

Changes:

  • Add buildCompositeKeyNamespaceToByteBuffer() and related methods to SerializedCompositeKeyBuilder
  • Add serializeCurrentKeyWithGroupAndNamespaceToByteBuffer() to AbstractRocksDBState
  • Update RocksDBValueState.update() to use zero-copy key serialization
  • Update RocksDBMapState.put() to use zero-copy key serialization
  • Update AbstractRocksDBAppendingState.updateInternal() for zero-copy
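
For illustration, the call pattern in RocksDBValueState.update() could look roughly like the sketch below. This is not the exact code in the PR: the value-side helper serializeValueToByteBuffer() is an assumed name for the counterpart added in PR apache#27524, and the sketch assumes RocksDB's ByteBuffer-based put overload (which generally expects direct buffers).

```java
// Illustrative sketch only; method bodies and error handling in the PR may differ.
@Override
public void update(V value) {
    if (value == null) {
        clear();
        return;
    }
    try {
        // Serialize key-group + key + namespace into the builder's reused buffer
        // and expose it as a ByteBuffer instead of copying it via Arrays.copyOf().
        ByteBuffer key = serializeCurrentKeyWithGroupAndNamespaceToByteBuffer();
        // Value-side counterpart from the ByteBuffer value optimization (name assumed).
        ByteBuffer val = serializeValueToByteBuffer(value);
        // RocksDB copies both buffers into its own memory before returning,
        // so the next operation may safely overwrite them.
        backend.db.put(columnFamily, writeOptions, key, val);
    } catch (IOException | RocksDBException e) {
        throw new FlinkRuntimeException("Error while adding value to RocksDB", e);
    }
}
```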

Benchmark results show:

  • 90-97% allocation reduction for key serialization
  • GC collections reduced to zero in benchmarks
  • Combined with the value optimization: more than 100 MB/s of allocation eliminated

Safety: Same thread-safety model as the value optimization. RocksDB operations are synchronous and copy data before returning, so reusing the serialization buffer is safe under Flink's single-threaded mailbox execution.
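
A compact illustration of that reasoning (keyBuilder, ns, and valueBuffer are placeholder names, not identifiers from this PR):

```java
// All state accesses for a task run on its single mailbox thread, so the shared
// serialization buffer has no concurrent writers.
ByteBuffer key = keyBuilder.buildCompositeKeyNamespaceToByteBuffer(ns, namespaceSerializer);

// The put is synchronous: RocksDB copies the key and value bytes into its own
// memory before the call returns.
db.put(columnFamily, writeOptions, key, valueBuffer);

// Nothing in RocksDB references the shared buffer after put() returns, so the
// next state operation can overwrite and reuse it.
```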

Note: WriteBatch operations still use byte[] as shared buffers are unsafe with deferred writes.
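
By contrast, a sketch of the batching path, which keeps the copying byte[] serialization (writeBatchWrapper is used here as an illustrative name for Flink's write-batch helper):

```java
// A WriteBatch defers the write until the batch is flushed, so (per the note
// above) a shared, reused buffer could be overwritten before the batched write
// is applied. This path therefore keeps the allocating byte[] serialization.
byte[] key = serializeCurrentKeyWithGroupAndNamespace(); // Arrays.copyOf() inside
byte[] val = serializeValue(value);
writeBatchWrapper.put(columnFamily, key, val); // executed later, on flush
```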

What is the purpose of the change

This pull request extends the ByteBuffer optimization from PR apache#27524 to key serialization, so that composite keys in RocksDB state operations no longer incur an Arrays.copyOf() allocation on the per-record path.

Brief change log

  • Added buildCompositeKeyNamespaceToByteBuffer() and related methods to SerializedCompositeKeyBuilder
  • Added serializeCurrentKeyWithGroupAndNamespaceToByteBuffer() to AbstractRocksDBState
  • Switched RocksDBValueState.update(), RocksDBMapState.put(), and AbstractRocksDBAppendingState.updateInternal() to the zero-copy key serialization path

Verifying this change

This change does not alter the behavior of the affected state primitives, so it is exercised by the existing RocksDB state backend tests covering RocksDBValueState, RocksDBMapState, and the appending states. The allocation improvements were verified with the benchmarks described above (90-97% allocation reduction for key serialization, GC collections reduced to zero).

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable
