Skip to content

Commit

Permalink
KAFKA-15859: Complete delayed RemoteListOffsets requests when a repli…
Browse files Browse the repository at this point in the history
…ca is removed from broker.

- Removed the ListOffsetsMetadata wrapper class.
- Addressed review comments from PR apache#16602
  • Loading branch information
kamalcph committed Oct 12, 2024
1 parent ca42226 commit e4ed533
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 29 deletions.
10 changes: 8 additions & 2 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1261,8 +1261,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*
* @param targetTimestamp The given timestamp for offset fetching.
* @param remoteLogManager Optional RemoteLogManager instance if it exists.
* @return The offset of the first message whose timestamp is greater than or equals to the given timestamp.
* None if no such message is found.
* @return the offset-result holder
* <ul>
* <li>When the partition is not enabled with remote storage, then it contains offset of the first message
* whose timestamp is greater than or equals to the given timestamp; None if no such message is found.
* <li>When the partition is enabled with remote storage, then it contains the job/task future and gets
* completed in the async fashion.
* <li>All special timestamp offset results are returned immediately irrespective of the remote storage.
* </ul>
*/
@nowarn("cat=deprecation")
def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): OffsetResultHolder = {
Expand Down
31 changes: 17 additions & 14 deletions core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.{Map, mutable}
import scala.jdk.CollectionConverters._

case class ListOffsetsPartitionStatus(var responseOpt: Option[ListOffsetsPartitionResponse] = None,
case class ListOffsetsPartitionStatus(@volatile var responseOpt: Option[ListOffsetsPartitionResponse],
futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None,
lastFetchableOffset: Option[Long] = None,
maybeOffsetsError: Option[ApiException] = None) {
Expand All @@ -43,21 +43,14 @@ case class ListOffsetsPartitionStatus(var responseOpt: Option[ListOffsetsPartiti
}
}

case class ListOffsetsMetadata(statusByPartition: mutable.Map[TopicPartition, ListOffsetsPartitionStatus]) {

override def toString: String = {
s"ListOffsetsMetadata(statusByPartition=$statusByPartition)"
}
}

class DelayedRemoteListOffsets(delayMs: Long,
version: Int,
metadata: ListOffsetsMetadata,
statusByPartition: mutable.Map[TopicPartition, ListOffsetsPartitionStatus],
replicaManager: ReplicaManager,
responseCallback: List[ListOffsetsTopicResponse] => Unit) extends DelayedOperation(delayMs) {

// Mark the status as completed, if there is no async task to track.
// If there is a task to track, then build the response as REQUEST_TIMED_OUT by default.
metadata.statusByPartition.foreachEntry { (topicPartition, status) =>
statusByPartition.foreachEntry { (topicPartition, status) =>
status.completed = status.futureHolderOpt.isEmpty
if (status.futureHolderOpt.isDefined) {
status.responseOpt = Some(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition()))
Expand All @@ -69,7 +62,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
* Call-back to execute when a delayed operation gets expired and hence forced to complete.
*/
override def onExpiration(): Unit = {
metadata.statusByPartition.foreachEntry { (topicPartition, status) =>
statusByPartition.foreachEntry { (topicPartition, status) =>
if (!status.completed) {
debug(s"Expiring list offset request for partition $topicPartition with status $status")
status.futureHolderOpt.foreach(futureHolder => futureHolder.jobFuture.cancel(true))
Expand All @@ -83,7 +76,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
* in subclasses and will be called exactly once in forceComplete()
*/
override def onComplete(): Unit = {
val responseTopics = metadata.statusByPartition.groupBy(e => e._1.topic()).map {
val responseTopics = statusByPartition.groupBy(e => e._1.topic()).map {
case (topic, status) =>
new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => s.responseOpt).toList.asJava)
}.toList
Expand All @@ -99,8 +92,18 @@ class DelayedRemoteListOffsets(delayMs: Long,
*/
override def tryComplete(): Boolean = {
var completable = true
metadata.statusByPartition.foreachEntry { (partition, status) =>
statusByPartition.foreachEntry { (partition, status) =>
if (!status.completed) {
try {
replicaManager.getPartitionOrException(partition)
} catch {
case e: ApiException =>
status.futureHolderOpt.foreach { futureHolder =>
futureHolder.jobFuture.cancel(false)
futureHolder.taskFuture.complete(Left(e))
}
}

status.futureHolderOpt.foreach { futureHolder =>
if (futureHolder.taskFuture.isDone) {
val response = futureHolder.taskFuture.get() match {
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ class ReplicaManager(val config: KafkaConfig,
delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
delayedRemoteFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
delayedRemoteListOffsetsPurgatory.checkAndComplete(topicPartitionOperationKey)
}

/**
Expand Down Expand Up @@ -1556,7 +1557,7 @@ class ReplicaManager(val config: KafkaConfig,
if (delayedRemoteListOffsetsRequired(statusByPartition)) {
val delayMs: Long = if (timeoutMs > 0) timeoutMs else config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs()
// create delayed remote list offsets operation
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version, ListOffsetsMetadata(statusByPartition), responseCallback)
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version, statusByPartition, this, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed remote list offsets operation
val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
// try to complete the request immediately, otherwise put it into the purgatory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package kafka.server

import kafka.log.AsyncOffsetReadFutureHolder
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
Expand All @@ -37,6 +38,7 @@ class DelayedRemoteListOffsetsTest {

val delayMs = 10
val timer = new MockTimer()
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
type T = Either[Exception, Option[TimestampAndOffset]]
val purgatory =
new DelayedOperationPurgatory[DelayedRemoteListOffsets]("test-purgatory", timer, purgeInterval = 10)
Expand Down Expand Up @@ -71,14 +73,14 @@ class DelayedRemoteListOffsetsTest {
true
})

val metadata = ListOffsetsMetadata(mutable.Map(
val statusByPartition = mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(holder))
))
)

val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, metadata, responseCallback)
val listOffsetsRequestKeys = metadata.statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
assertEquals(0, DelayedRemoteListOffsetsMetrics.aggregateExpirationMeter.count())
assertEquals(0, DelayedRemoteListOffsetsMetrics.partitionExpirationMeters.size)
purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
Expand Down Expand Up @@ -123,14 +125,14 @@ class DelayedRemoteListOffsetsTest {
true
})

val metadata = ListOffsetsMetadata(mutable.Map(
val statusByPartition = mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(holder))
))
)

val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, metadata, responseCallback)
val listOffsetsRequestKeys = metadata.statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)

assertEquals(0, cancelledCount)
Expand Down Expand Up @@ -179,17 +181,74 @@ class DelayedRemoteListOffsetsTest {
when(errorFutureHolder.taskFuture).thenAnswer(_ => errorTaskFuture)
when(errorFutureHolder.jobFuture).thenReturn(jobFuture)

val metadata = ListOffsetsMetadata(mutable.Map(
val statusByPartition = mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(errorFutureHolder))
))
)

val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, metadata, responseCallback)
val listOffsetsRequestKeys = metadata.statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)

assertEquals(0, cancelledCount)
assertEquals(listOffsetsRequestKeys.size, numResponse)
}

@Test
def testPartialResponseWhenNotLeaderOrFollowerExceptionOnOnePartition(): Unit = {
var numResponse = 0
val responseCallback = (response: List[ListOffsetsTopicResponse]) => {
response.foreach { topic =>
topic.partitions().forEach { partition =>
if (topic.name().equals("test1") && partition.partitionIndex() == 0) {
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code(), partition.errorCode())
assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp())
assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset())
assertEquals(-1, partition.leaderEpoch())
} else {
assertEquals(Errors.NONE.code(), partition.errorCode())
assertEquals(100L, partition.timestamp())
assertEquals(100L, partition.offset())
assertEquals(50, partition.leaderEpoch())
}
numResponse += 1
}
}
}

val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50))
val taskFuture = new CompletableFuture[T]()
taskFuture.complete(Right(Some(timestampAndOffset)))

var cancelledCount = 0
val jobFuture = mock(classOf[CompletableFuture[Void]])
val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
when(holder.taskFuture).thenAnswer(_ => taskFuture)
when(holder.jobFuture).thenReturn(jobFuture)
when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => {
cancelledCount += 1
true
})

when(replicaManager.getPartitionOrException(new TopicPartition("test1", 0)))
.thenThrow(new NotLeaderOrFollowerException("Not leader or follower!"))
val errorFutureHolder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
val errorTaskFuture = new CompletableFuture[T]()
when(errorFutureHolder.taskFuture).thenAnswer(_ => errorTaskFuture)
when(errorFutureHolder.jobFuture).thenReturn(jobFuture)

val statusByPartition = mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(errorFutureHolder))
)

val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)

assertEquals(1, cancelledCount)
assertEquals(listOffsetsRequestKeys.size, numResponse)
}
}

0 comments on commit e4ed533

Please sign in to comment.