KAFKA-20090: Add recovery logic to handle MaxValue epoch #21469
artemlivshits wants to merge 3 commits into apache:trunk
Conversation
Add testRecoveryFromEpochOverflow to verify that the system correctly handles the scenario where the producer epoch reaches Short.MaxValue (32767).

The test validates:
- Epoch can reach Short.MaxValue through transaction timeouts
- When epoch overflow is detected, errors are logged but processing continues
- Transaction markers at MAX_VALUE epoch are accepted to allow recovery
- Producer ID rotation occurs after overflow is detected
- New transactions can proceed with the rotated producer ID

Changes:
- TransactionMetadata: Log errors instead of throwing when epoch hits MAX_VALUE
- ProducerAppendInfo: Allow marker writes at MAX_VALUE epoch for recovery
- TransactionsTest: Add comprehensive test case
- TransactionCoordinator: Add test accessor for transaction manager

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
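A minimal sketch of the ProducerAppendInfo-side rule this commit describes. The class and method names (`EpochCheckSketch`, `checkMarkerEpoch`) and the exact comparison are assumptions for illustration, not the actual Kafka code:

```java
import org.apache.kafka.common.errors.InvalidProducerEpochException;

final class EpochCheckSketch {
    // Assumed rule from the commit message: once the current epoch is pinned at
    // Short.MAX_VALUE no further bump is possible, so a transaction marker that
    // also carries MAX_VALUE is accepted as-is, letting the stuck transaction
    // complete before the producer ID is rotated.
    static void checkMarkerEpoch(short currentEpoch, short markerEpoch) {
        if (currentEpoch == Short.MAX_VALUE && markerEpoch == Short.MAX_VALUE)
            return; // recovery path: allow completion, producer ID rotates next
        if (markerEpoch < currentEpoch)
            throw new InvalidProducerEpochException(
                "Marker epoch " + markerEpoch + " is older than " + currentEpoch);
    }
}
```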
Enhance the epoch overflow recovery test to verify data integrity by:
- Producing a final committed record with the distinct value "committed"
- Consuming in read-committed isolation level
- Asserting exactly one record is visible with the correct value

This validates that:
- All aborted transactions are properly filtered
- Only the committed transaction after producer ID rotation is visible
- Read-committed isolation guarantees work correctly after epoch overflow

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
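For reference, a self-contained sketch of the read-committed verification step this commit describes; the bootstrap address, group id, and topic name are placeholders, not values from the PR:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "epoch-overflow-check");
        // Records from aborted transactions are filtered out under read_committed.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("test-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            // Only the record committed after producer ID rotation should be visible.
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value()); // expected: "committed"
            }
        }
    }
}
```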
Modified the test to reflect the updated epoch overflow handling, where prepareFenceProducerEpoch() logs an error instead of throwing when the epoch is at Short.MaxValue.

The test now verifies:
- prepareFenceProducerEpoch() completes without throwing
- Epoch remains at Short.MaxValue (doesn't overflow to negative)
- Transaction state transitions to PREPARE_EPOCH_FENCE

This allows graceful recovery through producer ID rotation instead of failing hard with an exception.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
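A minimal sketch of the assertion pattern this commit describes, for inside a JUnit test method. It assumes a `txnMetadata` fixture (construction omitted) whose producer epoch is already Short.MAX_VALUE; the accessor names on `TxnTransitMetadata` are assumptions and may differ from the real class:

```java
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;

// txnMetadata: a TransactionMetadata fixture at epoch Short.MAX_VALUE (setup omitted).
TxnTransitMetadata transit = assertDoesNotThrow(txnMetadata::prepareFenceProducerEpoch);
assertEquals(Short.MAX_VALUE, transit.producerEpoch()); // no wrap to a negative epoch
assertEquals(TransactionState.PREPARE_EPOCH_FENCE, transit.txnState());
```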
chia7712
left a comment
@artemlivshits thanks for this patch!
```scala
def partitionFor(transactionalId: String): Int = txnManager.partitionFor(transactionalId)

// Package-private for testing
private[kafka] def transactionManager: TransactionStateManager = txnManager
```
We could make txnManager private[kafka] instead of adding another getter
for example:
```scala
class TransactionCoordinator(txnConfig: TransactionConfig,
                             scheduler: Scheduler,
                             createProducerIdManager: () => ProducerIdManager,
                             private[kafka] val txnManager: TransactionStateManager, // <-- here
                             txnMarkerChannelManager: TransactionMarkerChannelManager,
                             time: Time,
                             logContext: LogContext)
```

```diff
@@ -139,11 +139,12 @@ public TxnTransitMetadata prepareNoTransit() {
 public TxnTransitMetadata prepareFenceProducerEpoch() {
     if (producerEpoch == Short.MAX_VALUE)
-        throw new IllegalStateException("Cannot fence producer with epoch equal to Short.MaxValue since this would overflow");
+        LOGGER.error("Fencing producer {} {} with epoch equal to Short.MaxValue, this must not happen unless there is a bug", transactionalId, producerId);
```
What action do we expect users to take when they see this message?
The message is there so that there is a trail showing this scenario happened. It would help to root-cause (RCA) weird cases.
```java
// Also don't increase if producerEpoch is already at max, to avoid overflow.
short bumpedEpoch = hasFailedEpochFence || producerEpoch == Short.MAX_VALUE ? producerEpoch : (short) (producerEpoch + 1);
```
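For context, a small standalone demonstration (not from the PR) of why the Short.MAX_VALUE guard is needed: a bare increment wraps the epoch to a negative value.

```java
public class EpochOverflowDemo {
    public static void main(String[] args) {
        short epoch = Short.MAX_VALUE;                                    // 32767
        short wrapped = (short) (epoch + 1);                              // wraps around
        short guarded = epoch == Short.MAX_VALUE ? epoch : (short) (epoch + 1);
        System.out.println(wrapped);                                      // -32768: invalid epoch
        System.out.println(guarded);                                      // 32767: stays pinned at max
    }
}
```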
Thanks for the patch!
I have an opinion here!
Please take a look.
In my view, this code seems to affect only TV1. Were you intending to make a change specifically for TV1? If not, this may end up being an unintended change.
AFAIK, both TV1 and TV2 execute this line.
However, it seems that TV2 doesn't actually use the epoch generated here.
Even when TV2 is fenced, it actually uses the epoch obtained from txnMetadata.prepareAbortOrCommit().
Therefore, I believe any change here would affect only TV1...!
What do you think?
If I'm wrong, sorry for making you confused!
You are correct -- the bumped epoch is then ignored in the TV2 code path. But if we don't make changes here, the TV2 code path wouldn't get through.
Thanks for your comments!!! 🙇♂️
Sorry to bother you!
I have two more questions!

### 1.
Sorry for my poor understanding...🙇♂️
I'm not fully sure which part of the TV2 code path would fail to proceed if we don't make changes here. Could you please share a bit more detail or point me to where it gets blocked?

### 2.
This change in the code/PR seems to introduce a new path where TV1 could potentially reach an epoch value of 32767.
If TV1 has never experienced this issue in practice, would it make sense to handle TV1 and TV2 differently, for example similar to how prepareAbortOrCommit() checks supportEpochBump()?
What do you think?
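To illustrate the question, a hypothetical sketch of version-gated handling that mirrors the supportEpochBump() check mentioned above; everything here besides the supportEpochBump name is invented for illustration and is not the actual Kafka code:

```java
// Hypothetical: gate the MAX_VALUE handling on the transaction version, mirroring
// how prepareAbortOrCommit() branches on supportEpochBump().
static short fenceEpochSketch(short producerEpoch, boolean supportEpochBump) {
    if (producerEpoch == Short.MAX_VALUE) {
        if (supportEpochBump) {
            // TV2: tolerate MAX_VALUE; the fencing epoch comes from
            // prepareAbortOrCommit(), and recovery rotates the producer ID.
            return producerEpoch;
        }
        // TV1: keep the pre-existing hard failure.
        throw new IllegalStateException("Cannot fence producer with epoch equal to Short.MaxValue since this would overflow");
    }
    return (short) (producerEpoch + 1);
}
```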