Hello, I am using Spark Streaming with the Kafka Java API: org.apache.spark version 2.3.0 and org.apache.kafka 0.10.
I am using the sample code provided here: https://spark.apache.org/docs/2.3.1/streaming-kafka-0-10-integration.html. But I made some modifications to the OffsetRanges in each batch, as shown below:
stream.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    // make some change to the OffsetRanges
    ...
    // some time later, after outputs have completed
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
The problem is that after the modification (the offset ranges did change) and commitAsync, I did not receive the OffsetRanges I expected in the next batch.
How should I resolve this?
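One possible explanation, offered as an assumption rather than a confirmed diagnosis: as far as I understand, the direct stream keeps its own record of where the next batch starts in driver memory, and offsets committed to Kafka are only consulted when the stream starts up. If that holds, committing modified ranges would not change the next batch of a running job. The toy model below illustrates that behavior in plain Java. `ToyDirectStream` and its methods are hypothetical names made up for this sketch; they are not Spark or Kafka classes.

```java
public class OffsetTrackingSketch {

    /** Hypothetical stand-in for a direct stream; models one partition only. */
    static class ToyDirectStream {
        private long currentOffset;   // start of the next batch, held in driver memory
        private long committedOffset; // offset stored in Kafka for the consumer group

        ToyDirectStream(long committedAtStartup) {
            // Assumption: the committed offset is read once, at startup.
            this.currentOffset = committedAtStartup;
            this.committedOffset = committedAtStartup;
        }

        /** Returns {fromOffset, untilOffset} and advances the in-memory position. */
        long[] nextBatch(long latestOffset) {
            long[] range = {currentOffset, latestOffset};
            currentOffset = latestOffset;
            return range;
        }

        /** Stores an offset for the group; does not touch currentOffset. */
        void commit(long untilOffset) {
            committedOffset = untilOffset;
        }

        long committed() {
            return committedOffset;
        }
    }

    public static void main(String[] args) {
        ToyDirectStream stream = new ToyDirectStream(0L);
        stream.nextBatch(100L);                 // first batch covers [0, 100)
        stream.commit(50L);                     // commit a modified, smaller offset
        long[] second = stream.nextBatch(200L); // still starts at 100
        System.out.println(second[0] + " " + second[1]); // prints "100 200"

        // Only a restarted stream picks up the committed value.
        ToyDirectStream restarted = new ToyDirectStream(stream.committed());
        long[] afterRestart = restarted.nextBatch(200L);
        System.out.println(afterRestart[0] + " " + afterRestart[1]); // prints "50 200"
    }
}
```

In this model the modified commit only matters after a restart. If the real stream behaves the same way, reprocessing from a modified offset would require restarting the stream with the same group id (or supplying explicit starting offsets) rather than relying on commitAsync alone.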