
feat: Create JsonKinesisSource #18224

Open
linliu-code wants to merge 13 commits into apache:master from linliu-code:create_kinesis_source

Conversation

@linliu-code
Collaborator

@linliu-code linliu-code commented Feb 19, 2026

Describe the issue this Pull Request addresses

#18228

This PR adds AWS Kinesis Data Streams as a source for the Hudi DeltaStreamer, so users can ingest JSON records from Kinesis Data Streams into Hudi tables.
Previously, DeltaStreamer supported Kafka, JDBC, DFS (Parquet, CSV, ORC), and SQL sources, but not Kinesis.

Summary and Changelog

Adds JsonKinesisSource – reads JSON from AWS Kinesis Data Streams
Adds KinesisSource – base abstraction for Kinesis sources
Adds KinesisOffsetGen – handles shard iteration, checkpointing, and resumable reads
Introduces KinesisSourceConfig and KinesisReadConfig for configuration
Adds KinesisTestUtils for LocalStack-based tests
Integrates with existing DeltaStreamer and streamer metrics

Impact

DeltaStreamer users can use Kinesis as a source alongside Kafka and others
New ingestion path: Kinesis → DeltaStreamer → Hudi
Adds an optional AWS Kinesis dependency; non-Kinesis use cases are unaffected
Tests run against LocalStack, so no live AWS credentials are required

Risk Level

Low

Documentation Update

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Feb 19, 2026
@linliu-code linliu-code force-pushed the create_kinesis_source branch from 9ed3283 to fec6001 Compare February 19, 2026 23:01
@github-actions github-actions bot added size:XL PR with lines of changes > 1000 and removed size:L PR with lines of changes in (300, 1000] labels Feb 19, 2026
@linliu-code linliu-code force-pushed the create_kinesis_source branch from fec6001 to d09affd Compare February 20, 2026 00:11
@linliu-code linliu-code marked this pull request as ready for review February 20, 2026 01:30
@linliu-code linliu-code changed the title from feat:Create KinesisSource to feat: Create JsonKinesisSource Feb 20, 2026
1. Support aggregated records.
2. Avoid expired shards blocking the stream.
@linliu-code linliu-code force-pushed the create_kinesis_source branch from d09affd to a517e26 Compare February 20, 2026 21:27
assertTrue(checkpointAfterMerge.startsWith(streamName + ","));
int initialShardCount = KinesisOffsetGen.CheckpointUtils.strToOffsets(checkpointAfterBatch1).size();
int shardCountAfterMerge = KinesisOffsetGen.CheckpointUtils.strToOffsets(checkpointAfterMerge).size();
assertTrue(shardCountAfterMerge > initialShardCount,
Collaborator Author

After merge, new child shards are generated, but the parent shards are still there and not expired yet. So ">" not "<"

@linliu-code linliu-code force-pushed the create_kinesis_source branch from 01a34fe to d43f8eb Compare February 21, 2026 10:01
@codecov-commenter

codecov-commenter commented Feb 21, 2026

Codecov Report

❌ Patch coverage is 0% with 455 lines in your changes missing coverage. Please review.
✅ Project coverage is 57.03%. Comparing base (ec04479) to head (a28e9e7).
⚠️ Report is 988 commits behind head on master.

Files with missing lines Patch % Lines
...di/utilities/sources/helpers/KinesisOffsetGen.java 0.00% 201 Missing ⚠️
...ache/hudi/utilities/sources/JsonKinesisSource.java 0.00% 118 Missing ⚠️
...che/hudi/utilities/config/KinesisSourceConfig.java 0.00% 81 Missing ⚠️
...g/apache/hudi/utilities/sources/KinesisSource.java 0.00% 29 Missing ⚠️
...utilities/sources/helpers/KinesisDeaggregator.java 0.00% 26 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18224      +/-   ##
============================================
- Coverage     61.43%   57.03%   -4.41%     
+ Complexity    23082    18520    -4562     
============================================
  Files          2108     1949     -159     
  Lines        127636   106587   -21049     
  Branches      14534    13196    -1338     
============================================
- Hits          78409    60787   -17622     
+ Misses        42873    40075    -2798     
+ Partials       6354     5725     -629     
Flag Coverage Δ
hadoop-mr-java-client 45.41% <ø> (?)
spark-java-tests 47.22% <0.00%> (?)
spark-scala-tests 45.30% <0.00%> (?)

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
...utilities/sources/helpers/KinesisDeaggregator.java 0.00% <0.00%> (ø)
...g/apache/hudi/utilities/sources/KinesisSource.java 0.00% <0.00%> (ø)
...che/hudi/utilities/config/KinesisSourceConfig.java 0.00% <0.00%> (ø)
...ache/hudi/utilities/sources/JsonKinesisSource.java 0.00% <0.00%> (ø)
...di/utilities/sources/helpers/KinesisOffsetGen.java 0.00% <0.00%> (ø)

... and 4009 files with indirect coverage changes


@hudi-bot
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

record.approximateArrivalTimestamp().toEpochMilli());
}
return OBJECT_MAPPER.writeValueAsString(node);
} catch (Exception e) {
Contributor

This catch (Exception e) silently drops the error and returns the raw string without any logging. If offset appending fails (e.g., data isn't valid JSON, or it's a JSON array rather than an object), you'd have no way to tell why some records have offsets and others don't. Could you at least log a warning here so data quality issues are debuggable?
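A minimal sketch of the logged fallback being asked for, using java.util.logging as a stand-in for the project's logger. The method name, the string-based JSON handling, and the `_shard_id`/`_arrival_ts` field names are placeholders, not the PR's actual code:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class OffsetAppendSketch {
  private static final Logger LOG = Logger.getLogger(OffsetAppendSketch.class.getName());

  /**
   * On any failure to append offset fields (non-object JSON, parse error, ...),
   * log a warning that names the shard before returning the raw payload, so
   * records missing offsets can be traced back to a cause.
   */
  public static String appendOffsetsOrRaw(String rawJson, String shardId, long arrivalMillis) {
    try {
      // Stand-in for the real Jackson-based mutation; only top-level JSON
      // objects are accepted, so arrays and scalars fall through to the catch.
      if (rawJson == null || !rawJson.trim().startsWith("{")) {
        throw new IllegalArgumentException("not a JSON object");
      }
      String body = rawJson.trim();
      String suffix = ",\"_shard_id\":\"" + shardId + "\",\"_arrival_ts\":" + arrivalMillis + "}";
      return body.substring(0, body.length() - 1) + suffix;
    } catch (Exception e) {
      LOG.log(Level.WARNING,
          "Failed to append Kinesis offsets for shard " + shardId + "; returning raw record", e);
      return rawJson;
    }
  }
}
```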

});

// Cache so we can both get records and checkpoint from the same RDD
fetchRdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK());
Contributor

If toBatch is called a second time (e.g., during retry), the previous persistedFetchRdd is overwritten without being unpersisted first, leaking Spark storage memory. Could you add if (persistedFetchRdd != null) { persistedFetchRdd.unpersist(); } before the persist call, or is it handled somewhere else?

Contributor

Could we avoid persisting the RDDs? This can degrade the performance if spilling happens.

Comment on lines +436 to +473
while (allRecords.size() < maxTotalRecords && shardIterator != null) {
GetRecordsResponse response;
try {
response = client.getRecords(
GetRecordsRequest.builder()
.shardIterator(shardIterator)
.limit(Math.min(maxRecordsPerRequest, (int) (maxTotalRecords - allRecords.size())))
.build());
} catch (ExpiredIteratorException e) {
log.warn("Shard iterator expired for {} during GetRecords, stopping read", range.getShardId());
break;
} catch (ProvisionedThroughputExceededException e) {
throw new HoodieReadFromSourceException("Kinesis throughput exceeded reading shard " + range.getShardId(), e);
}

List<Record> records = response.records();
// Update shardIterator before the empty check so its null-ness correctly reflects end-of-shard
// even when the final response carries 0 records (closed shard fully exhausted).
shardIterator = response.nextShardIterator();
// CASE 1: No records returned: stop polling. nextShardIterator can be non-null when at LATEST with no new
// data; continuing would cause an infinite loop of empty GetRecords calls.
if (records.isEmpty()) {
break;
}
// CASE 2: records returned.
List<Record> toAdd = enableDeaggregation ? KinesisDeaggregator.deaggregate(records) : records;
for (Record r : toAdd) {
allRecords.add(r);
}
// Checkpoint uses the last Kinesis record's sequence number (from raw records, not deaggregated)
lastSequenceNumber = records.get(records.size() - 1).sequenceNumber();

requestCount++;
// This is for rate limiting
if (shardIterator != null && intervalMs > 0) {
Thread.sleep(intervalMs);
}
}
Contributor

After deaggregation, allRecords can significantly exceed maxTotalRecords since one aggregated record can expand into many user records, but the while-loop only checks the limit before fetching. With aggressive KPL aggregation ratios (e.g., 100:1), a shard could return far more records than the configured per-shard limit. Have you considered truncating toAdd to maxTotalRecords - allRecords.size() before adding?
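The truncation being suggested could look roughly like this; the helper name and generic signature are illustrative, not taken from the PR:

```java
import java.util.ArrayList;
import java.util.List;

public class DeaggregationCapSketch {
  /**
   * Cap a batch of (possibly deaggregated) records so the accumulated total
   * never exceeds maxTotalRecords, even when one aggregated record expands
   * into many user records.
   */
  public static <T> List<T> capToRemaining(List<T> toAdd, int currentSize, long maxTotalRecords) {
    long remaining = maxTotalRecords - currentSize;
    if (remaining <= 0) {
      return new ArrayList<>();
    }
    if (toAdd.size() <= remaining) {
      return toAdd;
    }
    // Copy the sublist so the result does not alias the deaggregated list.
    return new ArrayList<>(toAdd.subList(0, (int) remaining));
  }
}
```

Note the checkpoint side effect: if the tail of a deaggregated batch is dropped, the last raw sequence number may point past records that were never emitted, so the checkpoint logic would need to account for that.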

Comment on lines +248 to +253
// for test only
// LocalStack returns Long.MAX_VALUE for closed shards; use lastSeq as endSeq so we can detect
// "fully consumed" when the parent shard expires (lastSeq >= endSeq).
if (LOCALSTACK_END_SEQ_SENTINEL.equals(endSeq) && lastSeq != null && !lastSeq.isEmpty()) {
endSeq = lastSeq;
}
Contributor

This sentinel check is commented "for test only" but runs in the production code path. Could we remove this from production code and use a different way to handle it?

.withDocumentation("Starting position when no checkpoint exists. TRIM_HORIZON (or EARLIEST), or LATEST. Default: LATEST.");

public static final ConfigProperty<Integer> KINESIS_GET_RECORDS_MAX_RECORDS = ConfigProperty
.key(PREFIX + "getRecords.maxRecords")
Contributor

Suggested change
.key(PREFIX + "getRecords.maxRecords")
.key(PREFIX + "max.records.per.request")

.defaultValue(10000)
.withAlternatives(OLD_PREFIX + "getRecords.maxRecords")
.markAdvanced()
.withDocumentation("Maximum number of records to fetch per GetRecords API call. Kinesis limit is 10000.");
Contributor

Is there a validation on the config?

Collaborator Author

What do you mean by validation? Do you mean testing whether this config is actually respected by the Kinesis server?

Comment on lines +48 to +55
for (Record r : records) {
v1Records.add(toV1Record(r));
}
List<UserRecord> userRecords = UserRecord.deaggregate(v1Records);
List<Record> result = new ArrayList<>(userRecords.size());
for (UserRecord ur : userRecords) {
result.add(toV2Record(ur));
}
Contributor

Why is the conversion between V1 and V2 needed? Does the AWS SDK provide an API to do this?

Comment on lines +101 to +121
/**
* Extract lastSeq from checkpoint value (which may be "lastSeq" or "lastSeq|endSeq").
*/
public static String getLastSeqFromValue(String value) {
if (value == null || value.isEmpty()) {
return value;
}
int sep = value.indexOf(END_SEQ_SEPARATOR);
return sep >= 0 ? value.substring(0, sep) : value;
}

/**
* Extract endSeq from checkpoint value if present. Returns null for open shards.
*/
public static String getEndSeqFromValue(String value) {
if (value == null || value.isEmpty()) {
return null;
}
int sep = value.indexOf(END_SEQ_SEPARATOR);
return sep >= 0 && sep < value.length() - 1 ? value.substring(sep + 1) : null;
}
Contributor

Merge these two together to return Pair<String, Option<String>> of last seq and optional end seq: getLastAndEndSeqFromCheckpoint
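A sketch of the merged accessor, assuming the `|` separator and the "lastSeq" / "lastSeq|endSeq" format shown above; a small holder class stands in for Hudi's Pair/Option:

```java
public class CheckpointValueSketch {
  // Assumed separator; the real constant is END_SEQ_SEPARATOR in the PR.
  private static final char END_SEQ_SEPARATOR = '|';

  /** Minimal stand-in for Pair<String, Option<String>>. */
  public static final class LastAndEndSeq {
    public final String lastSeq;
    public final String endSeq; // null when absent (open shard)
    LastAndEndSeq(String lastSeq, String endSeq) {
      this.lastSeq = lastSeq;
      this.endSeq = endSeq;
    }
  }

  /**
   * One-pass replacement for getLastSeqFromValue + getEndSeqFromValue:
   * parse "lastSeq" or "lastSeq|endSeq" in a single call.
   */
  public static LastAndEndSeq getLastAndEndSeqFromCheckpoint(String value) {
    if (value == null || value.isEmpty()) {
      return new LastAndEndSeq(value, null);
    }
    int sep = value.indexOf(END_SEQ_SEPARATOR);
    if (sep < 0) {
      return new LastAndEndSeq(value, null);
    }
    String end = sep < value.length() - 1 ? value.substring(sep + 1) : null;
    return new LastAndEndSeq(value.substring(0, sep), end);
  }
}
```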


/** LocalStack returns Long.MAX_VALUE for closed shards' endingSequenceNumber; real AWS returns actual value. */
public static final String LOCALSTACK_END_SEQ_SENTINEL = "9223372036854775807";

Contributor

Avoid this test-only code in the production class.

Comment on lines +215 to +217
// LocalStack sentinel: when lastSeq equals sentinel, we've fully consumed
if (LOCALSTACK_END_SEQ_SENTINEL.equals(endSeq) && LOCALSTACK_END_SEQ_SENTINEL.equals(lastSeq)) {
return false;
Contributor

Remove this test-only code.

public KinesisOffsetGen(TypedProperties props) {
this.props = props;
checkRequiredConfigProperties(props,
Arrays.asList(KinesisSourceConfig.KINESIS_STREAM_NAME, KinesisSourceConfig.KINESIS_REGION));
Contributor

Is KINESIS_REGION required?

.markAdvanced()
.withDocumentation("Fail when checkpoint references an expired shard instead of seeking to TRIM_HORIZON.");

public static final ConfigProperty<String> KINESIS_STARTING_POSITION = ConfigProperty
Contributor

Suggested change
public static final ConfigProperty<String> KINESIS_STARTING_POSITION = ConfigProperty
public static final ConfigProperty<KinesisStartingPosition> KINESIS_STARTING_POSITION = ConfigProperty

String secretKey = getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_SECRET_KEY, null);
if (accessKey != null && !accessKey.isEmpty() && secretKey != null && !secretKey.isEmpty()) {
builder = builder.credentialsProvider(
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)));
Contributor

Could HoodieConfigAWSCredentialsProvider be reused?

throw new HoodieReadFromSourceException("Kinesis throughput exceeded listing shards for " + streamName, e);
} catch (LimitExceededException e) {
throw new HoodieReadFromSourceException("Kinesis limit exceeded listing shards: " + e.getMessage(), e);
}
Contributor

Should we also catch other exceptions? If not, where are those caught or thrown in the upper caller chain?

throw new HoodieReadFromSourceException("Kinesis limit exceeded listing shards: " + e.getMessage(), e);
}
allShards.addAll(response.shards());
nextToken = response.nextToken();
Contributor

Is this for paginated responses? Is there an API handling this for reuse, instead of hand-crafting the logic again?

long sourceLimit,
HoodieIngestionMetrics metrics) {
long maxEvents = getLongWithAltKeys(props, KinesisSourceConfig.MAX_EVENTS_FROM_KINESIS_SOURCE);
long numEvents = sourceLimit == Long.MAX_VALUE ? maxEvents : Math.min(sourceLimit, maxEvents);
Contributor

It looks like this diverges from Kafka source's behavior of picking sourceLimit if it is not Long.MAX_VALUE.

return streamName + "," + parts;
}

public static boolean checkStreamCheckpoint(Option<String> lastCheckpointStr) {
Contributor

Suggested change
public static boolean checkStreamCheckpoint(Option<String> lastCheckpointStr) {
public static boolean isStreamCheckpointValid(Option<String> lastCheckpointStr) {

// CASE: last checkpoint exists.
if (lastCheckpointStr.isPresent() && CheckpointUtils.checkStreamCheckpoint(lastCheckpointStr)) {
Map<String, String> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
if (!checkpointOffsets.isEmpty() && lastCheckpointStr.get().startsWith(streamName + ",")) {
Contributor

Add lastCheckpointStr.get().startsWith(streamName + ",") to CheckpointUtils#checkStreamCheckpoint?
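Folding the prefix check into the validity test might look like this; the real method takes Option<String>, so a nullable String stands in here, and the method name follows the rename suggested earlier in the review:

```java
public class StreamCheckpointSketch {
  /**
   * A checkpoint is usable only if it is non-empty and was written for this
   * stream, i.e. it starts with "streamName,".
   */
  public static boolean isStreamCheckpointValid(String lastCheckpointStr, String streamName) {
    return lastCheckpointStr != null
        && !lastCheckpointStr.isEmpty()
        && lastCheckpointStr.startsWith(streamName + ",");
  }
}
```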

Comment on lines +332 to +339
if (endSeq != null) {
// CASE 1: lastSeq >= endSeq: all records have been consumed.
fullyConsumed = lastSeq != null && lastSeq.compareTo(endSeq) >= 0;
} else {
// CASE 2: lastSeq < endSeq: some records haven't been consumed.
// CASE 3: endSeq == null: open shard.
fullyConsumed = false;
}
Contributor

Could hasUnreadRecords be reused here?

boolean fullyConsumed;
if (endSeq != null) {
// CASE 1: lastSeq >= endSeq: all records have been consumed.
fullyConsumed = lastSeq != null && lastSeq.compareTo(endSeq) >= 0;
Contributor

Why could lastSeq be larger than endSeq? I assume lastSeq should always be less than or equal to endSeq.

for (String shardId : availableShardIds) {
if (checkpointOffsets.containsKey(shardId)) {
String lastSeq = CheckpointUtils.getLastSeqFromValue(checkpointOffsets.get(shardId));
if (lastSeq != null && !lastSeq.isEmpty()) {
Contributor

lastSeq should not be empty, otherwise it should throw an error. It would be good to add ValidationUtils#checkArgument.
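A sketch of that guard; a local checkArgument stands in for ValidationUtils#checkArgument, and the method name is illustrative:

```java
public class CheckpointGuardSketch {
  /** Minimal stand-in for Hudi's ValidationUtils#checkArgument. */
  static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  /**
   * A checkpointed shard must carry a non-empty lastSeq; failing loudly
   * beats silently skipping the shard.
   */
  public static String requireLastSeq(String shardId, String lastSeq) {
    checkArgument(lastSeq != null && !lastSeq.isEmpty(),
        "Empty lastSeq in checkpoint for shard " + shardId);
    return lastSeq;
  }
}
```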

Comment on lines +380 to +383
int targetParallelism = minPartitions > 0
? (int) Math.max(minPartitions, ranges.size())
: ranges.size();
metrics.updateStreamerSourceParallelism(targetParallelism);
Contributor

The minPartitions or targetParallelism is not used to determine the parallelism or the ranges like Kafka source. Do we want to consider that in case the number of shards is low?

Comment on lines +296 to +298
public KinesisShardRange[] getNextShardRanges(Option<Checkpoint> lastCheckpoint,
long sourceLimit,
HoodieIngestionMetrics metrics) {
Contributor

We should also integrate sourceProfileSupplier (SourceProfileSupplier) or make sure to track that as a follow-up.

Contributor

After learning more about Kinesis, it looks like there is rate limiting per shard (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html#kds-api-limits):

  • GetRecords: 5 transactions per second, The maximum number of records that can be returned per call is 10,000. The maximum size of data that GetRecords can return is 10 MB. If a call returns this amount of data, subsequent calls made within the next 5 seconds throw ProvisionedThroughputExceededException. If there is insufficient provisioned throughput on the stream, subsequent calls made within the next 1 second throw ProvisionedThroughputExceededException.

Based on this, further splitting a shard into reading from multiple executors may not be helpful, which is different from Kafka. I think it would be good to document these aspects.

@AllArgsConstructor
@Getter
public static class ShardReadResult implements java.io.Serializable {
private final List<Record> records;
Contributor

We should avoid storing the read records in a list which can significantly increase the memory usage. We should use the iterator pattern to reduce the memory pressure on the executor side.

Comment on lines +411 to +473
/**
* Read records from a single shard.
* @param enableDeaggregation when true, de-aggregates KPL records into individual user records
*/
public static ShardReadResult readShardRecords(KinesisClient client, String streamName,
KinesisShardRange range, KinesisSourceConfig.KinesisStartingPosition defaultPosition,
int maxRecordsPerRequest, long intervalMs, long maxTotalRecords,
boolean enableDeaggregation) throws InterruptedException {
String shardIterator;
try {
shardIterator = getShardIterator(client, streamName, range, defaultPosition);
} catch (InvalidArgumentException e) {
// GetShardIterator throws InvalidArgumentException (not ExpiredIteratorException) when the
// requested sequence number is past the stream's retention window.
throw new HoodieReadFromSourceException("Sequence number in checkpoint is expired or invalid for shard "
+ range.getShardId() + ". Reset the checkpoint to recover.", e);
} catch (ResourceNotFoundException e) {
throw new HoodieReadFromSourceException("Shard or stream not found: " + range.getShardId(), e);
} catch (ProvisionedThroughputExceededException e) {
throw new HoodieReadFromSourceException("Kinesis throughput exceeded reading shard " + range.getShardId(), e);
}
List<Record> allRecords = new ArrayList<>();
String lastSequenceNumber = null;
int requestCount = 0;

while (allRecords.size() < maxTotalRecords && shardIterator != null) {
GetRecordsResponse response;
try {
response = client.getRecords(
GetRecordsRequest.builder()
.shardIterator(shardIterator)
.limit(Math.min(maxRecordsPerRequest, (int) (maxTotalRecords - allRecords.size())))
.build());
} catch (ExpiredIteratorException e) {
log.warn("Shard iterator expired for {} during GetRecords, stopping read", range.getShardId());
break;
} catch (ProvisionedThroughputExceededException e) {
throw new HoodieReadFromSourceException("Kinesis throughput exceeded reading shard " + range.getShardId(), e);
}

List<Record> records = response.records();
// Update shardIterator before the empty check so its null-ness correctly reflects end-of-shard
// even when the final response carries 0 records (closed shard fully exhausted).
shardIterator = response.nextShardIterator();
// CASE 1: No records returned: stop polling. nextShardIterator can be non-null when at LATEST with no new
// data; continuing would cause an infinite loop of empty GetRecords calls.
if (records.isEmpty()) {
break;
}
// CASE 2: records returned.
List<Record> toAdd = enableDeaggregation ? KinesisDeaggregator.deaggregate(records) : records;
for (Record r : toAdd) {
allRecords.add(r);
}
// Checkpoint uses the last Kinesis record's sequence number (from raw records, not deaggregated)
lastSequenceNumber = records.get(records.size() - 1).sequenceNumber();

requestCount++;
// This is for rate limiting
if (shardIterator != null && intervalMs > 0) {
Thread.sleep(intervalMs);
}
}
Contributor

Could we wrap the logic here into a closable iterator that can be directly used by the executor in JsonKinesisSource#toBatch without accumulating records in memory and then returning the iterator?
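The shape of that refactor could be a pull-based iterator that fetches one GetRecords page at a time. In this sketch a Function stands in for the Kinesis client call, plain strings stand in for shard iterators, and the class/field names are illustrative:

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

public class ShardRecordIteratorSketch<T> implements Iterator<T>, AutoCloseable {
  /** One GetRecords response: a page of records plus the next shard iterator. */
  public static final class Page<T> {
    public final List<T> records;
    public final String nextIterator; // null signals end of a closed shard
    public Page(List<T> records, String nextIterator) {
      this.records = records;
      this.nextIterator = nextIterator;
    }
  }

  private final Function<String, Page<T>> fetchPage;
  private String shardIterator;
  private Iterator<T> current = java.util.Collections.emptyIterator();
  private boolean closed = false;

  public ShardRecordIteratorSketch(String firstIterator, Function<String, Page<T>> fetchPage) {
    this.shardIterator = firstIterator;
    this.fetchPage = fetchPage;
  }

  @Override
  public boolean hasNext() {
    while (!closed && !current.hasNext() && shardIterator != null) {
      Page<T> page = fetchPage.apply(shardIterator);
      shardIterator = page.nextIterator;
      if (page.records.isEmpty()) {
        // Empty page at LATEST: stop rather than spin on empty GetRecords calls.
        shardIterator = null;
        return false;
      }
      current = page.records.iterator();
    }
    return !closed && current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }

  @Override
  public void close() {
    closed = true; // a real implementation would also close the KinesisClient here
  }
}
```

This keeps at most one page of records in executor memory; rate limiting, deaggregation, and checkpoint tracking would hook into the fetch step.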

Comment on lines +469 to +472
// This is for rate limiting
if (shardIterator != null && intervalMs > 0) {
Thread.sleep(intervalMs);
}
Contributor

Could we remove this self rate-limiting? Instead, we should let the SDK handle the retries after rate limiting.

Comment on lines +494 to +495
builder.shardIteratorType(defaultPosition == KinesisSourceConfig.KinesisStartingPosition.TRIM_HORIZON
? ShardIteratorType.TRIM_HORIZON : ShardIteratorType.LATEST);
Contributor

KinesisSourceConfig.KinesisStartingPosition.EARLIEST should also map to ShardIteratorType.TRIM_HORIZON.
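Sketched with local stand-in enums (the real types are KinesisSourceConfig.KinesisStartingPosition and the SDK's ShardIteratorType), the mapping would be:

```java
public class StartingPositionSketch {
  /** Local stand-ins for the PR's enum and the AWS SDK's ShardIteratorType. */
  public enum KinesisStartingPosition { TRIM_HORIZON, EARLIEST, LATEST }
  public enum ShardIteratorType { TRIM_HORIZON, LATEST }

  /**
   * EARLIEST is documented as an alias of TRIM_HORIZON, so both must map to
   * ShardIteratorType.TRIM_HORIZON; only LATEST maps to LATEST.
   */
  public static ShardIteratorType toIteratorType(KinesisStartingPosition pos) {
    switch (pos) {
      case TRIM_HORIZON:
      case EARLIEST:
        return ShardIteratorType.TRIM_HORIZON;
      default:
        return ShardIteratorType.LATEST;
    }
  }
}
```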

return new ShardReadResult(allRecords, Option.ofNullable(lastSequenceNumber), shardIterator == null);
}

private static String getShardIterator(KinesisClient client, String streamName,
Contributor

Suggested change
private static String getShardIterator(KinesisClient client, String streamName,
private static String getCurrentCursor(KinesisClient client, String streamName,

KinesisShardRange range, KinesisSourceConfig.KinesisStartingPosition defaultPosition,
int maxRecordsPerRequest, long intervalMs, long maxTotalRecords,
boolean enableDeaggregation) throws InterruptedException {
String shardIterator;
Contributor

Suggested change
String shardIterator;
String currentCursor;

allRecords.add(r);
}
// Checkpoint uses the last Kinesis record's sequence number (from raw records, not deaggregated)
lastSequenceNumber = records.get(records.size() - 1).sequenceNumber();
Contributor

This assumes that the returned records are sorted based on the sequence number. Is that guaranteed?


// Filter out shards with no unread records to avoid unnecessary GetRecords calls
boolean useLatestWhenNoCheckpoint =
offsetGen.getStartingPosition() == KinesisSourceConfig.KinesisStartingPosition.LATEST;
Contributor

Rename this to startingStrategy or something similar to be readable?

boolean useLatestWhenNoCheckpoint =
offsetGen.getStartingPosition() == KinesisSourceConfig.KinesisStartingPosition.LATEST;
KinesisOffsetGen.KinesisShardRange[] allShardRanges = shardRanges;
int beforeFilter = shardRanges.length;
Contributor

Suggested change
int beforeFilter = shardRanges.length;
int lengthBeforeFilter = shardRanges.length;

Comment on lines +84 to +85
String checkpointStr = lastCheckpoint.isPresent() ? lastCheckpoint.get().getCheckpointKey() : "";
return new InputBatch<>(Option.empty(), checkpointStr);
Contributor

Could there be a case where the checkpoint needs to be set, instead of empty string, and there is no message to ingest?

Comment on lines +66 to +67
KinesisOffsetGen.KinesisShardRange[] shardRanges = offsetGen.getNextShardRanges(
lastCheckpoint, sourceLimit, metrics);
Contributor

For all the methods handling shard ranges, could you use List<KinesisOffsetGen.KinesisShardRange> instead of array so it's easier to read?

Comment on lines +103 to +111
/**
* Create checkpoint string from the batch and shard ranges.
* Subclasses provide checkpoint data (shardId -> sequenceNumber) collected during the read.
* Must include both read shards (from shardRangesRead) and filtered shards (from allShardRanges)
* so the next run does not re-read filtered-out shards from TRIM_HORIZON.
*/
protected abstract String createCheckpointFromBatch(T batch,
KinesisOffsetGen.KinesisShardRange[] shardRangesRead,
KinesisOffsetGen.KinesisShardRange[] allShardRanges);
Contributor

Could shardRangesRead and allShardRanges be renamed to be easily understood?

Comment on lines +66 to +67
KinesisOffsetGen.KinesisShardRange[] shardRanges = offsetGen.getNextShardRanges(
lastCheckpoint, sourceLimit, metrics);
Contributor

To clarify, offsetGen.getNextShardRanges returns all the open and closed ranges (excluding expired ranges) derived from the checkpoint and the current stream state, correct?

// Handle expired shards that exist in the last checkpoint.
if (!expiredShardIds.isEmpty()) {
boolean failOnDataLoss = getBooleanWithAltKeys(props, KinesisSourceConfig.ENABLE_FAIL_ON_DATA_LOSS);
for (String shardId : expiredShardIds) {
Contributor

There should also be validation on open and closed shards. If the last sequence number of a shard in last checkpoint is before the start of the shard based on the current state, e.g., due to data retention, there is also data loss.

Comment on lines +155 to +157
<!-- AWS Kinesis SDK for JsonKinesisSource -->
<include>software.amazon.awssdk:kinesis</include>

Contributor

Should we add this to hudi-aws-bundle so that it is not added here? I don't see any other AWS artifacts included.

Comment on lines +67 to +72
private static class ShardFetchResult implements Serializable {
private final List<String> records;
private final String shardId;
private final Option<String> lastSequenceNumber;
private final boolean reachedEndOfShard;
}
Contributor

This class is similar to KinesisOffsetGen.ShardReadResult. Could we keep one of them only?

Comment on lines +92 to +96
long totalMsgs = getRecordCount(batch);
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KINESIS_MESSAGE_IN_COUNT, totalMsgs);

log.info("Read {} records from Kinesis stream {} with {} shards, checkpoint: {}",
totalMsgs, offsetGen.getStreamName(), shardRanges.length, checkpointStr);

This triggers eager evaluation and record reading. Could we avoid that?
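One way to get the count without an extra eager action is to count records as a side effect of normal consumption, e.g. by wrapping the record iterator; in Spark this would typically be a `LongAccumulator`, but the idea can be shown with a plain-Java sketch using an `AtomicLong`:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

public class CountingIterator<T> implements Iterator<T> {
  private final Iterator<T> inner;
  private final AtomicLong counter;

  public CountingIterator(Iterator<T> inner, AtomicLong counter) {
    this.inner = inner;
    this.counter = counter;
  }

  @Override
  public boolean hasNext() {
    return inner.hasNext();
  }

  @Override
  public T next() {
    counter.incrementAndGet(); // count as a side effect of normal consumption
    return inner.next();
  }

  public static void main(String[] args) {
    AtomicLong seen = new AtomicLong();
    Iterator<String> it =
        new CountingIterator<>(Arrays.asList("a", "b", "c").iterator(), seen);
    while (it.hasNext()) {
      it.next();
    }
    System.out.println(seen.get()); // 3
  }
}
```

The metric would then be reported after the batch is consumed downstream, instead of forcing an extra pass just to compute the count.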

private final boolean shouldAddOffsets;
private final boolean enableDeaggregation;
private final int maxRecordsPerRequest;
private final long intervalMs;

nit: rename for readability


JavaRDD<ShardFetchResult> fetchRdd = sparkContext.parallelize(
java.util.Arrays.asList(shardRanges), shardRanges.length)
.mapPartitions(shardRangeIt -> {

Any reason for using mapPartitions instead of map?

@linliu-code (Collaborator, Author) replied on Feb 27, 2026:

Generally mapPartitions is more efficient than map since some resources, like the client, can be reused if a partition is assigned multiple shards. But here there is probably no big difference since we assign one shard per partition.
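The resource-reuse argument can be illustrated without Spark: with mapPartitions-style processing, an expensive client is created once per partition and shared by every element in it, instead of once per element. A plain-Java sketch, where `FakeClient` is a hypothetical stand-in for the Kinesis client:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PartitionClientReuse {
  // Stand-in for an expensive-to-create client such as a KinesisClient.
  static class FakeClient {
    static int instancesCreated = 0;

    FakeClient() {
      instancesCreated++;
    }

    String fetch(String shardId) {
      return "records-of-" + shardId;
    }
  }

  // mapPartitions-style: one client per partition, reused across elements.
  static List<String> processPartition(Iterator<String> shardIds) {
    FakeClient client = new FakeClient();
    List<String> out = new ArrayList<>();
    while (shardIds.hasNext()) {
      out.add(client.fetch(shardIds.next()));
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> results =
        processPartition(Arrays.asList("shard-0", "shard-1", "shard-2").iterator());
    System.out.println(results.size());              // 3
    System.out.println(FakeClient.instancesCreated); // 1, not 3
  }
}
```

With a map-style transform, the equivalent would construct one client per shard element; at one shard per partition the two are equivalent, as the reply notes.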

String json = recordToJsonStatic(r, range.getShardId(), readConfig.isShouldAddOffsets());
if (json != null) {
recordStrings.add(json);
}

Should it throw a runtime exception if json is null?

readConfig.getMaxRecordsPerRequest(), readConfig.getIntervalMs(), readConfig.getMaxRecordsPerShard(),
readConfig.isEnableDeaggregation());

List<String> recordStrings = new ArrayList<>();

Similarly, could we avoid accumulating all JSON strings in a list, and construct an iterator instead?
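Instead of buffering every JSON string in a list, the partition function could return an iterator that converts records lazily and skips null conversions as it goes. A self-contained plain-Java sketch of such an iterator (the conversion function stands in for `recordToJsonStatic`):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

public class LazyJsonIterator implements Iterator<String> {
  private final Iterator<String> records;
  private final Function<String, String> toJson;
  private String next;

  public LazyJsonIterator(Iterator<String> records, Function<String, String> toJson) {
    this.records = records;
    this.toJson = toJson;
    advance();
  }

  private void advance() {
    next = null;
    while (next == null && records.hasNext()) {
      next = toJson.apply(records.next()); // null results are skipped, not stored
    }
  }

  @Override
  public boolean hasNext() {
    return next != null;
  }

  @Override
  public String next() {
    if (next == null) {
      throw new NoSuchElementException();
    }
    String result = next;
    advance();
    return result;
  }

  public static void main(String[] args) {
    Iterator<String> it = new LazyJsonIterator(
        Arrays.asList("a", "", "b").iterator(),
        r -> r.isEmpty() ? null : "{\"data\":\"" + r + "\"}");
    int count = 0;
    while (it.hasNext()) {
      it.next();
      count++;
    }
    System.out.println(count); // 2: the empty record is skipped lazily
  }
}
```

Returning such an iterator from `mapPartitions` keeps memory bounded by Spark's own consumption, rather than by the number of records fetched from a shard.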

Comment on lines +179 to +193
private static KinesisClient createKinesisClientFromConfig(KinesisReadConfig config) {
software.amazon.awssdk.services.kinesis.KinesisClientBuilder builder =
KinesisClient.builder().region(software.amazon.awssdk.regions.Region.of(config.getRegion()));
if (config.getEndpointUrl() != null && !config.getEndpointUrl().isEmpty()) {
builder = builder.endpointOverride(java.net.URI.create(config.getEndpointUrl()));
}
if (config.getAccessKey() != null && !config.getAccessKey().isEmpty()
&& config.getSecretKey() != null && !config.getSecretKey().isEmpty()) {
builder = builder.credentialsProvider(
software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create(
software.amazon.awssdk.auth.credentials.AwsBasicCredentials.create(
config.getAccessKey(), config.getSecretKey())));
}
return builder.build();
}

KinesisOffsetGen has createKinesisClient with similar logic. Let's consolidate these two into one.

if (dataStr == null || dataStr.trim().isEmpty()) {
return null;
}
if (shouldAddOffsets) {

nit: rename the config and variable to be aligned with Kinesis

return dataStr;
}

private Map<String, String> buildCheckpointFromSummaries(List<ShardFetchSummary> summaries) {

Does this contain all shards, including the ones that are filtered out (i.e., shards without new data or not read in this batch)?
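If only the shards read in this batch produce summaries, the resulting checkpoint should be merged with the previous one so that untouched shards keep their last committed sequence numbers. A hedged plain-Java sketch of that merge, with hypothetical method names:

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointMerge {
  // Entries from this batch override old ones; shards absent from this
  // batch keep their previously committed sequence numbers.
  static Map<String, String> merge(Map<String, String> previous,
                                   Map<String, String> thisBatch) {
    Map<String, String> merged = new HashMap<>(previous);
    merged.putAll(thisBatch);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> previous = new HashMap<>();
    previous.put("shard-0", "seq-10");
    previous.put("shard-1", "seq-20");

    Map<String, String> thisBatch = new HashMap<>();
    thisBatch.put("shard-1", "seq-25"); // only shard-1 had new data

    Map<String, String> merged = merge(previous, thisBatch);
    System.out.println(merged.get("shard-0")); // seq-10 is retained
    System.out.println(merged.get("shard-1")); // advanced to seq-25
  }
}
```

Without such a merge, a shard that is idle for one batch would drop out of the checkpoint and be re-read from its configured starting position on the next run.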

}

@Override
protected String createCheckpointFromBatch(JavaRDD<String> batch,

The checkpoint calculation logic is spread across multiple methods (toBatch, buildCheckpointFromSummaries, createCheckpointFromBatch, etc.). Could it be consolidated into one place?
