Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Update Consumer and Producer JavaDocs for committing offsets #18336

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from

Conversation

mjsax
Copy link
Member

@mjsax mjsax commented Dec 28, 2024

The consumer/producer JavaDocs still contain instruction for naively computing the offset to be committed.
This PR updates the JavaDocs with regard to the improvements of KIP-1094.

The consumer/producer JavaDocs still contain instruction for naively
computing the offset to be committed.
This PR updates the JavaDocs with regard to the improvements of KIP-1094.
Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. A few small comments, but looks like a good improvement.

* which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There
* are actually two notions of position relevant to the user of the consumer:
* which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5.
* Note that offsets are not guaranteed to be consecutive (eg., for compacted topic, or—independent of "read_committed"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest the following the parentheses "(such as compacted topic or when records have been produced using transactions)".

* which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5.
* Note that offsets are not guaranteed to be consecutive (eg., for compacted topic, or—independent of "read_committed"
* mode— transactional topics). For example, if the consumer did read a record with offset 4, but 5 is not an offset
* with a record, it's position might advance to 6 (or higher) directly. Similarly, if the consumer's position is 5,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: its not it's

@@ -984,7 +990,9 @@ public void commitSync(Duration timeout) {
* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used. The committed offset should be the next message your application will consume,
* i.e. lastProcessedMessageOffset + 1. If automatic group management with {@link #subscribe(Collection)} is used,
* i.e. {@code nextRecordToBeProcessed.offset()} (or {@link ConsumerRecords#nextOffsets()}).
* You should also add the {@link ConsumerRecord#leaderEpoch()} (or {@code nextOffsets().get(...).leaderEpoch()})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe "You should also add the leader epoch as commit metadata, which can be obtained from {@link ConsumerRecord#leaderEpoch()} or {@link ConsumerRecords#nextOffsets}." I didn't find the nextOffsets().get(...).leaderEpoch() that easy to follow and the hyperlink to the ConsumerRecords seems nicer to me.

@@ -1033,7 +1041,9 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used. The committed offset should be the next message your application will consume,
* i.e. lastProcessedMessageOffset + 1. If automatic group management with {@link #subscribe(Collection)} is used,
* i.e. {@code nextRecordToBeProcessed.offset()} (or {@link ConsumerRecords#nextOffsets()}).
* You should also add the {@link ConsumerRecord#leaderEpoch()} (or {@code nextOffsets().get(...).leaderEpoch()})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same point as above about nextOffsets().get(...).leaderEpoch().

@@ -1117,7 +1127,9 @@ public void commitAsync(OffsetCommitCallback callback) {
* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used. The committed offset should be the next message your application will consume,
* i.e. lastProcessedMessageOffset + 1. If automatic group management with {@link #subscribe(Collection)} is used,
* i.e. {@code nextRecordToBeProcessed.offset()} (or {@link ConsumerRecords#nextOffsets()}).
* You should also add the {@link ConsumerRecord#leaderEpoch()} (or {@code nextOffsets().get(...).leaderEpoch()})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

* be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
* be the next message your application will consume, i.e. {@code nextRecordToBeProcessed.offset()}
* (or {@link ConsumerRecords#nextOffsets()}). You should also add the {@link ConsumerRecord#leaderEpoch()}
* (or {@code nextOffsets().get(...).leaderEpoch()}) as commit metadata.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax thanks for this patch. one small comment is left. PTAL

@@ -266,8 +270,7 @@
* for (ConsumerRecord&lt;String, String&gt; record : partitionRecords) {
* System.out.println(record.offset() + &quot;: &quot; + record.value());
* }
* long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
* consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
* consumer.commitSync(Collections.singletonMap(partition, partitionRecords.nextOffsets().get(partition));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: partitionRecords -> records

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and missing parenthesis, I guess we should end with :

Suggested change
* consumer.commitSync(Collections.singletonMap(partition, partitionRecords.nextOffsets().get(partition));
* consumer.commitSync(Collections.singletonMap(partition, records.nextOffsets().get(partition)));

Copy link
Member Author

@mjsax mjsax Jan 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partitionRecords is actually correct; nextRecords() was added to ConsumerRecord[s] (not ConsumerRecord) -- that's also why we need to call get(partition) -- ConsumerRecord is already data for a single partition, and get(partition) would not make sense.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax Could you please take a look at following attachment? the type of partitionRecords is List<ConsumerRecord<String, String>> rather than ConsumerRecords

Screenshot From 2025-01-03 10-42-00

btw, it miss a ) also.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for sanity checking... I did mix up the variable names.

Copy link
Contributor

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the nice update! Just couple of comments (and agree with the previous suggestions)

* mode&mdash; transactional topics). For example, if the consumer did read a record with offset 4, but 5 is not an offset
* with a record, it's position might advance to 6 (or higher) directly. Similarly, if the consumer's position is 5,
* but there is no record with offset 5, the consumer will return the record with the next higher offset.
* There are actually two notions of position relevant to the user of the consumer:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to this but on ln 84 right below (sorry couldn't add comment there):

The {@link #commitSync() committed position} is the last offset that has been stored securely

shouldn't that refer to committed(..) instead?

The {@link #committed(Set) committed position} is the last offset that has been stored securely

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure... I did not modify this part... There are actually two notions of position relevant to the user of the consumer: did not change.

But I think it ok as is? Guess it works both ways... It's just a difference between "write path" vs "read path", right?

Let me know what you think. If you think using #commited(Set) is better, happy to update it, but it's orthogonal to what I want to do in this PR and would be some side improvement.

@@ -266,8 +270,7 @@
* for (ConsumerRecord&lt;String, String&gt; record : partitionRecords) {
* System.out.println(record.offset() + &quot;: &quot; + record.value());
* }
* long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
* consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
* consumer.commitSync(Collections.singletonMap(partition, partitionRecords.nextOffsets().get(partition));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and missing parenthesis, I guess we should end with :

Suggested change
* consumer.commitSync(Collections.singletonMap(partition, partitionRecords.nextOffsets().get(partition));
* consumer.commitSync(Collections.singletonMap(partition, records.nextOffsets().get(partition)));

@mjsax
Copy link
Member Author

mjsax commented Jan 3, 2025

Thanks for all the input. Pushed an updated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants