Add support for compressed kinesis data #17083

Draft
wants to merge 5 commits into
base: master

funguy-tech

Implements #17062.

Description

Allows use of the built-in compression formats for Kinesis ingestion, selected by an optional compressionFormat parameter in ioConfig.

Unlike Kafka, Kinesis does not provide much in the way of data compression. A common industry pattern is to compress Kinesis data on the wire and decompress it in the client.

Changes

Prerequisite additions to CompressionUtils.java.

  • Added Jackson deserialization support to Format to enable simplified config exposure.

  • Added general compress/decompress utilities with ByteBuffer, InputStream, and OutputStream parameters

    • The caller specifies the desired compression/decompression format as a method parameter
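
As a rough illustration of the ByteBuffer-based helpers described above, the following is a minimal sketch and not the code from this PR. The class name `ByteBufferCompressionSketch`, the one-value `Format` enum, and the method shapes are assumptions chosen so the example runs on the JDK alone; the real `CompressionUtils.Format` covers more formats.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch only; names and signatures are assumptions, not the PR's API.
final class ByteBufferCompressionSketch
{
  // Only GZ is modeled so that nothing beyond the JDK is required.
  public enum Format { GZ }

  // Stream-level helper: wraps the sink in a compressing stream for the given format.
  public static OutputStream compress(OutputStream out, Format format) throws IOException
  {
    switch (format) {
      case GZ: return new GZIPOutputStream(out);
      default: throw new IllegalArgumentException("Unsupported format: " + format);
    }
  }

  // Stream-level helper: wraps the source in a decompressing stream for the given format.
  public static InputStream decompress(InputStream in, Format format) throws IOException
  {
    switch (format) {
      case GZ: return new GZIPInputStream(in);
      default: throw new IllegalArgumentException("Unsupported format: " + format);
    }
  }

  // Copies the readable bytes without moving the caller's buffer position.
  private static byte[] toBytes(ByteBuffer data)
  {
    byte[] raw = new byte[data.remaining()];
    data.duplicate().get(raw);
    return raw;
  }

  public static ByteBuffer compress(ByteBuffer data, Format format)
  {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (OutputStream out = compress(sink, format)) {
      out.write(toBytes(data));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return ByteBuffer.wrap(sink.toByteArray());
  }

  public static ByteBuffer decompress(ByteBuffer data, Format format)
  {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (InputStream in = decompress(new ByteArrayInputStream(toBytes(data)), format)) {
      byte[] chunk = new byte[4096];
      int read;
      while ((read = in.read(chunk)) != -1) {
        sink.write(chunk, 0, read);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return ByteBuffer.wrap(sink.toByteArray());
  }

  public static void main(String[] args)
  {
    ByteBuffer original = ByteBuffer.wrap("{\"metric\":1}".getBytes(StandardCharsets.UTF_8));
    ByteBuffer roundTripped = decompress(compress(original, Format.GZ), Format.GZ);
    System.out.println(StandardCharsets.UTF_8.decode(roundTripped));
  }
}
```

Reading through `duplicate()` keeps the caller's buffer position unchanged, which matters if the same buffer is inspected again after the call.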

Added compressionFormat Enum to IOConfig

  • Because the field is typed as an enum, its values are constrained at load time; invalid values are rejected automatically by Druid's existing spec safeguards.

  • The field is passed through:

    • KinesisSupervisor
    • KinesisSupervisorIOConfig
    • KinesisSamplerSpec
    • KinesisTask
    • KinesisTaskIOConfig
    • KinesisRecordSupplier
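
To illustrate how an enum-typed field rejects invalid values at load time, here is a minimal sketch under stated assumptions. The class `CompressionFormatParsingSketch` and the `fromString` factory are hypothetical; the enum constants mirror the values listed in the release note. With Jackson, a static factory like this would typically carry `@JsonCreator`, which is omitted here so the example needs only the JDK.

```java
import java.util.Locale;

// Hypothetical sketch; this is not the Format enum from CompressionUtils.
final class CompressionFormatParsingSketch
{
  public enum Format
  {
    BZ2, GZ, SNAPPY, XZ, ZIP, ZSTD;

    // Case-insensitive lookup. A missing (null) value is treated as "no compression".
    public static Format fromString(String name)
    {
      if (name == null) {
        return null;
      }
      try {
        return valueOf(name.toUpperCase(Locale.ROOT));
      } catch (IllegalArgumentException e) {
        throw new IllegalArgumentException("Unknown compressionFormat [" + name + "]", e);
      }
    }
  }

  public static void main(String[] args)
  {
    System.out.println(Format.fromString("gz"));
    try {
      Format.fromString("lz4");
    } catch (IllegalArgumentException e) {
      // An unsupported value fails while the spec is parsed, before any task starts.
      System.out.println(e.getMessage());
    }
  }
}
```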

Added handling logic to Kinesis Record Supplier

  • If compression is enabled, decompress the user record and replace its data in place. This ensures that:
    1. We do not store both the compressed and decompressed versions of the record.
    2. Buffer-size accounting for outstanding Kinesis ingestion data uses the decompressed ('true') size of the data.
  if (compressionFormat != null) {
    userRecord.setData(CompressionUtils.decompress(userRecord.getData(), compressionFormat));
  }
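
The effect of the in-place replacement on size accounting can be sketched as follows. This is an illustration only: `MutableRecord` is a hypothetical stand-in for the Kinesis user record, `prepare` is an invented helper, and only gzip is modeled so the example runs on the JDK alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch; MutableRecord and prepare() are assumptions for illustration.
final class InPlaceDecompressionSketch
{
  public enum Format { GZ }

  // Stand-in for a record whose payload can be swapped via setData().
  public static final class MutableRecord
  {
    private ByteBuffer data;
    public MutableRecord(ByteBuffer data) { this.data = data; }
    public ByteBuffer getData() { return data; }
    public void setData(ByteBuffer data) { this.data = data; }
  }

  public static ByteBuffer gzip(byte[] raw)
  {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (OutputStream out = new GZIPOutputStream(sink)) {
      out.write(raw);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return ByteBuffer.wrap(sink.toByteArray());
  }

  public static ByteBuffer decompress(ByteBuffer data, Format format)
  {
    if (format != Format.GZ) {
      throw new IllegalArgumentException("Unsupported format: " + format);
    }
    byte[] compressed = new byte[data.remaining()];
    data.duplicate().get(compressed);
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
      byte[] chunk = new byte[4096];
      int read;
      while ((read = in.read(chunk)) != -1) {
        sink.write(chunk, 0, read);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return ByteBuffer.wrap(sink.toByteArray());
  }

  // Replaces the payload when a format is configured, then returns the size
  // that buffer accounting would see: the decompressed size, not the wire size.
  public static int prepare(MutableRecord record, Format compressionFormat)
  {
    if (compressionFormat != null) {
      record.setData(decompress(record.getData(), compressionFormat));
    }
    return record.getData().remaining();
  }

  public static void main(String[] args)
  {
    byte[] raw = new byte[1000];
    Arrays.fill(raw, (byte) 'a');
    MutableRecord record = new MutableRecord(gzip(raw));
    System.out.println("wire size: " + record.getData().remaining());
    System.out.println("accounted size: " + prepare(record, Format.GZ));
  }
}
```

Because the compressed buffer is no longer referenced after `setData`, only the decompressed copy stays reachable from the record.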

Discussion of competing designs remains open. This PR is a starting point of available functionality, meant to provoke that discussion (or to be merged as-is if the approach is satisfactory).

Release note

Added support for compressed Kinesis streams. Users may specify a compressionFormat in IOConfig. Accepted values are bz2, gz, snappy, xz, zip, and zstd.

Key changed/added classes in this PR
  • CompressionUtils
  • KinesisRecordSupplier

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • been tested in a test Druid cluster. (Active production use on streams >1GB/s aggregate throughput)

@funguy-tech funguy-tech marked this pull request as ready for review September 19, 2024 02:49
@@ -657,7 +659,7 @@ public static OutputStream compress(final OutputStream in, final Format format)
 case XZ: return new XZCompressorOutputStream(in);
 case SNAPPY: return new FramedSnappyCompressorOutputStream(in);
 case ZSTD: return new ZstdCompressorOutputStream(in);
-case ZIP: return new ZipOutputStream(in, StandardCharsets.UTF_8);
+case ZIP: return new DeflaterOutputStream(in);
Author

ZipOutputStream / ZipInputStream expect to operate on entries inside an archive, each introduced by a ZipEntry. Since these helpers are intended for compressing/decompressing bytes directly, we instead use DeflaterOutputStream / InflaterInputStream, the underlying compression classes for zip.

Author

@funguy-tech funguy-tech Sep 19, 2024

On second thought, this doesn't make sense as a solution since zip isn't 'just' deflate.

An alternative would be to use a dummy ZipEntry on compress and a single optimistic getNextEntry call on decompress. Another option is to disable zip as a valid compression format for Kinesis, since it doesn't make much sense in a record-streaming context.
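
The dummy-entry alternative mentioned in this comment could look roughly like the sketch below. It is an assumption-laden illustration, not a proposed patch: the class name, the helper names, and the entry name "record" are all invented here. It also shows why raw deflate output is not interchangeable with zip: the bytes carry no zip entry header, so `getNextEntry` finds nothing to read.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical sketch of the "single dummy entry" idea; not code from this PR.
final class SingleEntryZipSketch
{
  // Writes the payload as the only entry of a zip archive.
  public static byte[] zipSingleEntry(byte[] raw)
  {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (ZipOutputStream zip = new ZipOutputStream(sink, StandardCharsets.UTF_8)) {
      zip.putNextEntry(new ZipEntry("record")); // dummy entry name, chosen arbitrarily
      zip.write(raw);
      zip.closeEntry();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return sink.toByteArray();
  }

  // Optimistically reads the first entry only; any further entries are ignored.
  public static byte[] unzipSingleEntry(byte[] archive)
  {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (ZipInputStream zip = new ZipInputStream(new ByteArrayInputStream(archive), StandardCharsets.UTF_8)) {
      if (zip.getNextEntry() == null) {
        throw new IllegalArgumentException("Payload does not contain a zip entry");
      }
      byte[] chunk = new byte[4096];
      int read;
      while ((read = zip.read(chunk)) != -1) {
        sink.write(chunk, 0, read);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return sink.toByteArray();
  }

  // Plain deflate output, as produced by DeflaterOutputStream; no zip entry header.
  public static byte[] deflate(byte[] raw)
  {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (OutputStream out = new DeflaterOutputStream(sink)) {
      out.write(raw);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return sink.toByteArray();
  }

  public static void main(String[] args)
  {
    byte[] raw = "kinesis record".getBytes(StandardCharsets.UTF_8);
    System.out.println(new String(unzipSingleEntry(zipSingleEntry(raw)), StandardCharsets.UTF_8));
    try {
      unzipSingleEntry(deflate(raw));
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

A producer would have to follow the same single-entry convention, which is part of why dropping zip for Kinesis may be the simpler choice.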

@funguy-tech funguy-tech marked this pull request as draft September 23, 2024 20:21
@abhishekagarwal87
Contributor

@funguy-tech - let us know when this PR is ready for review.
