Skip to content

Commit

Permalink
Add support for compressed kinesis data (apache#17062)
Browse files Browse the repository at this point in the history
  • Loading branch information
David Adams committed Sep 17, 2024
1 parent 2e2f3cf commit bc6d3dc
Show file tree
Hide file tree
Showing 16 changed files with 503 additions and 36 deletions.
7 changes: 7 additions & 0 deletions docs/ingestion/kinesis-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ The following example shows a supervisor spec for a stream with the name `Kinesi
"inputFormat": {
"type": "json"
},
"compressionFormat": "gzip",
"useEarliestSequenceNumber": true
},
"tuningConfig": {
Expand Down Expand Up @@ -148,6 +149,12 @@ The Kinesis indexing service supports the following values for `inputFormat`:

You can use `parser` to read [`thrift`](../development/extensions-contrib/thrift.md) formats.

### Compression

Unlike Kafka, Kinesis does not offer built in compression. Due to the operational costs of operating Kinesis streams at scale, you may find it advantageous to compress incoming data.

Druid supports `bz2`, `gz`, `snappy`, `xz`, `zip`, and `zstd` compressed kinesis data. If your incoming data is compressed, include the compression type in the `compressionFormat` field.

### Tuning configuration

The following table outlines the `tuningConfig` configuration properties specific to Kinesis.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
tuningConfig.getRecordBufferFullWait(),
maxBytesPerPoll,
false,
useListShards
useListShards,
ioConfig.getCompressionFormat()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.utils.CompressionUtils;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
Expand All @@ -53,6 +54,7 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St

private final String awsAssumedRoleArn;
private final String awsExternalId;
private final CompressionUtils.Format compressionFormat;

@JsonCreator
public KinesisIndexTaskIOConfig(
Expand All @@ -78,7 +80,8 @@ public KinesisIndexTaskIOConfig(
@JsonProperty("endpoint") String endpoint,
@JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
@JsonProperty("awsExternalId") String awsExternalId
@JsonProperty("awsExternalId") String awsExternalId,
@JsonProperty("compressionFormat") CompressionUtils.Format compressionFormat
)
{
super(
Expand All @@ -103,6 +106,7 @@ public KinesisIndexTaskIOConfig(
this.fetchDelayMillis = fetchDelayMillis != null ? fetchDelayMillis : DEFAULT_FETCH_DELAY_MILLIS;
this.awsAssumedRoleArn = awsAssumedRoleArn;
this.awsExternalId = awsExternalId;
this.compressionFormat = compressionFormat;
}

public KinesisIndexTaskIOConfig(
Expand All @@ -117,7 +121,8 @@ public KinesisIndexTaskIOConfig(
String endpoint,
Integer fetchDelayMillis,
String awsAssumedRoleArn,
String awsExternalId
String awsExternalId,
CompressionUtils.Format compressionFormat
)
{
this(
Expand All @@ -135,7 +140,8 @@ public KinesisIndexTaskIOConfig(
endpoint,
fetchDelayMillis,
awsAssumedRoleArn,
awsExternalId
awsExternalId,
compressionFormat
);
}

Expand Down Expand Up @@ -225,6 +231,13 @@ public String getAwsExternalId()
return awsExternalId;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public CompressionUtils.Format getCompressionFormat()
{
return compressionFormat;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.utils.CompressionUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
Expand Down Expand Up @@ -248,6 +250,7 @@ private Runnable fetchRecords()

// list will come back empty if there are no records
for (Record kinesisRecord : recordsResult.getRecords()) {

final List<KinesisRecordEntity> data;

if (deaggregateHandle == null || getDataHandle == null) {
Expand All @@ -262,6 +265,9 @@ private Runnable fetchRecords()

int recordSize = 0;
for (UserRecord userRecord : userRecords) {
if (compressionFormat != null) {
userRecord.setData(CompressionUtils.decompress(userRecord.getData(), compressionFormat));
}
KinesisRecordEntity kinesisRecordEntity = new KinesisRecordEntity(userRecord);
recordSize += kinesisRecordEntity.getBuffer().array().length;
data.add(kinesisRecordEntity);
Expand Down Expand Up @@ -403,6 +409,8 @@ private long getPartitionTimeLag()
private final int recordBufferSizeBytes;
private final boolean useEarliestSequenceNumber;
private final boolean useListShards;
@Nullable
private final CompressionUtils.Format compressionFormat;

private ScheduledExecutorService scheduledExec;

Expand All @@ -423,7 +431,8 @@ public KinesisRecordSupplier(
int recordBufferFullWait,
int maxBytesPerPoll,
boolean useEarliestSequenceNumber,
boolean useListShards
boolean useListShards,
CompressionUtils.Format compressionFormat
)
{
Preconditions.checkNotNull(amazonKinesis);
Expand All @@ -437,6 +446,7 @@ public KinesisRecordSupplier(
this.useEarliestSequenceNumber = useEarliestSequenceNumber;
this.useListShards = useListShards;
this.backgroundFetchEnabled = fetchThreads > 0;
this.compressionFormat = compressionFormat;

// The deaggregate function is implemented by the amazon-kinesis-client, whose license was formerly not compatible
// with Apache. The code here avoids the license issue by using reflection, but is no longer necessary since
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ protected KinesisRecordSupplier createRecordSupplier()
tuningConfig.getRecordBufferFullWait(),
tuningConfig.getMaxBytesPerPollOrDefault(),
ioConfig.isUseEarliestSequenceNumber(),
tuningConfig.isUseListShards()
tuningConfig.isUseListShards(),
ioConfig.getCompressionFormat()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
ioConfig.getEndpoint(),
ioConfig.getFetchDelayMillis(),
ioConfig.getAwsAssumedRoleArn(),
ioConfig.getAwsExternalId()
ioConfig.getAwsExternalId(),
ioConfig.getCompressionFormat()
);
}

Expand Down Expand Up @@ -202,7 +203,8 @@ protected RecordSupplier<String, String, KinesisRecordEntity> setupRecordSupplie
taskTuningConfig.getRecordBufferFullWait(),
taskTuningConfig.getMaxBytesPerPollOrDefault(),
ioConfig.isUseEarliestSequenceNumber(),
spec.getSpec().getTuningConfig().isUseListShards()
spec.getSpec().getTuningConfig().isUseListShards(),
spec.getSpec().getIOConfig().getCompressionFormat()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.utils.CompressionUtils;
import org.joda.time.DateTime;
import org.joda.time.Period;

Expand All @@ -54,6 +55,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
private final String awsAssumedRoleArn;
private final String awsExternalId;
private final boolean deaggregate;
private final CompressionUtils.Format compressionFormat;

@JsonCreator
public KinesisSupervisorIOConfig(
Expand All @@ -76,7 +78,8 @@ public KinesisSupervisorIOConfig(
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
@JsonProperty("awsExternalId") String awsExternalId,
@Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
@JsonProperty("deaggregate") @Deprecated boolean deaggregate
@JsonProperty("deaggregate") @Deprecated boolean deaggregate,
@JsonProperty("compressionFormat") CompressionUtils.Format compressionFormat
)
{
super(
Expand Down Expand Up @@ -107,6 +110,7 @@ public KinesisSupervisorIOConfig(
this.awsAssumedRoleArn = awsAssumedRoleArn;
this.awsExternalId = awsExternalId;
this.deaggregate = deaggregate;
this.compressionFormat = compressionFormat;
}

@JsonProperty
Expand Down Expand Up @@ -151,6 +155,13 @@ public boolean isDeaggregate()
return deaggregate;
}

@Nullable
@JsonProperty
public CompressionUtils.Format getCompressionFormat()
{
return compressionFormat;
}

@Override
public String toString()
{
Expand All @@ -172,7 +183,8 @@ public String toString()
", fetchDelayMillis=" + fetchDelayMillis +
", awsAssumedRoleArn='" + awsAssumedRoleArn + '\'' +
", awsExternalId='" + awsExternalId + '\'' +
", deaggregate=" + deaggregate +
", deaggregate=" + deaggregate + '\'' +
", compressionFormat=" + compressionFormat +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ public void testDeserializeToOldIoConfig() throws IOException
"endpoint",
2000,
"awsAssumedRoleArn",
"awsExternalId"
"awsExternalId",
null
);

final byte[] json = mapper.writeValueAsBytes(currentConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class KinesisIndexTaskSerdeTest
"endpoint",
null,
null,
null,
null
);
private static final String ACCESS_KEY = "test-access-key";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ public void testRunWithMinimumMessageTime() throws Exception
"awsEndpoint",
null,
null,
null,
null
)
);
Expand Down Expand Up @@ -847,6 +848,7 @@ public void testRunWithMaximumMessageTime() throws Exception
"awsEndpoint",
null,
null,
null,
null
)
);
Expand Down Expand Up @@ -1946,6 +1948,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
"awsEndpoint",
null,
null,
null,
null
),
context
Expand Down Expand Up @@ -2108,6 +2111,7 @@ public void testSequencesFromContext() throws IOException
"awsEndpoint",
null,
null,
null,
null
),
context
Expand Down Expand Up @@ -2309,6 +2313,7 @@ private KinesisIndexTask createTask(
"awsEndpoint",
null,
null,
null,
null
),
null
Expand Down
Loading

0 comments on commit bc6d3dc

Please sign in to comment.