
KAFKA-20131: ClassicKafkaConsumer does not clear endOffsetRequested flag on failed LIST_OFFSETS calls#21457

Open
kirktrue wants to merge 15 commits into apache:trunk from kirktrue:KAFKA-20131-clear-endOffsetRequested-on-LIST_OFFSETS-failures

Conversation

@kirktrue
Contributor

@kirktrue kirktrue commented Feb 11, 2026

Updates the ClassicKafkaConsumer to clear out the SubscriptionState
endOffsetRequested flag if the LIST_OFFSETS call fails.

…ST_OFFSETS call fails

First pass at catching the case of failures. Still work to do to handle the fact that multiple responses are possible and thus we don't want to clear the flag prematurely.
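As a rough illustration of the symptom being fixed (hypothetical usage of the public Consumer API, not code from this PR):

// Assumes an assigned consumer and a TopicPartition tp (hypothetical names).
OptionalLong lag = consumer.currentLag(tp);   // may request the partition end offset via LIST_OFFSETS
// If that LIST_OFFSETS call fails and the endOffsetRequested flag is never cleared,
// later calls believe a request is still outstanding and never retry, so the lag
// can remain OptionalLong.empty() indefinitely. This PR clears the flag on failure
// so that a later call can request the end offset again.
lag = consumer.currentLag(tp);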
@github-actions github-actions bot added the consumer, clients, and triage (PRs from the community) labels Feb 11, 2026
@kirktrue kirktrue marked this pull request as ready for review February 12, 2026 00:29
@viktorsomogyi viktorsomogyi self-requested a review February 12, 2026 15:26
Contributor

@viktorsomogyi viktorsomogyi left a comment

@kirktrue I went through the code, debugged it locally, and ran the added tests; the PR looks fine to me. Since I'm not up to date with the consumer code, though, I'd ask you to get a second opinion as well.

Member

@lianetm lianetm left a comment

Thanks for the fix @kirktrue! One initial high-level comment regarding the changes to the retry logic.

Comment on lines 568 to 578
if (listOffsetsRequestState.shouldRetry) {
    // This exception is caught in prepareFetchOffsetsRequests() and signals to retry this set
    // of partitions. Only do that if the caller wants to retry LIST_OFFSETS calls, of course.
    throw new StaleMetadataException();
} else {
    // If the caller doesn't want to retry LIST_OFFSETS calls, go ahead and clear the
    // 'end offsets requested' flag since there won't be another chance. Then return an empty
    // list to signal that there are no requests to be sent.
    offsetFetcherUtils.clearPartitionEndOffsetRequests(timestampsToSearch.keySet());
    return List.of();
}
Member

Hmm, I would say we shouldn't change this retry logic. This internal retry is for the case where we don't know the leader for a partition when fetching offsets, so we really couldn't even send the request. In this case we want to refresh metadata and try again once we know the leader. Is there a reason why we want to change that in relation to the issue of the flag not being cleared?

With this change, I'm afraid that if the leader is not known when we call currentLag, it would take a second call to currentLag to issue another request, and a third call to get the result. Compare that with the current behaviour, where we retry here to discover the leader and fetch, so the next call to currentLag already finds the result.
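For illustration, a rough sketch of the caller-visible difference being described (hypothetical call sequence, not test code):

// Current behaviour: leader discovery is retried internally.
consumer.currentLag(tp);   // triggers a metadata refresh and sends LIST_OFFSETS once the leader is known
consumer.currentLag(tp);   // result is typically already available

// With the change as proposed: no internal retry when the leader is unknown.
consumer.currentLag(tp);   // cannot send the request (leader unknown)
consumer.currentLag(tp);   // issues another LIST_OFFSETS attempt
consumer.currentLag(tp);   // result becomes available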

Contributor Author

In this case we want to refresh metadata and try again once we know the leader. Is there a reason why we want to change that in relation to the issue of the flag not being cleared?

The existing behavior of ClassicKafkaConsumer.currentLag() is to make a single LIST_OFFSETS request. I'm trying to mimic that in the AsyncKafkaConsumer implementation.

I added a test to OffsetsRequestManager that approximates a call to currentLag() which fails due to a missing leader. From looking at the code, it seems the OffsetsRequestManager will detect the missing leader and retry the LIST_OFFSETS repeatedly, without a timeout or limit.

I'm not 100% convinced that the fire-and-forget style approach of ClassicKafkaConsumer is correct or optimal either, but it's the approach I'm trying to copy.

Thoughts?

Member

Ok, good point; I agree on the unbounded-retries gap. But then, isn't that a problem for calls to beginningOffsets/endOffsets/offsetsForTimes too? They will time out correctly at the API level (they even expire), but it seems they may leave the OffsetsRequestManager retrying in the background? (Or is there a mechanism in place that I'm missing?)

If so, then I agree on addressing the unbounded retries here, but should we address it consistently, with all calls retrying while there is time? (Instead of having the shouldRetry flag that splits the path.)

The gap seems to be this OffsetsRequestManager.fetchOffsets call that has no deadline.

Contributor Author

I've created KAFKA-20188 to track the work of adding a timeout to OffsetsRequestManager.fetchOffsets(), so we can focus on that in a separate PR.

Comment on lines 681 to 682
// The currentLag() API is a "best effort" attempt at calling the LIST_OFFSETS RPC. If it
// fails, don't retry the attempt internally, but let the user attempt it again.
Member

Here is where I think we may be getting something wrong. I agree we don't want to retry the ListOffsets request triggered from currentLag if it fails. But we do want to retry discovering a leader in order to be able to send the request in the first place (which is what the internal retries I pointed to in the comment above do).

So if this retry=false is only used to skip that leader re-discovery retry, I think we should remove it (no parameter needed, probably?).

Contributor Author

The leader rediscovery comes from the call to Metadata.requestUpdate() that's made in OffsetsRequestManager.groupListOffsetRequests(), right? I didn't see that throwing the StaleMetadataException did anything besides re-enqueue the ListOffsetsRequestState, which would then send a follow-up LIST_OFFSETS on the next call to OffsetsRequestManager.poll(), right?

My apologies if I'm missing something obvious 😄

Member

@lianetm lianetm left a comment

Some more comments. Thanks!

void clearPartitionEndOffsetRequests(Collection<TopicPartition> partitions) {
    for (final TopicPartition partition : partitions) {
        if (subscriptionState.maybeClearPartitionEndOffsetRequested(partition)) {
            log.trace("Clearing partition end offset requested for partition {}", partition);
Member

A bit confusing: duplicated "partition"?

Contributor Author

It's TRACE, so no one will ever see it 😉

j/k I'll change it.

Contributor Author

Removed the extra "partitions" in the log message.

remainingToSearch.keySet().retainAll(value.partitionsToRetry);

offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, isolationLevel);
offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());
Member

Here we're clearing the flag for the partitions that didn't get offsets yet. I agree we need this if we don't have any time left to retry. But if there's still time, the do-while will try again. In that case, do we want to clear the flag here?

I would imagine we don't, because we'll continue retrying while there is time. It could be the case of missing leader info for instance: we want to keep the flag on for those partitions, hit the client.awaitMetadataUpdate(timer) below, and try again in the next iteration of the do-while, right?

If so, I imagine we could take the timer into consideration here (clear the flag for the failed partitions only if the timer has expired)? Thoughts?
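A minimal sketch of that suggestion (hypothetical, not the actual patch; it assumes the do-while loop's timer and the remainingToSearch map from the excerpt above):

if (timer.isExpired()) {
    // No time left to retry, so release the flag for the partitions that never got an offset.
    offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());
}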

Contributor Author

I agree we need this if we don't have any time left to retry. But if there's still time, the do-while will try again. In that case, do we want to clear the flag here?

That's precisely what happens in the currentLag() case, though. It's always using a timeout of 0, so there's never a second pass in that loop.

Member

Ok, we both agree we need it for currentLag / when the timer has expired. But the way it's called now, it applies to all cases; that's my concern. Isn't this going to clear the flag also in the case where there is time left to retry and a partition doesn't have a known leader?

Contributor Author

I've added an explicit 'clear end offset requests' parameter that only ClassicKafkaConsumer.currentLag() sets to true. This should prevent other callers from clearing the flag, regardless of the timeout setting.


@Override
public void onFailure(RuntimeException e) {
    offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());
Member

same as above

Comment on lines +675 to +677
    return true;
} else {
    return false;
Member

This return value seems to be used only to log whether we performed the action. Should we move the log here instead? (It would simplify this function and the caller.)

Contributor Author

I left the logging for clearing the flag when we fail to find the offset at the higher level, to match where the logging is done when we do find the offset; both are in OffsetFetcherUtils. That's mostly so that enabling the log level for a single class covers both the positive and negative cases.

Contributor Author

But I agree that it's a little wonky.

Comment on lines +2772 to +2775
assertLogEmitted(
    appender,
    "^Requesting the log end offset for " + tp0 + " in order to compute lag$"
);
Member

This log is printed when there is a call to subscriptionState.requestPartitionEndOffset; wouldn't it be better to verify(subscriptionState).requestPartitionEndOffset(tp)? (We truly care about the function being called, and these log checks easily get flaky.)
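For reference, a minimal sketch of the suggested Mockito-style assertion (assuming the test's SubscriptionState is a spy named subscriptionState and tp0 is the partition under test):

// Assert that the end offset was requested exactly once for tp0.
verify(subscriptionState, times(1)).requestPartitionEndOffset(tp0);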

Contributor Author

Yes, the verify() approach is better. That said, I do feel that ensuring user-facing log messages show up as we expect them to is important for making log-based troubleshooting possible.

Contributor Author

Just for clarity, I will add the verify() bit for sure.

Contributor Author

Added calls to verify() to check when requestPartitionEndOffset() is invoked.

Contributor Author

FYI: I had to wrap the SubscriptionState instance in spy() in order to call verify() on it.

Comment on lines +2794 to +2797
assertLogEmitted(
    appender,
    "^Not requesting the log end offset for " + tp0 + " to compute lag as an outstanding request already exists$"
);
Member

@lianetm lianetm Feb 12, 2026

Similar to the above: would it be better to verify that we never called subscriptionState.requestPartitionEndOffset(tp0)?
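Correspondingly, a rough sketch of the negative check (same hypothetical assumptions as above; the exact form depends on how the test arranges the outstanding request):

// If the outstanding request is arranged without invoking the method, assert it is never called:
verify(subscriptionState, never()).requestPartitionEndOffset(tp0);
// If a first currentLag() call already made one request, assert that no additional call happened:
verify(subscriptionState, times(1)).requestPartitionEndOffset(tp0);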

Contributor Author

Agreed. Will do.

Contributor Author

Added calls to verify() to check that requestPartitionEndOffset() is not invoked in this case.

@lianetm lianetm removed the triage (PRs from the community) label Feb 12, 2026
@kirktrue kirktrue changed the title from "KAFKA-20131: SubscriptionState endOffsetRequested remains permanently set if LIST_OFFSETS call fails" to "KAFKA-20131: ClassicKafkaConsumer does not clear endOffsetRequested flag on failed LIST_OFFSETS calls" Feb 13, 2026

offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, isolationLevel);

if (isZeroTimestamp && shouldClearPartitionEndOffsets)
Member

Shouldn't we clear the flag if (isZeroTimestamp)?

I see that shouldClearPartitionEndOffsets is passed as true only from currentLag, but what about a call to consumer.endOffsets/beginningOffsets/offsetsForTimes with Duration.ZERO? How would the flag get cleared?

Contributor Author

The flag is only set and only checked on the currentLag() path, so the other paths shouldn't need to worry about it.

It's possible for a user to pass in a zero timeout to offsetsForTimes(), et al., but we don't need to clear the flag in those cases.
