Skip to content

Commit b5cceb4

Browse files
authored
KAFKA-19205: inconsistent result of beginningOffsets/endoffset between classic and async consumer with 0 timeout (#19578)
In the return results of the methods beginningOffsets and endOffset, if timeout == 0, then an empty Map should be returned uniformly instead of in the form of <TopicPartition, null> Reviewers: Ken Huang <[email protected]>, PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]>, Lianet Magrans <[email protected]>
1 parent 93e65c4 commit b5cceb4

File tree

4 files changed

+27
-6
lines changed

4 files changed

+27
-6
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1634,7 +1634,7 @@ public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> par
16341634
* @param partitions the partitions to get the earliest offsets
16351635
* @param timeout The maximum amount of time to await retrieval of the beginning offsets
16361636
*
1637-
* @return The earliest available offsets for the given partitions
1637+
* @return The earliest available offsets for the given partitions, and it will return empty map if zero timeout is provided
16381638
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
16391639
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
16401640
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
@@ -1684,7 +1684,7 @@ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partition
16841684
* @param partitions the partitions to get the end offsets.
16851685
* @param timeout The maximum amount of time to await retrieval of the end offsets
16861686
*
1687-
* @return The end offsets for the given partitions.
1687+
* @return The end offsets for the given partitions, and it will return empty map if zero timeout is provided
16881688
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
16891689
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
16901690
* @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1306,7 +1306,10 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition
13061306
// and throw timeout exception if it cannot complete in time.
13071307
if (timeout.isZero()) {
13081308
applicationEventHandler.add(listOffsetsEvent);
1309-
return listOffsetsEvent.emptyResults();
1309+
// It is used to align with classic consumer.
1310+
// When the "timeout == 0", the classic consumer will return an empty map.
1311+
// Therefore, the AsyncKafkaConsumer needs to be consistent with it.
1312+
return new HashMap<>();
13101313
}
13111314

13121315
Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestampMap;

clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -993,9 +993,8 @@ public void testBeginningOffsetsWithZeroTimeout() {
993993
TopicPartition tp = new TopicPartition("topic1", 0);
994994
Map<TopicPartition, Long> result =
995995
assertDoesNotThrow(() -> consumer.beginningOffsets(Collections.singletonList(tp), Duration.ZERO));
996-
// The result should be {tp=null}
997-
assertTrue(result.containsKey(tp));
998-
assertNull(result.get(tp));
996+
assertNotNull(result);
997+
assertEquals(0, result.size());
999998
verify(applicationEventHandler).add(ArgumentMatchers.isA(ListOffsetsEvent.class));
1000999
}
10011000

core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ package kafka.api
1414

1515
import kafka.api.BaseConsumerTest.{DeserializerImpl, SerializerImpl}
1616

17+
import java.lang.{Long => JLong}
1718
import java.time.Duration
1819
import java.util
1920
import java.util.Arrays.asList
@@ -873,4 +874,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
873874
waitTimeMs=leaveGroupTimeoutMs
874875
)
875876
}
877+
878+
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
879+
@MethodSource(Array("getTestGroupProtocolParametersAll"))
880+
def testOffsetRelatedWhenTimeoutZero(groupProtocol: String): Unit = {
881+
val consumer = createConsumer()
882+
val result1 = consumer.beginningOffsets(util.List.of(tp), Duration.ZERO)
883+
assertNotNull(result1)
884+
assertEquals(0, result1.size())
885+
886+
val result2 = consumer.endOffsets(util.List.of(tp), Duration.ZERO)
887+
assertNotNull(result2)
888+
assertEquals(0, result2.size())
889+
890+
val result3 = consumer.offsetsForTimes(Map[TopicPartition, JLong]((tp, 0)).asJava, Duration.ZERO)
891+
assertNotNull(result3)
892+
assertEquals(1, result3.size())
893+
assertNull(result3.get(tp))
894+
}
876895
}

0 commit comments

Comments
 (0)