diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 0a63955ef1d1f..931594ccaea16 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -18,8 +18,12 @@ import kafka.cluster.EndPoint; import kafka.cluster.Partition; +import kafka.log.AsyncOffsetReadFutureHolder; import kafka.log.UnifiedLog; +import kafka.server.DelayedOperationPurgatory; +import kafka.server.DelayedRemoteListOffsets; import kafka.server.StopPartition; +import kafka.server.TopicPartitionOperationKey; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; @@ -132,11 +136,13 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import scala.Option; import scala.collection.JavaConverters; +import scala.util.Either; import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX; @@ -200,6 +206,7 @@ public class RemoteLogManager implements Closeable { private volatile boolean remoteLogManagerConfigured = false; private final Timer remoteReadTimer; + private DelayedOperationPurgatory delayedRemoteListOffsetsPurgatory; /** * Creates RemoteLogManager instance with the given arguments. @@ -263,6 +270,10 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig, ); } + public void setDelayedOperationPurgatory(DelayedOperationPurgatory delayedRemoteListOffsetsPurgatory) { + this.delayedRemoteListOffsetsPurgatory = delayedRemoteListOffsetsPurgatory; + } + public void resizeCacheSize(long remoteLogIndexFileCacheSize) { indexCache.resizeCacheSize(remoteLogIndexFileCacheSize); } @@ -620,6 +631,23 @@ private Optional maybeLeaderEpoch(int leaderEpoch) { return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ? Optional.empty() : Optional.of(leaderEpoch); } + public AsyncOffsetReadFutureHolder>> asyncOffsetRead( + TopicPartition topicPartition, + Long timestamp, + Long startingOffset, + LeaderEpochFileCache leaderEpochCache, + Supplier> searchLocalLog) { + CompletableFuture>> taskFuture = new CompletableFuture<>(); + Future jobFuture = remoteStorageReaderThreadPool.submit( + new RemoteLogOffsetReader(this, topicPartition, timestamp, startingOffset, leaderEpochCache, searchLocalLog, result -> { + TopicPartitionOperationKey key = new TopicPartitionOperationKey(topicPartition.topic(), topicPartition.partition()); + taskFuture.complete(result); + delayedRemoteListOffsetsPurgatory.checkAndComplete(key); + }) + ); + return new AsyncOffsetReadFutureHolder<>(jobFuture, taskFuture); + } + /** * Search the message offset in the remote storage based on timestamp and offset. *
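For context on how the new RemoteLogManager#asyncOffsetRead hand-off is meant to be consumed, here is a minimal usage sketch in Scala (not part of the patch). It assumes a configured RemoteLogManager `rlm` whose purgatory has been wired via setDelayedOperationPurgatory, and placeholder values `tp`, `timestamp`, `startingOffset`, `leaderEpochCache` and `searchLocalLog` that the real call site (UnifiedLog.fetchOffsetByTimestamp, further down in this diff) already has in scope. The returned AsyncOffsetReadFutureHolder exposes the task future for the result and the job future for cancellation only.

    import java.util.concurrent.TimeUnit
    import kafka.log.AsyncOffsetReadFutureHolder
    import org.apache.kafka.common.record.FileRecords.TimestampAndOffset

    // Submit the remote read; the reader falls back to the local log when the timestamp
    // is not found in remote storage, then completes taskFuture and pokes the purgatory.
    val holder: AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]] =
      rlm.asyncOffsetRead(tp, timestamp, startingOffset, leaderEpochCache, () => searchLocalLog())

    // In production the DelayedRemoteListOffsets operation waits on this future; a direct
    // caller could block on it instead (shown here only for illustration).
    holder.taskFuture.get(30, TimeUnit.SECONDS) match {
      case Right(Some(found)) => println(s"offset=${found.offset}, timestamp=${found.timestamp}")
      case Right(None)        => println("no offset found for the target timestamp")
      case Left(e)            => println(s"remote read failed: ${e.getMessage}")
    }

    // jobFuture is only used to cancel a still-running read, e.g. on request expiration.
    if (!holder.taskFuture.isDone) holder.jobFuture.cancel(true)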

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java b/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java
new file mode 100644
index 0000000000000..e82a50e9942f4
--- /dev/null
+++ b/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import scala.Option;
+import scala.compat.java8.OptionConverters;
+import scala.util.Either;
+import scala.util.Left;
+import scala.util.Right;
+
+public class RemoteLogOffsetReader implements Callable<Void> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogOffsetReader.class);
+    private final RemoteLogManager rlm;
+    private final TopicPartition tp;
+    private final long timestamp;
+    private final long startingOffset;
+    private final LeaderEpochFileCache leaderEpochCache;
+    private final Supplier<Option<FileRecords.TimestampAndOffset>> searchInLocalLog;
+    private final Consumer<Either<Exception, Option<FileRecords.TimestampAndOffset>>> callback;
+
+    public RemoteLogOffsetReader(RemoteLogManager rlm,
+                                 TopicPartition tp,
+                                 long timestamp,
+                                 long startingOffset,
+                                 LeaderEpochFileCache leaderEpochCache,
+                                 Supplier<Option<FileRecords.TimestampAndOffset>> searchInLocalLog,
+                                 Consumer<Either<Exception, Option<FileRecords.TimestampAndOffset>>> callback) {
+        this.rlm = rlm;
+        this.tp = tp;
+        this.timestamp = timestamp;
+        this.startingOffset = startingOffset;
+        this.leaderEpochCache = leaderEpochCache;
+        this.searchInLocalLog = searchInLocalLog;
+        this.callback = callback;
+    }
+
+    @Override
+    public Void call() throws Exception {
+        Either<Exception, Option<FileRecords.TimestampAndOffset>> result;
+        try {
+            // If it is not found in remote storage, then search in the local storage starting with local log start offset.
+            Option<FileRecords.TimestampAndOffset> timestampAndOffsetOpt =
+                    OptionConverters.toScala(rlm.findOffsetByTimestamp(tp, timestamp, startingOffset, leaderEpochCache))
+                            .orElse(searchInLocalLog::get);
+            result = Right.apply(timestampAndOffsetOpt);
+        } catch (Exception e) {
+            // NOTE: All the exceptions from the secondary storage are caught instead of only the KafkaException.
+ LOGGER.error("Error occurred while reading the remote log offset for {}", tp, e); + result = Left.apply(e); + } + callback.accept(result); + return null; + } +} diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java index 944ef908c7e21..4ef1b871ec195 100644 --- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java @@ -27,6 +27,7 @@ import kafka.server.DelayedOperationPurgatory; import kafka.server.DelayedProduce; import kafka.server.DelayedRemoteFetch; +import kafka.server.DelayedRemoteListOffsets; import kafka.server.KafkaConfig; import kafka.server.MetadataCache; import kafka.server.QuotaFactory.QuotaManagers; @@ -66,6 +67,7 @@ public class ReplicaManagerBuilder { private Optional> delayedDeleteRecordsPurgatory = Optional.empty(); private Optional> delayedElectLeaderPurgatory = Optional.empty(); private Optional> delayedRemoteFetchPurgatory = Optional.empty(); + private Optional> delayedRemoteListOffsetsPurgatory = Optional.empty(); private Optional threadNamePrefix = Optional.empty(); private Long brokerEpoch = -1L; private Optional addPartitionsToTxnManager = Optional.empty(); @@ -210,6 +212,7 @@ public ReplicaManager build() { OptionConverters.toScala(delayedDeleteRecordsPurgatory), OptionConverters.toScala(delayedElectLeaderPurgatory), OptionConverters.toScala(delayedRemoteFetchPurgatory), + OptionConverters.toScala(delayedRemoteListOffsetsPurgatory), OptionConverters.toScala(threadNamePrefix), () -> brokerEpoch, OptionConverters.toScala(addPartitionsToTxnManager), diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java index 4a83582726562..b16156a62bc61 100644 --- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java +++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java @@ -108,7 +108,7 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic // Isolation level is only required when reading from the latest offset hence use Option.empty() for now. Option timestampAndOffset = replicaManager.fetchOffsetForTimestamp( topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(), - Optional.empty(), true); + Optional.empty(), true).timestampAndOffsetOpt(); return timestampAndOffset.isEmpty() ? 
(long) 0 : timestampAndOffset.get().offset; } } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 62d8184d9259c..39b42f29fd96c 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -1576,7 +1576,7 @@ class Partition(val topicPartition: TopicPartition, isolationLevel: Option[IsolationLevel], currentLeaderEpoch: Optional[Integer], fetchOnlyFromLeader: Boolean, - remoteLogManager: Option[RemoteLogManager] = None): Option[TimestampAndOffset] = inReadLock(leaderIsrUpdateLock) { + remoteLogManager: Option[RemoteLogManager] = None): OffsetResultHolder = inReadLock(leaderIsrUpdateLock) { // decide whether to only fetch from leader val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, fetchOnlyFromLeader) @@ -1601,8 +1601,10 @@ class Partition(val topicPartition: TopicPartition, s"high watermark (${localLog.highWatermark}) is lagging behind the " + s"start offset from the beginning of this epoch ($epochStart).")) - def getOffsetByTimestamp: Option[TimestampAndOffset] = { - logManager.getLog(topicPartition).flatMap(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager)) + def getOffsetByTimestamp: OffsetResultHolder = { + logManager.getLog(topicPartition) + .map(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager)) + .getOrElse(OffsetResultHolder(timestampAndOffsetOpt = None)) } // If we're in the lagging HW state after a leader election, throw OffsetNotAvailable for "latest" offset @@ -1610,12 +1612,14 @@ class Partition(val topicPartition: TopicPartition, timestamp match { case ListOffsetsRequest.LATEST_TIMESTAMP => maybeOffsetsError.map(e => throw e) - .orElse(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch)))) + .getOrElse(OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch))))) case ListOffsetsRequest.EARLIEST_TIMESTAMP | ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP => getOffsetByTimestamp case _ => - getOffsetByTimestamp.filter(timestampAndOffset => timestampAndOffset.offset < lastFetchableOffset) - .orElse(maybeOffsetsError.map(e => throw e)) + val offsetResultHolder = getOffsetByTimestamp + offsetResultHolder.maybeOffsetsError = maybeOffsetsError + offsetResultHolder.lastFetchableOffset = Some(lastFetchableOffset) + offsetResultHolder } } diff --git a/core/src/main/scala/kafka/log/OffsetResultHolder.scala b/core/src/main/scala/kafka/log/OffsetResultHolder.scala new file mode 100644 index 0000000000000..64b78c6cee912 --- /dev/null +++ b/core/src/main/scala/kafka/log/OffsetResultHolder.scala @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log + +import org.apache.kafka.common.errors.ApiException +import org.apache.kafka.common.record.FileRecords.TimestampAndOffset + +import java.util.concurrent.{CompletableFuture, Future} + +case class OffsetResultHolder(timestampAndOffsetOpt: Option[TimestampAndOffset], + futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None) { + + var maybeOffsetsError: Option[ApiException] = None + var lastFetchableOffset: Option[Long] = None +} + +/** + * A remote log offset read task future holder. It contains two futures: + * 1. JobFuture - Use this future to cancel the running job. + * 2. TaskFuture - Use this future to get the result of the job/computation. + */ +case class AsyncOffsetReadFutureHolder[T](jobFuture: Future[Void], taskFuture: CompletableFuture[T]) { + +} diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index d17654b2bec4b..e6a178ed4a9e4 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1263,7 +1263,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, * None if no such message is found. */ @nowarn("cat=deprecation") - def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): Option[TimestampAndOffset] = { + def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): OffsetResultHolder = { maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") { debug(s"Searching offset for timestamp $targetTimestamp") @@ -1285,7 +1285,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, Optional.of[Integer](earliestEpochEntry.get().epoch) } else Optional.empty[Integer]() - Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt)) + OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt))) } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { val curLocalLogStartOffset = localLogStartOffset() @@ -1297,7 +1297,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, Optional.empty() } - Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult)) + OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult))) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) { val epoch = leaderEpochCache match { case Some(cache) => @@ -1305,7 +1305,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (latestEpoch.isPresent) Optional.of[Integer](latestEpoch.getAsInt) else Optional.empty[Integer]() case None => Optional.empty[Integer]() } - Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch)) + OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch))) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) { if (remoteLogEnabled()) { val curHighestRemoteOffset = highestOffsetInRemoteStorage() @@ -1324,9 +1324,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, Optional.empty() } - Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult)) + OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult))) } else { - Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1))) + OffsetResultHolder(Some(new 
TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1)))) } } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides @@ -1336,13 +1336,14 @@ class UnifiedLog(@volatile var logStartOffset: Long, val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar // lookup the position of batch to avoid extra I/O val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) - latestTimestampSegment.log.batchesFrom(position.position).asScala + val timestampAndOffsetOpt = latestTimestampSegment.log.batchesFrom(position.position).asScala .find(_.maxTimestamp() == maxTimestampSoFar.timestamp) .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, Optional.of[Integer](batch.partitionLeaderEpoch()).filter(_ >= 0)))) + OffsetResultHolder(timestampAndOffsetOpt) } else { // We need to search the first segment whose largest timestamp is >= the target timestamp if there is one. - if (remoteLogEnabled()) { + if (remoteLogEnabled() && !isEmpty) { if (remoteLogManager.isEmpty) { throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled.") } @@ -1350,20 +1351,24 @@ class UnifiedLog(@volatile var logStartOffset: Long, throw new KafkaException("Tiered storage is supported only with versions supporting leader epochs, that means RecordVersion must be >= 2.") } - val remoteOffset = remoteLogManager.get.findOffsetByTimestamp(topicPartition, targetTimestamp, logStartOffset, leaderEpochCache.get) - if (remoteOffset.isPresent) { - remoteOffset.asScala - } else { - // If it is not found in remote log storage, search in the local log storage from local log start offset. - searchOffsetInLocalLog(targetTimestamp, localLogStartOffset()) - } + val asyncOffsetReadFutureHolder = remoteLogManager.get.asyncOffsetRead(topicPartition, targetTimestamp, + logStartOffset, leaderEpochCache.get, () => searchOffsetInLocalLog(targetTimestamp, localLogStartOffset())) + OffsetResultHolder(None, Some(asyncOffsetReadFutureHolder)) } else { - searchOffsetInLocalLog(targetTimestamp, logStartOffset) + OffsetResultHolder(searchOffsetInLocalLog(targetTimestamp, logStartOffset)) } } } } + /** + * Checks if the log is empty. + * @return Returns True when the log is empty. Otherwise, false. + */ + private[log] def isEmpty = { + logStartOffset == logEndOffset + } + private def searchOffsetInLocalLog(targetTimestamp: Long, startOffset: Long): Option[TimestampAndOffset] = { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. diff --git a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala new file mode 100644 index 0000000000000..0da3cc20764f4 --- /dev/null +++ b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import com.yammer.metrics.core.Meter +import kafka.log.AsyncOffsetReadFutureHolder +import kafka.utils.Implicits._ +import kafka.utils.Pool +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.ApiException +import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.FileRecords.TimestampAndOffset +import org.apache.kafka.common.requests.ListOffsetsResponse +import org.apache.kafka.server.metrics.KafkaMetricsGroup + +import java.util.concurrent.TimeUnit +import scala.collection.{Map, mutable} +import scala.jdk.CollectionConverters._ + +case class ListOffsetsPartitionStatus(var responseOpt: Option[ListOffsetsPartitionResponse] = None, + futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None, + lastFetchableOffset: Option[Long] = None, + maybeOffsetsError: Option[ApiException] = None) { + @volatile var completed = false + + override def toString: String = { + s"[responseOpt: $responseOpt, lastFetchableOffset: $lastFetchableOffset, " + + s"maybeOffsetsError: $maybeOffsetsError, completed: $completed]" + } +} + +case class ListOffsetsMetadata(statusByPartition: mutable.Map[TopicPartition, ListOffsetsPartitionStatus]) { + + override def toString: String = { + s"ListOffsetsMetadata(statusByPartition=$statusByPartition)" + } +} + +class DelayedRemoteListOffsets(delayMs: Long, + version: Int, + metadata: ListOffsetsMetadata, + responseCallback: List[ListOffsetsTopicResponse] => Unit) extends DelayedOperation(delayMs) { + + // Mark the status as completed, if there is no async task to track. + // If there is a task to track, then build the response as REQUEST_TIMED_OUT by default. + metadata.statusByPartition.forKeyValue { (topicPartition, status) => + status.completed = status.futureHolderOpt.isEmpty + if (status.futureHolderOpt.isDefined) { + status.responseOpt = Some(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition())) + } + trace(s"Initial partition status for $topicPartition is $status") + } + + /** + * Call-back to execute when a delayed operation gets expired and hence forced to complete. 
+ */ + override def onExpiration(): Unit = { + metadata.statusByPartition.forKeyValue { (topicPartition, status) => + if (!status.completed) { + debug(s"Expiring list offset request for partition $topicPartition with status $status") + status.futureHolderOpt.foreach(futureHolder => futureHolder.jobFuture.cancel(true)) + DelayedRemoteListOffsetsMetrics.recordExpiration(topicPartition) + } + } + } + + /** + * Process for completing an operation; This function needs to be defined + * in subclasses and will be called exactly once in forceComplete() + */ + override def onComplete(): Unit = { + val responseTopics = metadata.statusByPartition.groupBy(e => e._1.topic()).map { + case (topic, status) => + new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => s.responseOpt).toList.asJava) + }.toList + responseCallback(responseTopics) + } + + /** + * Try to complete the delayed operation by first checking if the operation + * can be completed by now. If yes execute the completion logic by calling + * forceComplete() and return true iff forceComplete returns true; otherwise return false + * + * This function needs to be defined in subclasses + */ + override def tryComplete(): Boolean = { + var completable = true + metadata.statusByPartition.forKeyValue { (partition, status) => + if (!status.completed) { + status.futureHolderOpt.foreach { futureHolder => + if (futureHolder.taskFuture.isDone) { + val response = futureHolder.taskFuture.get() match { + case Left(e) => + buildErrorResponse(Errors.forException(e), partition.partition()) + + case Right(None) => + val error = status.maybeOffsetsError + .map(e => if (version >= 5) Errors.forException(e) else Errors.LEADER_NOT_AVAILABLE) + .getOrElse(Errors.NONE) + buildErrorResponse(error, partition.partition()) + + case Right(Some(found)) => + var partitionResponse = buildErrorResponse(Errors.NONE, partition.partition()) + if (status.lastFetchableOffset.isDefined && found.offset >= status.lastFetchableOffset.get) { + if (status.maybeOffsetsError.isDefined) { + val error = if (version >= 5) Errors.forException(status.maybeOffsetsError.get) else Errors.LEADER_NOT_AVAILABLE + partitionResponse.setErrorCode(error.code()) + } + } else { + partitionResponse = new ListOffsetsPartitionResponse() + .setPartitionIndex(partition.partition()) + .setErrorCode(Errors.NONE.code()) + .setTimestamp(found.timestamp) + .setOffset(found.offset) + + if (found.leaderEpoch.isPresent && version >= 4) { + partitionResponse.setLeaderEpoch(found.leaderEpoch.get) + } + } + partitionResponse + } + status.responseOpt = Some(response) + status.completed = true + } + completable = completable && futureHolder.taskFuture.isDone + } + } + } + if (completable) { + forceComplete() + } else { + false + } + } + + private def buildErrorResponse(e: Errors, partitionIndex: Int): ListOffsetsPartitionResponse = { + new ListOffsetsPartitionResponse() + .setPartitionIndex(partitionIndex) + .setErrorCode(e.code) + .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET) + } +} + +object DelayedRemoteListOffsetsMetrics { + private val metricsGroup = new KafkaMetricsGroup(DelayedRemoteListOffsetsMetrics.getClass) + private[server] val aggregateExpirationMeter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS) + private val partitionExpirationMeterFactory = (key: TopicPartition) => + metricsGroup.newMeter("ExpiresPerSec", + "requests", + TimeUnit.SECONDS, + Map("topic" -> key.topic, "partition" -> 
key.partition.toString).asJava) + private[server] val partitionExpirationMeters = new Pool[TopicPartition, Meter](valueFactory = Some(partitionExpirationMeterFactory)) + + def recordExpiration(partition: TopicPartition): Unit = { + aggregateExpirationMeter.mark() + partitionExpirationMeters.getAndMaybePut(partition).mark() + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 7a7ae1511893d..2acface315db1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -90,7 +90,7 @@ import java.util.concurrent.{CompletableFuture, ConcurrentHashMap} import java.util.{Collections, Optional, OptionalInt} import scala.annotation.nowarn import scala.collection.mutable.ArrayBuffer -import scala.collection.{Map, Seq, Set, immutable, mutable} +import scala.collection.{Map, Seq, Set, mutable} import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success, Try} @@ -1078,14 +1078,17 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val topics = if (version == 0) - handleListOffsetRequestV0(request) - else - handleListOffsetRequestV1AndAbove(request) + def sendResponseCallback(response: List[ListOffsetsTopicResponse]): Unit = { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + new ListOffsetsResponse(new ListOffsetsResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(response.asJava))) + } - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetsResponse(new ListOffsetsResponseData() - .setThrottleTimeMs(requestThrottleMs) - .setTopics(topics.asJava))) + if (version == 0) + sendResponseCallback(handleListOffsetRequestV0(request)) + else + handleListOffsetRequestV1AndAbove(request, sendResponseCallback) } private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetsTopicResponse] = { @@ -1151,18 +1154,12 @@ class KafkaApis(val requestChannel: RequestChannel, (responseTopics ++ unauthorizedResponseStatus).toList } - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): List[ListOffsetsTopicResponse] = { + private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request, + responseCallback: List[ListOffsetsTopicResponse] => Unit): Unit = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetsRequest] val version = request.header.apiVersion - val timestampMinSupportedVersion = immutable.Map[Long, Short]( - ListOffsetsRequest.EARLIEST_TIMESTAMP -> 1.toShort, - ListOffsetsRequest.LATEST_TIMESTAMP -> 1.toShort, - ListOffsetsRequest.MAX_TIMESTAMP -> 7.toShort, - ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP -> 8.toShort, - ListOffsetsRequest.LATEST_TIERED_TIMESTAMP -> 9.toShort - ) def buildErrorResponse(e: Errors, partition: ListOffsetsPartition): ListOffsetsPartitionResponse = { new ListOffsetsPartitionResponse() @@ -1182,75 +1179,18 @@ class KafkaApis(val requestChannel: RequestChannel, buildErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, partition)).asJava) ) - val responseTopics = authorizedRequestInfo.map { topic => - val responsePartitions = topic.partitions.asScala.map { partition => - val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) - if 
(offsetRequest.duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - buildErrorResponse(Errors.INVALID_REQUEST, partition) - } else if (partition.timestamp() < 0 && - (!timestampMinSupportedVersion.contains(partition.timestamp()) || version < timestampMinSupportedVersion(partition.timestamp()))) { - buildErrorResponse(Errors.UNSUPPORTED_VERSION, partition) - } else { - try { - val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID - val isClientRequest = offsetRequest.replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID - val isolationLevelOpt = if (isClientRequest) - Some(offsetRequest.isolationLevel) - else - None - - val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, - partition.timestamp, - isolationLevelOpt, - if (partition.currentLeaderEpoch == ListOffsetsResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch), - fetchOnlyFromLeader) - - val response = foundOpt match { - case Some(found) => - val partitionResponse = new ListOffsetsPartitionResponse() - .setPartitionIndex(partition.partitionIndex) - .setErrorCode(Errors.NONE.code) - .setTimestamp(found.timestamp) - .setOffset(found.offset) - if (found.leaderEpoch.isPresent && version >= 4) - partitionResponse.setLeaderEpoch(found.leaderEpoch.get) - partitionResponse - case None => - buildErrorResponse(Errors.NONE, partition) - } - response - } catch { - // NOTE: These exceptions are special cases since these error messages are typically transient or the client - // would have received a clear exception and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderOrFollowerException | - _ : UnknownLeaderEpochException | - _ : FencedLeaderEpochException | - _ : KafkaStorageException | - _ : UnsupportedForMessageFormatException) => - debug(s"Offset request with correlation id $correlationId from client $clientId on " + - s"partition $topicPartition failed due to ${e.getMessage}") - buildErrorResponse(Errors.forException(e), partition) - - // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE - case e: OffsetNotAvailableException => - if (request.header.apiVersion >= 5) { - buildErrorResponse(Errors.forException(e), partition) - } else { - buildErrorResponse(Errors.LEADER_NOT_AVAILABLE, partition) - } + def sendV1ResponseCallback(response: List[ListOffsetsTopicResponse]): Unit = { + val mergedResponses = response ++ unauthorizedResponseStatus + responseCallback(mergedResponses) + } - case e: Throwable => - error("Error while responding to offset request", e) - buildErrorResponse(Errors.forException(e), partition) - } - } - } - new ListOffsetsTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava) + if (authorizedRequestInfo.isEmpty) { + sendV1ResponseCallback(List.empty) + } else { + replicaManager.fetchOffset(authorizedRequestInfo, offsetRequest.duplicatePartitions().asScala, + offsetRequest.isolationLevel(), offsetRequest.replicaId(), clientId, correlationId, version, + buildErrorResponse, sendV1ResponseCallback) } - (responseTopics ++ unauthorizedResponseStatus).toList } private def metadataResponseTopic(error: Errors, diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 
74326b774f277..aafa62813ff48 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -20,10 +20,10 @@ import com.yammer.metrics.core.Meter import kafka.cluster.{BrokerEndPoint, Partition, PartitionListener} import kafka.controller.{KafkaController, StateChangeLogger} import kafka.log.remote.RemoteLogManager -import kafka.log.{LogManager, UnifiedLog} +import kafka.log.{LogManager, OffsetResultHolder, UnifiedLog} import kafka.server.HostedPartition.Online import kafka.server.QuotaFactory.QuotaManagers -import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult} +import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported} import kafka.server.metadata.ZkMetadataCache import kafka.utils.Implicits._ import kafka.utils._ @@ -34,6 +34,8 @@ import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPa import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.LeaderAndIsrResponseData.{LeaderAndIsrPartitionError, LeaderAndIsrTopicError} +import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} +import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse} import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult} import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState @@ -41,7 +43,6 @@ import org.apache.kafka.common.message.{DescribeLogDirsResponseData, DescribePro import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record._ import org.apache.kafka.common.replica.PartitionView.DefaultPartitionView import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView @@ -71,7 +72,7 @@ import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.Lock import java.util.concurrent.{CompletableFuture, Future, RejectedExecutionException, TimeUnit} import java.util.{Collections, Optional, OptionalInt, OptionalLong} -import scala.collection.{Map, Seq, Set, mutable} +import scala.collection.{Map, Seq, Set, immutable, mutable} import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ @@ -225,6 +226,14 @@ object ReplicaManager { private[server] val MetricNames = 
GaugeMetricNames.union(MeterMetricNames) + private val timestampMinSupportedVersion: immutable.Map[Long, Short] = immutable.Map[Long, Short]( + ListOffsetsRequest.EARLIEST_TIMESTAMP -> 1.toShort, + ListOffsetsRequest.LATEST_TIMESTAMP -> 1.toShort, + ListOffsetsRequest.MAX_TIMESTAMP -> 7.toShort, + ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP -> 8.toShort, + ListOffsetsRequest.LATEST_TIERED_TIMESTAMP -> 9.toShort + ) + def createLogReadResult(highWatermark: Long, leaderLogStartOffset: Long, leaderLogEndOffset: Long, @@ -251,6 +260,11 @@ object ReplicaManager { lastStableOffset = None, exception = Some(e)) } + + private[server] def isListOffsetsTimestampUnsupported(timestamp: JLong, version: Short): Boolean = { + timestamp < 0 && + (!timestampMinSupportedVersion.contains(timestamp) || version < timestampMinSupportedVersion(timestamp)) + } } class ReplicaManager(val config: KafkaConfig, @@ -271,6 +285,7 @@ class ReplicaManager(val config: KafkaConfig, delayedDeleteRecordsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None, delayedElectLeaderPurgatoryParam: Option[DelayedOperationPurgatory[DelayedElectLeader]] = None, delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None, + delayedRemoteListOffsetsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteListOffsets]] = None, threadNamePrefix: Option[String] = None, val brokerEpochSupplier: () => Long = () => -1, addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None, @@ -296,6 +311,9 @@ class ReplicaManager(val config: KafkaConfig, val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse( DelayedOperationPurgatory[DelayedRemoteFetch]( purgatoryName = "RemoteFetch", brokerId = config.brokerId)) + val delayedRemoteListOffsetsPurgatory = delayedRemoteListOffsetsPurgatoryParam.getOrElse( + DelayedOperationPurgatory[DelayedRemoteListOffsets]( + purgatoryName = "RemoteListOffsets", brokerId = config.brokerId)) /* epoch of the controller that last changed the leader */ @volatile private[server] var controllerEpoch: Int = KafkaController.InitialControllerEpoch @@ -396,6 +414,7 @@ class ReplicaManager(val config: KafkaConfig, logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure) logDirFailureHandler.start() addPartitionsToTxnManager.foreach(_.start()) + remoteLogManager.foreach(rlm => rlm.setDelayedOperationPurgatory(delayedRemoteListOffsetsPurgatory)) } private def maybeRemoveTopicMetrics(topic: String): Unit = { @@ -1447,11 +1466,123 @@ class ReplicaManager(val config: KafkaConfig, } } + def fetchOffset(topics: Seq[ListOffsetsTopic], + duplicatePartitions: Set[TopicPartition], + isolationLevel: IsolationLevel, + replicaId: Int, + clientId: String, + correlationId: Int, + version: Short, + buildErrorResponse: (Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse, + responseCallback: List[ListOffsetsTopicResponse] => Unit): Unit = { + val statusByPartition = mutable.Map[TopicPartition, ListOffsetsPartitionStatus]() + topics.foreach { topic => + topic.partitions.asScala.foreach { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) + if (duplicatePartitions.contains(topicPartition)) { + debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + + s"failed because the partition is duplicated in the request.") + statusByPartition += topicPartition -> 
ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.INVALID_REQUEST, partition))) + } else if (isListOffsetsTimestampUnsupported(partition.timestamp(), version)) { + statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.UNSUPPORTED_VERSION, partition))) + } else { + try { + val fetchOnlyFromLeader = replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID + val isClientRequest = replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID + val isolationLevelOpt = if (isClientRequest) + Some(isolationLevel) + else + None + + val resultHolder = fetchOffsetForTimestamp(topicPartition, + partition.timestamp, + isolationLevelOpt, + if (partition.currentLeaderEpoch == ListOffsetsResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch), + fetchOnlyFromLeader) + + val status = resultHolder match { + case OffsetResultHolder(Some(found), _) => + // This case is for normal topic that does not have remote storage. + var partitionResponse = buildErrorResponse(Errors.NONE, partition) + if (resultHolder.lastFetchableOffset.isDefined && + found.offset >= resultHolder.lastFetchableOffset.get) { + resultHolder.maybeOffsetsError.map(e => throw e) + } else { + partitionResponse = new ListOffsetsPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setTimestamp(found.timestamp) + .setOffset(found.offset) + if (found.leaderEpoch.isPresent && version >= 4) + partitionResponse.setLeaderEpoch(found.leaderEpoch.get) + } + ListOffsetsPartitionStatus(Some(partitionResponse)) + case OffsetResultHolder(None, None) => + // This is an empty offset response scenario + resultHolder.maybeOffsetsError.map(e => throw e) + ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.NONE, partition))) + case OffsetResultHolder(None, Some(futureHolder)) => + // This case is for topic enabled with remote storage and we want to search the timestamp in + // remote storage using async fashion. 
+ ListOffsetsPartitionStatus(None, Some(futureHolder), resultHolder.lastFetchableOffset, resultHolder.maybeOffsetsError) + } + statusByPartition += topicPartition -> status + } catch { + // NOTE: These exceptions are special cases since these error messages are typically transient or the client + // would have received a clear exception and there is no value in logging the entire stack trace for the same + case e @ (_ : UnknownTopicOrPartitionException | + _ : NotLeaderOrFollowerException | + _ : UnknownLeaderEpochException | + _ : FencedLeaderEpochException | + _ : KafkaStorageException | + _ : UnsupportedForMessageFormatException) => + debug(s"Offset request with correlation id $correlationId from client $clientId on " + + s"partition $topicPartition failed due to ${e.getMessage}") + statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.forException(e), partition))) + + // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE + case e: OffsetNotAvailableException => + if (version >= 5) { + statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.forException(e), partition))) + } else { + statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.LEADER_NOT_AVAILABLE, partition))) + } + + case e: Throwable => + error("Error while responding to offset request", e) + statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.forException(e), partition))) + } + } + } + } + + if (delayedRemoteListOffsetsRequired(statusByPartition)) { + val timeout = config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs() + // create delayed remote list offsets operation + val delayedRemoteListOffsets = new DelayedRemoteListOffsets(timeout, version, ListOffsetsMetadata(statusByPartition), responseCallback) + // create a list of (topic, partition) pairs to use as keys for this delayed remote list offsets operation + val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq + // try to complete the request immediately, otherwise put it into the purgatory + delayedRemoteListOffsetsPurgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys) + } else { + // we can respond immediately + val responseTopics = statusByPartition.groupBy(e => e._1.topic()).map { + case (topic, status) => + new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => s.responseOpt).toList.asJava) + }.toList + responseCallback(responseTopics) + } + } + + private def delayedRemoteListOffsetsRequired(responseByPartition: Map[TopicPartition, ListOffsetsPartitionStatus]): Boolean = { + responseByPartition.values.exists(status => status.futureHolderOpt.isDefined) + } + def fetchOffsetForTimestamp(topicPartition: TopicPartition, timestamp: Long, isolationLevel: Option[IsolationLevel], currentLeaderEpoch: Optional[Integer], - fetchOnlyFromLeader: Boolean): Option[TimestampAndOffset] = { + fetchOnlyFromLeader: Boolean): OffsetResultHolder = { val partition = getPartitionOrException(topicPartition) partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader, remoteLogManager) } @@ -2543,6 +2674,7 @@ class ReplicaManager(val config: KafkaConfig, replicaAlterLogDirsManager.shutdown() delayedFetchPurgatory.shutdown() delayedRemoteFetchPurgatory.shutdown() + delayedRemoteListOffsetsPurgatory.shutdown() delayedProducePurgatory.shutdown() 
delayedDeleteRecordsPurgatory.shutdown() delayedElectLeaderPurgatory.shutdown() diff --git a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java new file mode 100644 index 0000000000000..e7d5ce8981b6f --- /dev/null +++ b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote; + +import kafka.log.AsyncOffsetReadFutureHolder; +import kafka.utils.TestUtils; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.apache.kafka.storage.internals.log.LogDirFailureChannel; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import scala.Option; +import scala.util.Either; + +import static org.apache.kafka.common.record.FileRecords.TimestampAndOffset; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class RemoteLogOffsetReaderTest { + + private final MockTime time = new MockTime(); + private final TopicPartition topicPartition = new TopicPartition("test", 0); + private Path logDir; + private LeaderEpochFileCache cache; + private MockRemoteLogManager rlm; + + @BeforeEach + void setUp() throws IOException { + logDir = Files.createTempDirectory("kafka-test"); + LeaderEpochCheckpointFile checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1)); + cache = new LeaderEpochFileCache(topicPartition, checkpoint, time.scheduler); + rlm = new MockRemoteLogManager(2, 10, 
logDir.toString()); + } + + @AfterEach + void tearDown() throws IOException { + rlm.close(); + Utils.delete(logDir.toFile()); + } + + @Test + public void testReadRemoteLog() throws Exception { + AsyncOffsetReadFutureHolder>> asyncOffsetReadFutureHolder = + rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty); + asyncOffsetReadFutureHolder.taskFuture().get(1, TimeUnit.SECONDS); + assertTrue(asyncOffsetReadFutureHolder.taskFuture().isDone()); + + Either> result = asyncOffsetReadFutureHolder.taskFuture().get(); + assertFalse(result.isLeft()); + assertTrue(result.isRight()); + assertEquals(Option.apply(new TimestampAndOffset(100L, 90L, Optional.of(3))), + result.right().get()); + } + + @Test + public void testTaskQueueFullAndCancelTask() throws Exception { + rlm.pause(); + + List>>> holderList = new ArrayList<>(); + // Task queue size is 10 and number of threads is 2, so it can accept at-most 12 items + for (int i = 0; i < 12; i++) { + holderList.add(rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty)); + } + assertThrows(TimeoutException.class, () -> holderList.get(0).taskFuture().get(10, TimeUnit.MILLISECONDS)); + assertEquals(0, holderList.stream().filter(h -> h.taskFuture().isDone()).count()); + + assertThrows(RejectedExecutionException.class, () -> + holderList.add(rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty))); + + holderList.get(2).jobFuture().cancel(false); + + rlm.resume(); + AsyncOffsetReadFutureHolder>> last = holderList.get(holderList.size() - 1); + last.taskFuture().get(100, TimeUnit.MILLISECONDS); + + assertEquals(12, holderList.size()); + assertEquals(11, holderList.stream().filter(h -> h.taskFuture().isDone()).count()); + assertEquals(1, holderList.stream().filter(h -> !h.taskFuture().isDone()).count()); + } + + @Test + public void testThrowErrorOnFindOffsetByTimestamp() throws Exception { + RemoteStorageException exception = new RemoteStorageException("Error"); + try (RemoteLogManager rlm = new MockRemoteLogManager(2, 10, logDir.toString()) { + @Override + public Optional findOffsetByTimestamp(TopicPartition tp, + long timestamp, + long startingOffset, + LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException { + throw exception; + } + }) { + AsyncOffsetReadFutureHolder>> futureHolder + = rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty); + futureHolder.taskFuture().get(1, TimeUnit.SECONDS); + + assertTrue(futureHolder.taskFuture().isDone()); + assertTrue(futureHolder.taskFuture().get().isLeft()); + assertEquals(exception, futureHolder.taskFuture().get().left().get()); + } + } + + private static class MockRemoteLogManager extends RemoteLogManager { + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + public MockRemoteLogManager(int threads, + int taskQueueSize, + String logDir) throws IOException { + super(rlmConfig(threads, taskQueueSize), + 1, + logDir, + "mock-cluster-id", + new MockTime(), + tp -> Optional.empty(), + (tp, logStartOffset) -> { }, + new BrokerTopicStats(true), + new Metrics() + ); + } + + @Override + public Optional findOffsetByTimestamp(TopicPartition tp, + long timestamp, + long startingOffset, + LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException { + lock.readLock().lock(); + try { + return Optional.of(new TimestampAndOffset(100, 90, Optional.of(3))); + } finally { + lock.readLock().unlock(); + } + } + + void pause() { + lock.writeLock().lock(); + } + + void resume() { + 
lock.writeLock().unlock(); + } + } + + private static RemoteLogManagerConfig rlmConfig(int threads, int taskQueueSize) { + Properties props = new Properties(); + props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); + props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager"); + props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager"); + props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, threads); + props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP, taskQueueSize); + AbstractConfig config = new AbstractConfig(RemoteLogManagerConfig.configDef(), props, false); + return new RemoteLogManagerConfig(config); + } +} \ No newline at end of file diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java index 12040928b407e..dad92125b2442 100644 --- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java +++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java @@ -16,6 +16,7 @@ */ package kafka.server.share; +import kafka.log.OffsetResultHolder; import kafka.server.ReplicaManager; import org.apache.kafka.common.TopicIdPartition; @@ -219,7 +220,7 @@ public void testProcessFetchResponseWithLsoMovementForTopicPartition() { // Mock the replicaManager.fetchOffsetForTimestamp method to return a timestamp and offset for the topic partition. FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(100L, 1L, Optional.empty()); - doReturn(Option.apply(timestampAndOffset)).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean()); + doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean()); when(sp0.nextFetchOffset()).thenReturn((long) 0, (long) 5); when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 4); diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala new file mode 100644 index 0000000000000..d843990f31764 --- /dev/null +++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.log.AsyncOffsetReadFutureHolder +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.FileRecords.TimestampAndOffset +import org.apache.kafka.common.requests.ListOffsetsResponse +import org.apache.kafka.server.util.timer.MockTimer +import org.junit.jupiter.api.{AfterEach, Test} +import org.junit.jupiter.api.Assertions.assertEquals +import org.mockito.ArgumentMatchers.anyBoolean +import org.mockito.Mockito.{mock, when} + +import java.util.Optional +import java.util.concurrent.CompletableFuture +import scala.collection.mutable +import scala.concurrent.TimeoutException + +class DelayedRemoteListOffsetsTest { + + val delayMs = 10 + val timer = new MockTimer() + type T = Either[Exception, Option[TimestampAndOffset]] + val purgatory = + new DelayedOperationPurgatory[DelayedRemoteListOffsets]("test-purgatory", timer, purgeInterval = 10) + + @AfterEach + def afterEach(): Unit = { + purgatory.shutdown() + } + + @Test + def testResponseOnRequestExpiration(): Unit = { + var numResponse = 0 + val responseCallback = (response: List[ListOffsetsTopicResponse]) => { + response.foreach { topic => + topic.partitions().forEach { partition => + assertEquals(Errors.REQUEST_TIMED_OUT.code(), partition.errorCode()) + assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp()) + assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset()) + assertEquals(-1, partition.leaderEpoch()) + numResponse += 1 + } + } + } + + var cancelledCount = 0 + val jobFuture = mock(classOf[CompletableFuture[Void]]) + val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]]) + when(holder.taskFuture).thenAnswer(_ => new CompletableFuture[T]()) + when(holder.jobFuture).thenReturn(jobFuture) + when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => { + cancelledCount += 1 + true + }) + + val metadata = ListOffsetsMetadata(mutable.Map( + new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)), + new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)), + new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(holder)) + )) + + val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, metadata, responseCallback) + val listOffsetsRequestKeys = metadata.statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq + assertEquals(0, DelayedRemoteListOffsetsMetrics.aggregateExpirationMeter.count()) + assertEquals(0, DelayedRemoteListOffsetsMetrics.partitionExpirationMeters.size) + purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys) + + Thread.sleep(100) + assertEquals(3, listOffsetsRequestKeys.size) + assertEquals(listOffsetsRequestKeys.size, cancelledCount) + assertEquals(listOffsetsRequestKeys.size, numResponse) + assertEquals(listOffsetsRequestKeys.size, DelayedRemoteListOffsetsMetrics.aggregateExpirationMeter.count()) + listOffsetsRequestKeys.foreach(key => { + val tp = new TopicPartition(key.topic, key.partition) + assertEquals(1, DelayedRemoteListOffsetsMetrics.partitionExpirationMeters.get(tp).count()) + }) + } + + @Test + def testResponseOnSuccess(): Unit = { + var numResponse = 0 + val responseCallback = (response: List[ListOffsetsTopicResponse]) => { + response.foreach { topic => + topic.partitions().forEach { partition => + assertEquals(Errors.NONE.code(), 
partition.errorCode()) + assertEquals(100L, partition.timestamp()) + assertEquals(100L, partition.offset()) + assertEquals(50, partition.leaderEpoch()) + numResponse += 1 + } + } + } + + val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50)) + val taskFuture = new CompletableFuture[T]() + taskFuture.complete(Right(Some(timestampAndOffset))) + + var cancelledCount = 0 + val jobFuture = mock(classOf[CompletableFuture[Void]]) + val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]]) + when(holder.taskFuture).thenAnswer(_ => taskFuture) + when(holder.jobFuture).thenReturn(jobFuture) + when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => { + cancelledCount += 1 + true + }) + + val metadata = ListOffsetsMetadata(mutable.Map( + new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)), + new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)), + new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(holder)) + )) + + val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, metadata, responseCallback) + val listOffsetsRequestKeys = metadata.statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq + purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys) + + assertEquals(0, cancelledCount) + assertEquals(listOffsetsRequestKeys.size, numResponse) + } + + @Test + def testResponseOnPartialError(): Unit = { + var numResponse = 0 + val responseCallback = (response: List[ListOffsetsTopicResponse]) => { + response.foreach { topic => + topic.partitions().forEach { partition => + if (topic.name().equals("test1")) { + assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), partition.errorCode()) + assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp()) + assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset()) + assertEquals(-1, partition.leaderEpoch()) + } else { + assertEquals(Errors.NONE.code(), partition.errorCode()) + assertEquals(100L, partition.timestamp()) + assertEquals(100L, partition.offset()) + assertEquals(50, partition.leaderEpoch()) + } + numResponse += 1 + } + } + } + + val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50)) + val taskFuture = new CompletableFuture[T]() + taskFuture.complete(Right(Some(timestampAndOffset))) + + var cancelledCount = 0 + val jobFuture = mock(classOf[CompletableFuture[Void]]) + val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]]) + when(holder.taskFuture).thenAnswer(_ => taskFuture) + when(holder.jobFuture).thenReturn(jobFuture) + when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => { + cancelledCount += 1 + true + }) + + val errorFutureHolder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]]) + val errorTaskFuture = new CompletableFuture[T]() + errorTaskFuture.complete(Left(new TimeoutException("Timed out!"))) + when(errorFutureHolder.taskFuture).thenAnswer(_ => errorTaskFuture) + when(errorFutureHolder.jobFuture).thenReturn(jobFuture) + + val metadata = ListOffsetsMetadata(mutable.Map( + new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)), + new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)), + new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(errorFutureHolder)) + )) + + val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, metadata, responseCallback) + val 
listOffsetsRequestKeys = metadata.statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq + purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys) + + assertEquals(0, cancelledCount) + assertEquals(listOffsetsRequestKeys.size, numResponse) + } +} diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 3cc0e791ab7b7..c2ed559e730f6 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -749,7 +749,7 @@ class PartitionTest extends AbstractPartitionTest { val timestampAndOffsetOpt = partition.fetchOffsetForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, isolationLevel = None, currentLeaderEpoch = Optional.empty(), - fetchOnlyFromLeader = true) + fetchOnlyFromLeader = true).timestampAndOffsetOpt assertTrue(timestampAndOffsetOpt.isDefined) @@ -803,12 +803,18 @@ class PartitionTest extends AbstractPartitionTest { def fetchOffsetsForTimestamp(timestamp: Long, isolation: Option[IsolationLevel]): Either[ApiException, Option[TimestampAndOffset]] = { try { - Right(partition.fetchOffsetForTimestamp( + val offsetResultHolder = partition.fetchOffsetForTimestamp( timestamp = timestamp, isolationLevel = isolation, currentLeaderEpoch = Optional.of(partition.getLeaderEpoch), fetchOnlyFromLeader = true - )) + ) + val timestampAndOffsetOpt = offsetResultHolder.timestampAndOffsetOpt + if (timestampAndOffsetOpt.isEmpty || offsetResultHolder.lastFetchableOffset.isDefined && + timestampAndOffsetOpt.get.offset >= offsetResultHolder.lastFetchableOffset.get) { + offsetResultHolder.maybeOffsetsError.map(e => throw e) + } + Right(timestampAndOffsetOpt) } catch { case e: ApiException => Left(e) } @@ -1013,7 +1019,7 @@ class PartitionTest extends AbstractPartitionTest { val res = partition.fetchOffsetForTimestamp(timestamp, isolationLevel = isolationLevel, currentLeaderEpoch = Optional.empty(), - fetchOnlyFromLeader = true) + fetchOnlyFromLeader = true).timestampAndOffsetOpt assertTrue(res.isDefined) res.get } diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index 8f34c4c06b798..d514239892204 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -174,7 +174,8 @@ object AbstractCoordinatorConcurrencyTest { val delayedFetchPurgatoryParam: DelayedOperationPurgatory[DelayedFetch], val delayedDeleteRecordsPurgatoryParam: DelayedOperationPurgatory[DelayedDeleteRecords], val delayedElectLeaderPurgatoryParam: DelayedOperationPurgatory[DelayedElectLeader], - val delayedRemoteFetchPurgatoryParam: DelayedOperationPurgatory[DelayedRemoteFetch]) + val delayedRemoteFetchPurgatoryParam: DelayedOperationPurgatory[DelayedRemoteFetch], + val delayedRemoteListOffsetsPurgatoryParam: DelayedOperationPurgatory[DelayedRemoteListOffsets]) extends ReplicaManager( config, metrics = null, @@ -191,6 +192,7 @@ object AbstractCoordinatorConcurrencyTest { delayedDeleteRecordsPurgatoryParam = Some(delayedDeleteRecordsPurgatoryParam), delayedElectLeaderPurgatoryParam = Some(delayedElectLeaderPurgatoryParam), delayedRemoteFetchPurgatoryParam = Some(delayedRemoteFetchPurgatoryParam), + delayedRemoteListOffsetsPurgatoryParam = Some(delayedRemoteListOffsetsPurgatoryParam), threadNamePrefix = 
Option(this.getClass.getName)) { @volatile var logs: mutable.Map[TopicPartition, (UnifiedLog, Long)] = _ @@ -285,6 +287,8 @@ object AbstractCoordinatorConcurrencyTest { watchKeys: mutable.Set[TopicPartitionOperationKey]): TestReplicaManager = { val mockRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch]( purgatoryName = "RemoteFetch", timer, reaperEnabled = false) + val mockRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets]( + purgatoryName = "RemoteListOffsets", timer, reaperEnabled = false) val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]( purgatoryName = "Fetch", timer, reaperEnabled = false) val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords]( @@ -292,7 +296,8 @@ object AbstractCoordinatorConcurrencyTest { val mockElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader]( purgatoryName = "ElectLeader", timer, reaperEnabled = false) new TestReplicaManager(config, time, scheduler, logManager, quotaManagers, watchKeys, producePurgatory, - mockFetchPurgatory, mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, mockRemoteFetchPurgatory) + mockFetchPurgatory, mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, mockRemoteFetchPurgatory, + mockRemoteListOffsetsPurgatory) } } } diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 10b7ba4a507a1..32779f9c3bcc2 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -868,9 +868,11 @@ class LogLoaderTest { for (i <- 0 until numMessages) { assertEquals(i, LogTestUtils.readLog(log, i, 100).records.batches.iterator.next().lastOffset) if (i == 0) - assertEquals(log.logSegments.asScala.head.baseOffset, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset) + assertEquals(log.logSegments.asScala.head.baseOffset, + log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).timestampAndOffsetOpt.get.offset) else - assertEquals(i, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset) + assertEquals(i, + log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).timestampAndOffsetOpt.get.offset) } log.close() } @@ -942,9 +944,11 @@ class LogLoaderTest { for (i <- 0 until numMessages) { assertEquals(i, LogTestUtils.readLog(log, i, 100).records.batches.iterator.next().lastOffset) if (i == 0) - assertEquals(log.logSegments.asScala.head.baseOffset, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset) + assertEquals(log.logSegments.asScala.head.baseOffset, + log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).timestampAndOffsetOpt.get.offset) else - assertEquals(i, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset) + assertEquals(i, + log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).timestampAndOffsetOpt.get.offset) } log.close() } diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 1c617c20d36ac..3e90ede3be38c 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -19,7 +19,7 @@ package kafka.log import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} import kafka.log.remote.RemoteLogManager -import kafka.server.KafkaConfig +import kafka.server.{DelayedOperationPurgatory, DelayedRemoteListOffsets, KafkaConfig} import 
kafka.utils.TestUtils import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.config.TopicConfig @@ -27,6 +27,7 @@ import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.FetchResponseData +import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record.MemoryRecords.RecordFilter import org.apache.kafka.common.record._ @@ -34,6 +35,7 @@ import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils} import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig +import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler} import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} @@ -47,12 +49,12 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{EnumSource, ValueSource} import org.mockito.ArgumentMatchers import org.mockito.ArgumentMatchers.{any, anyLong} -import org.mockito.Mockito.{doThrow, mock, spy, when} +import org.mockito.Mockito.{doAnswer, doThrow, spy} import java.io._ import java.nio.ByteBuffer import java.nio.file.Files -import java.util.concurrent.{Callable, ConcurrentHashMap, Executors} +import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit} import java.util.{Optional, OptionalLong, Properties} import scala.annotation.nowarn import scala.collection.mutable.ListBuffer @@ -2017,7 +2019,7 @@ class UnifiedLogTest { val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) val log = createLog(logDir, logConfig) - assertEquals(None, log.fetchOffsetByTimestamp(0L)) + assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L)) val firstTimestamp = mockTime.milliseconds val firstLeaderEpoch = 0 @@ -2033,23 +2035,23 @@ class UnifiedLogTest { timestamp = secondTimestamp), leaderEpoch = secondLeaderEpoch) - assertEquals(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch)))), log.fetchOffsetByTimestamp(firstTimestamp)) - assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch)))), log.fetchOffsetByTimestamp(secondTimestamp)) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))), log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))), 
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch)))), log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)) // The cache can be updated directly after a leader change. // The new latest offset should reflect the updated epoch. log.maybeAssignEpochStartOffset(2, 2L) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2)))), log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)) } @@ -2058,7 +2060,7 @@ class UnifiedLogTest { val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) val log = createLog(logDir, logConfig) - assertEquals(None, log.fetchOffsetByTimestamp(0L)) + assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L)) val firstTimestamp = mockTime.milliseconds val leaderEpoch = 0 @@ -2078,19 +2080,30 @@ class UnifiedLogTest { timestamp = firstTimestamp), leaderEpoch = leaderEpoch) - assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(leaderEpoch))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(leaderEpoch)))), log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)) } @Test def testFetchOffsetByTimestampFromRemoteStorage(): Unit = { - val remoteLogManager = mock(classOf[RemoteLogManager]) + val config: KafkaConfig = createKafkaConfigWithRLM + val purgatory = DelayedOperationPurgatory[DelayedRemoteListOffsets]("RemoteListOffsets", config.brokerId) + val remoteLogManager = spy(new RemoteLogManager(config.remoteLogManagerConfig, + 0, + logDir.getAbsolutePath, + "clusterId", + mockTime, + _ => Optional.empty[UnifiedLog](), + (_, _) => {}, + brokerTopicStats, + new Metrics())) + remoteLogManager.setDelayedOperationPurgatory(purgatory) + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1, remoteLogStorageEnable = true) val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager)) - when(remoteLogManager.findOffsetByTimestamp(log.topicPartition, 0, 0, log.leaderEpochCache.get)) - .thenReturn(Optional.empty[TimestampAndOffset]()) - assertEquals(None, log.fetchOffsetByTimestamp(0L, Some(remoteLogManager))) + // Note that the log is empty, so remote offset read won't happen + assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager))) val firstTimestamp = mockTime.milliseconds val firstLeaderEpoch = 0 @@ -2106,33 +2119,40 @@ class UnifiedLogTest { timestamp = secondTimestamp), leaderEpoch = secondLeaderEpoch) - when(remoteLogManager.findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition), - anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache.get))) - .thenAnswer(ans => { - val timestamp = ans.getArgument(1).asInstanceOf[Long] - Optional.of(timestamp) - .filter(_ == firstTimestamp) - .map[TimestampAndOffset](x => new TimestampAndOffset(x, 0L, Optional.of(firstLeaderEpoch))) - }) + doAnswer(ans => { + val timestamp = ans.getArgument(1).asInstanceOf[Long] + Optional.of(timestamp) + .filter(_ == firstTimestamp) + .map[TimestampAndOffset](x => new TimestampAndOffset(x, 0L, 
Optional.of(firstLeaderEpoch))) + }).when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition), + anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache.get)) log._localLogStartOffset = 1 - assertEquals(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), - log.fetchOffsetByTimestamp(firstTimestamp, Some(remoteLogManager))) - assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), - log.fetchOffsetByTimestamp(secondTimestamp, Some(remoteLogManager))) + def assertFetchOffsetByTimestamp(expected: Option[TimestampAndOffset], timestamp: Long): Unit = { + val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Some(remoteLogManager)) + assertTrue(offsetResultHolder.futureHolderOpt.isDefined) + offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS) + assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone) + assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().isRight) + assertEquals(expected, offsetResultHolder.futureHolderOpt.get.taskFuture.get().getOrElse(null)) + } + + // In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage. + assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp) + assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))), log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager))) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)))), log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager))) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch)))), log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) // The cache can be updated directly after a leader change. // The new latest offset should reflect the updated epoch. 
log.maybeAssignEpochStartOffset(2, 2L) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2)))), log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) } @@ -2141,7 +2161,7 @@ class UnifiedLogTest { val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) val log = createLog(logDir, logConfig) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)))), log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) val firstTimestamp = mockTime.milliseconds @@ -2157,21 +2177,31 @@ class UnifiedLogTest { timestamp = secondTimestamp), leaderEpoch = leaderEpoch) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)))), log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) } @Test def testFetchLatestTieredTimestampWithRemoteStorage(): Unit = { - val remoteLogManager = mock(classOf[RemoteLogManager]) + val config: KafkaConfig = createKafkaConfigWithRLM + val purgatory = DelayedOperationPurgatory[DelayedRemoteListOffsets]("RemoteListOffsets", config.brokerId) + val remoteLogManager = spy(new RemoteLogManager(config.remoteLogManagerConfig, + 0, + logDir.getAbsolutePath, + "clusterId", + mockTime, + _ => Optional.empty[UnifiedLog](), + (_, _) => {}, + brokerTopicStats, + new Metrics())) + remoteLogManager.setDelayedOperationPurgatory(purgatory) + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1, remoteLogStorageEnable = true) val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager)) - when(remoteLogManager.findOffsetByTimestamp(log.topicPartition, 0, 0, log.leaderEpochCache.get)) - .thenReturn(Optional.empty[TimestampAndOffset]()) - - assertEquals(None, log.fetchOffsetByTimestamp(0L, Some(remoteLogManager))) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.empty())), + // Note that the log is empty, so remote offset read won't happen + assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager))) + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.empty()))), log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager))) val firstTimestamp = mockTime.milliseconds @@ -2188,40 +2218,57 @@ class UnifiedLogTest { timestamp = secondTimestamp), leaderEpoch = secondLeaderEpoch) - when(remoteLogManager.findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition), - anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache.get))) - .thenAnswer(ans => { - val timestamp = ans.getArgument(1).asInstanceOf[Long] - Optional.of(timestamp) - .filter(_ == firstTimestamp) - .map[TimestampAndOffset](x => new TimestampAndOffset(x, 0L, Optional.of(firstLeaderEpoch))) - }) + doAnswer(ans => { + val timestamp = ans.getArgument(1).asInstanceOf[Long] + Optional.of(timestamp) + .filter(_ == firstTimestamp) + .map[TimestampAndOffset](x => new 
TimestampAndOffset(x, 0L, Optional.of(firstLeaderEpoch))) + }).when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition), + anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache.get)) log._localLogStartOffset = 1 log._highestOffsetInRemoteStorage = 0 + def assertFetchOffsetByTimestamp(expected: Option[TimestampAndOffset], timestamp: Long): Unit = { + val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Some(remoteLogManager)) + assertTrue(offsetResultHolder.futureHolderOpt.isDefined) + offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS) + assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone) + assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().isRight) + assertEquals(expected, offsetResultHolder.futureHolderOpt.get.taskFuture.get().getOrElse(null)) + } + // In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage. - assertEquals(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), - log.fetchOffsetByTimestamp(firstTimestamp, Some(remoteLogManager))) - assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), - log.fetchOffsetByTimestamp(secondTimestamp, Some(remoteLogManager))) + assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp) + assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))), log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager))) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))), log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP, Some(remoteLogManager))) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)))), log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager))) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch)))), log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) // The cache can be updated directly after a leader change. // The new latest offset should reflect the updated epoch. 
log.maybeAssignEpochStartOffset(2, 2L) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))), + assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2)))), log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) } + private def createKafkaConfigWithRLM: KafkaConfig = { + val props = new Properties() + props.put("zookeeper.connect", "test") + props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") + props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName) + props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) + // set log reader threads number to 2 + props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "2") + KafkaConfig.fromProps(props) + } + /** * Test the Log truncate operations */ @@ -4154,6 +4201,7 @@ class UnifiedLogTest { val pid = 1L val epoch = 0.toShort + assertTrue(log.isEmpty) log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 0), leaderEpoch = 0) log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), @@ -4173,12 +4221,14 @@ class UnifiedLogTest { log.deleteOldSegments() mockTime.sleep(1) assertEquals(2, log.logSegments.size) + assertFalse(log.isEmpty) // Update the log-start-offset from 0 to 3, then the base segment should not be eligible for deletion log.updateLogStartOffsetFromRemoteTier(3L) log.deleteOldSegments() mockTime.sleep(1) assertEquals(2, log.logSegments.size) + assertFalse(log.isEmpty) // Update the log-start-offset from 3 to 4, then the base segment should be eligible for deletion now even // if it is not uploaded to remote storage @@ -4186,6 +4236,13 @@ class UnifiedLogTest { log.deleteOldSegments() mockTime.sleep(1) assertEquals(1, log.logSegments.size) + assertFalse(log.isEmpty) + + log.updateLogStartOffsetFromRemoteTier(5L) + log.deleteOldSegments() + mockTime.sleep(1) + assertEquals(1, log.logSegments.size) + assertTrue(log.isEmpty) } @Test @@ -4401,6 +4458,14 @@ class UnifiedLogTest { seg2.close() } + @Test + def testFetchOffsetByTimestampShouldReadOnlyLocalLogWhenLogIsEmpty(): Unit = { + val logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true) + val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true) + val result = log.fetchOffsetByTimestamp(mockTime.milliseconds(), Some(null)) + assertEquals(OffsetResultHolder(None, None), result) + } + private def appendTransactionalToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch: Short, diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 22a0140ea2be7..b978389743bd2 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -51,6 +51,7 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{Alte import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} +import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse} 
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection} import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection} @@ -63,7 +64,6 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ClientInformation, ListenerName} import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil} import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity} -import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata @@ -4114,13 +4114,25 @@ class KafkaApisTest extends Logging { val isolationLevel = IsolationLevel.READ_UNCOMMITTED val currentLeaderEpoch = Optional.of[Integer](15) - when(replicaManager.fetchOffsetForTimestamp( - ArgumentMatchers.eq(tp), - ArgumentMatchers.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP), - ArgumentMatchers.eq(Some(isolationLevel)), - ArgumentMatchers.eq(currentLeaderEpoch), - fetchOnlyFromLeader = ArgumentMatchers.eq(true)) - ).thenThrow(error.exception) + when(replicaManager.fetchOffset( + ArgumentMatchers.any[Seq[ListOffsetsTopic]](), + ArgumentMatchers.eq(Set.empty[TopicPartition]), + ArgumentMatchers.eq(isolationLevel), + ArgumentMatchers.eq(ListOffsetsRequest.CONSUMER_REPLICA_ID), + ArgumentMatchers.eq[String](clientId), + ArgumentMatchers.anyInt(), // correlationId + ArgumentMatchers.anyShort(), // version + ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](), + ArgumentMatchers.any[List[ListOffsetsTopicResponse] => Unit]() + )).thenAnswer(ans => { + val callback = ans.getArgument[List[ListOffsetsTopicResponse] => Unit](8) + val partitionResponse = new ListOffsetsPartitionResponse() + .setErrorCode(error.code()) + .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET) + .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP) + .setPartitionIndex(tp.partition()) + callback(List(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava))) + }) val targetTimes = List(new ListOffsetsTopic() .setName(tp.topic) @@ -10116,6 +10128,31 @@ class KafkaApisTest extends Logging { .setPartitionIndex(tp.partition) .setTimestamp(timestamp)).asJava)).asJava + when(replicaManager.fetchOffset( + ArgumentMatchers.any[Seq[ListOffsetsTopic]](), + ArgumentMatchers.eq(Set.empty[TopicPartition]), + ArgumentMatchers.eq(IsolationLevel.READ_UNCOMMITTED), + ArgumentMatchers.eq(ListOffsetsRequest.CONSUMER_REPLICA_ID), + ArgumentMatchers.eq[String](clientId), + ArgumentMatchers.anyInt(), // correlationId + ArgumentMatchers.anyShort(), // version + ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](), + ArgumentMatchers.any[List[ListOffsetsTopicResponse] => Unit]() + )).thenAnswer(ans => { + val version = ans.getArgument[Short](6) + val callback = ans.getArgument[List[ListOffsetsTopicResponse] => Unit](8) + val errorCode = if (ReplicaManager.isListOffsetsTimestampUnsupported(timestamp, version)) + Errors.UNSUPPORTED_VERSION.code() + else + Errors.INVALID_REQUEST.code() + val partitionResponse = new ListOffsetsPartitionResponse() + 
.setErrorCode(errorCode) + .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET) + .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP) + .setPartitionIndex(tp.partition()) + callback(List(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava))) + }) + val data = new ListOffsetsRequestData().setTopics(targetTimes).setReplicaId(ListOffsetsRequest.CONSUMER_REPLICA_ID) val listOffsetRequest = ListOffsetsRequest.parse(MessageUtil.toByteBuffer(data, version), version) val request = buildRequest(listOffsetRequest) @@ -10135,21 +10172,33 @@ class KafkaApisTest extends Logging { private def testConsumerListOffsetLatest(isolationLevel: IsolationLevel): Unit = { val tp = new TopicPartition("foo", 0) val latestOffset = 15L - val currentLeaderEpoch = Optional.empty[Integer]() - - when(replicaManager.fetchOffsetForTimestamp( - ArgumentMatchers.eq(tp), - ArgumentMatchers.eq(ListOffsetsRequest.LATEST_TIMESTAMP), - ArgumentMatchers.eq(Some(isolationLevel)), - ArgumentMatchers.eq(currentLeaderEpoch), - fetchOnlyFromLeader = ArgumentMatchers.eq(true)) - ).thenReturn(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, latestOffset, currentLeaderEpoch))) val targetTimes = List(new ListOffsetsTopic() .setName(tp.topic) .setPartitions(List(new ListOffsetsPartition() .setPartitionIndex(tp.partition) .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava + + when(replicaManager.fetchOffset( + ArgumentMatchers.eq(targetTimes.asScala.toSeq), + ArgumentMatchers.eq(Set.empty[TopicPartition]), + ArgumentMatchers.eq(isolationLevel), + ArgumentMatchers.eq(ListOffsetsRequest.CONSUMER_REPLICA_ID), + ArgumentMatchers.eq[String](clientId), + ArgumentMatchers.anyInt(), // correlationId + ArgumentMatchers.anyShort(), // version + ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](), + ArgumentMatchers.any[List[ListOffsetsTopicResponse] => Unit]() + )).thenAnswer(ans => { + val callback = ans.getArgument[List[ListOffsetsTopicResponse] => Unit](8) + val partitionResponse = new ListOffsetsPartitionResponse() + .setErrorCode(Errors.NONE.code()) + .setOffset(latestOffset) + .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP) + .setPartitionIndex(tp.partition()) + callback(List(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava))) + }) + val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel) .setTargetTimes(targetTimes).build() val request = buildRequest(listOffsetRequest) diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 3bcdf2b2592ef..5753adde8b645 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -17,7 +17,7 @@ package kafka.server -import kafka.log.UnifiedLog +import kafka.log.{OffsetResultHolder, UnifiedLog} import kafka.utils.TestUtils import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse} @@ -106,13 +106,13 @@ class LogOffsetTest extends BaseRequestTest { log.updateHighWatermark(log.logEndOffset) - val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) + val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP).timestampAndOffsetOpt assertEquals(19L, firstOffset.get.offset) 
assertEquals(19L, firstOffset.get.timestamp) log.truncateTo(0) - assertEquals(Option.empty, log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)) + assertEquals(Option.empty, log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP).timestampAndOffsetOpt) } @ParameterizedTest @@ -128,7 +128,7 @@ class LogOffsetTest extends BaseRequestTest { log.updateHighWatermark(log.logEndOffset) - val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) + val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP).timestampAndOffsetOpt assertEquals(7L, log.logEndOffset) assertEquals(5L, maxTimestampOffset.get.offset) assertEquals(6L, maxTimestampOffset.get.timestamp) @@ -201,7 +201,7 @@ class LogOffsetTest extends BaseRequestTest { log.updateHighWatermark(log.logEndOffset) assertEquals(0L, log.logEndOffset) - assertEquals(Option.empty, log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)) + assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)) } @deprecated("legacyFetchOffsetsBefore", since = "") diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index d914d0f396394..aab64fa33c9df 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2999,6 +2999,8 @@ class ReplicaManagerTest { purgatoryName = "ElectLeader", timer, reaperEnabled = false) val mockRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch]( purgatoryName = "RemoteFetch", timer, reaperEnabled = false) + val mockRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets]( + purgatoryName = "RemoteListOffsets", timer, reaperEnabled = false) // Mock network client to show leader offset of 5 val blockingSend = new MockBlockingSender( @@ -3024,6 +3026,7 @@ class ReplicaManagerTest { delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory), delayedElectLeaderPurgatoryParam = Some(mockElectLeaderPurgatory), delayedRemoteFetchPurgatoryParam = Some(mockRemoteFetchPurgatory), + delayedRemoteListOffsetsPurgatoryParam = Some(mockRemoteListOffsetsPurgatory), threadNamePrefix = Option(this.getClass.getName)) { override protected def createReplicaFetcherManager(metrics: Metrics, @@ -3420,6 +3423,8 @@ class ReplicaManagerTest { purgatoryName = "DelayedElectLeader", timer, reaperEnabled = false) val mockDelayedRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch]( purgatoryName = "DelayedRemoteFetch", timer, reaperEnabled = false) + val mockDelayedRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets]( + purgatoryName = "RemoteListOffsets", timer, reaperEnabled = false) when(metadataCache.contains(new TopicPartition(topic, 0))).thenReturn(true) @@ -3452,6 +3457,7 @@ class ReplicaManagerTest { delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory), delayedElectLeaderPurgatoryParam = Some(mockDelayedElectLeaderPurgatory), delayedRemoteFetchPurgatoryParam = Some(mockDelayedRemoteFetchPurgatory), + delayedRemoteListOffsetsPurgatoryParam = Some(mockDelayedRemoteListOffsetsPurgatory), threadNamePrefix = Option(this.getClass.getName), addPartitionsToTxnManager = Some(addPartitionsToTxnManager), directoryEventHandler = directoryEventHandler, diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java 
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java index a24755de5f741..a93dea813d149 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java @@ -180,7 +180,7 @@ public TieredStorageTestBuilder expectSegmentToBeOffloaded(Integer fromBroker, TopicPartition topicPartition = new TopicPartition(topic, partition); List<ProducerRecord<String, String>> records = new ArrayList<>(); for (KeyValueSpec kv: keyValues) { - records.add(new ProducerRecord<>(topic, partition, kv.getKey(), kv.getValue())); + records.add(new ProducerRecord<>(topic, partition, kv.getTimestamp(), kv.getKey(), kv.getValue())); } offloadables.computeIfAbsent(topicPartition, k -> new ArrayList<>()) .add(new OffloadableSpec(fromBroker, baseOffset, records)); diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java index a2d2f3d8e2e44..e0fd4fc6ec77f 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java @@ -44,6 +44,7 @@ import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory; import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot; +import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; import org.apache.kafka.tiered.storage.specs.TopicSpec; import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; @@ -300,6 +301,15 @@ public LocalTieredStorage remoteStorageManager(int brokerId) { .orElseThrow(() -> new IllegalArgumentException("No remote storage manager found for broker " + brokerId)); } + // unused now, but it can be reused later as this is a utility method.
+ public Optional<LeaderEpochFileCache> leaderEpochFileCache(int brokerId, TopicPartition partition) { + Optional<UnifiedLog> unifiedLogOpt = log(brokerId, partition); + if (unifiedLogOpt.isPresent() && unifiedLogOpt.get().leaderEpochCache().isDefined()) { + return Optional.of(unifiedLogOpt.get().leaderEpochCache().get()); + } + return Optional.empty(); + } + public List<LocalTieredStorage> remoteStorageManagers() { return remoteStorageManagers; } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectListOffsetsAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectListOffsetsAction.java index a4196517ed231..80f55ce0f0fdc 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectListOffsetsAction.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectListOffsetsAction.java @@ -28,6 +28,7 @@ import java.util.concurrent.ExecutionException; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; public final class ExpectListOffsetsAction implements TieredStorageTestAction { @@ -55,6 +56,8 @@ public void doExecute(TieredStorageTestContext context) throws InterruptedExcept if (expected.epoch != -1) { assertTrue(listOffsetsResult.leaderEpoch().isPresent()); assertEquals(expected.epoch, listOffsetsResult.leaderEpoch().get()); + } else { + assertFalse(listOffsetsResult.leaderEpoch().isPresent()); + } } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ListOffsetsTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ListOffsetsTest.java new file mode 100644 index 0000000000000..af2cd13967af1 --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ListOffsetsTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.storage.internals.log.EpochEntry; +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT; + +public class ListOffsetsTest extends TieredStorageTestHarness { + @Override + public int brokerCount() { + return 2; + } + + /** + * This test runs only in KRaft mode, since ZK mode is deprecated. Note that: + * 1. In ZK mode, the leader-epoch gets bumped during reassignment (0 -> 1 -> 2) and leader-election (2 -> 3). + * 2. In KRaft mode, the leader-epoch gets bumped only for leader-election (0 -> 1) and not for reassignment. + * @param quorum The quorum to use for the test. + */ + @ParameterizedTest(name = "{displayName}.quorum={0}") + @ValueSource(strings = {"kraft"}) + public void executeTieredStorageTest(String quorum) { + super.executeTieredStorageTest(quorum); + } + + @Override + protected void writeTestSpecifications(TieredStorageTestBuilder builder) { + final int broker0 = 0; + final int broker1 = 1; + final String topicA = "topicA"; + final int p0 = 0; + final Time time = new MockTime(); + final long timestamp = time.milliseconds(); + final Map<Integer, List<Integer>> assignment = mkMap(mkEntry(p0, Arrays.asList(broker0, broker1))); + + builder + .createTopic(topicA, 1, 2, 2, assignment, true) + // send records to partition 0 and expect the first segment to be offloaded + .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L) + .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, + new KeyValueSpec("k0", "v0", timestamp), + new KeyValueSpec("k1", "v1", timestamp + 1)) + .produceWithTimestamp(topicA, p0, + new KeyValueSpec("k0", "v0", timestamp), + new KeyValueSpec("k1", "v1", timestamp + 1), + new KeyValueSpec("k2", "v2", timestamp + 2)) + + // switch leader and send more records to partition 0 and expect the second segment to be offloaded. + .reassignReplica(topicA, p0, Arrays.asList(broker1, broker0)) + // After leader election, the partition's leader-epoch gets bumped from 0 -> 1 + .expectLeader(topicA, p0, broker1, true) + .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L) + + // NOTE that the (k2, v2) message was sent by the previous producer, so we cannot expect that record in + // the segment to be offloaded. We can only expect the new messages. + .expectSegmentToBeOffloaded(broker1, topicA, p0, 2, + new KeyValueSpec("k3", "v3", timestamp + 3)) + .produceWithTimestamp(topicA, p0, + new KeyValueSpec("k3", "v3", timestamp + 3), + new KeyValueSpec("k4", "v4", timestamp + 4), + new KeyValueSpec("k5", "v5", timestamp + 5)) + + // LIST_OFFSETS requests can list the offsets from the least-loaded (any) node.
+ // List offsets for special timestamps + .expectListOffsets(topicA, p0, OffsetSpec.earliest(), new EpochEntry(0, 0)) + .expectListOffsets(topicA, p0, OffsetSpec.earliestLocal(), new EpochEntry(1, 4)) + .expectListOffsets(topicA, p0, OffsetSpec.latestTiered(), new EpochEntry(1, 3)) + .expectListOffsets(topicA, p0, OffsetSpec.latest(), new EpochEntry(1, 6)) + + // fetch offset by timestamp from the local log + .expectListOffsets(topicA, p0, OffsetSpec.forTimestamp(timestamp + 6), new EpochEntry(NO_PARTITION_LEADER_EPOCH, -1)) + .expectListOffsets(topicA, p0, OffsetSpec.forTimestamp(timestamp + 5), new EpochEntry(1, 5)) + .expectListOffsets(topicA, p0, OffsetSpec.forTimestamp(timestamp + 4), new EpochEntry(1, 4)) + + // fetch offset by timestamp from remote storage + .expectListOffsets(topicA, p0, OffsetSpec.forTimestamp(timestamp - 1), new EpochEntry(0, 0)) + .expectListOffsets(topicA, p0, OffsetSpec.forTimestamp(timestamp), new EpochEntry(0, 0)) + .expectListOffsets(topicA, p0, OffsetSpec.forTimestamp(timestamp + 1), new EpochEntry(0, 1)) + .expectListOffsets(topicA, p0, OffsetSpec.forTimestamp(timestamp + 3), new EpochEntry(1, 3)) + + // delete some records and check whether the earliest_offset gets updated. + .expectDeletionInRemoteStorage(broker1, topicA, p0, DELETE_SEGMENT, 1) + .deleteRecords(topicA, p0, 3L) + .expectListOffsets(topicA, p0, OffsetSpec.earliest(), new EpochEntry(1, 3)) + .expectListOffsets(topicA, p0, OffsetSpec.earliestLocal(), new EpochEntry(1, 4)) + + // delete all the records in the remote layer and expect that the earliest and earliest_local offsets are the same + .expectDeletionInRemoteStorage(broker1, topicA, p0, DELETE_SEGMENT, 1) + .deleteRecords(topicA, p0, 4L) + .expectListOffsets(topicA, p0, OffsetSpec.earliest(), new EpochEntry(1, 4)) + .expectListOffsets(topicA, p0, OffsetSpec.earliestLocal(), new EpochEntry(1, 4)) + .expectListOffsets(topicA, p0, OffsetSpec.latestTiered(), new EpochEntry(NO_PARTITION_LEADER_EPOCH, 3)) + .expectListOffsets(topicA, p0, OffsetSpec.latest(), new EpochEntry(1, 6)); + } +}
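For reference only, not part of the patch: a minimal client-side sketch of how the OffsetSpec variants exercised by ListOffsetsTest above map onto the public Admin API. The bootstrap address, topic name, and partition are placeholder assumptions, and earliestLocal()/latestTiered() only return meaningful values against a cluster with tiered (remote) log storage enabled.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class ListOffsetsClientExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder address; point this at a broker with tiered storage enabled.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("topicA", 0);
            // earliest(): first offset still readable anywhere (remote or local).
            ListOffsetsResultInfo earliest = query(admin, tp, OffsetSpec.earliest());
            // earliestLocal(): first offset still present in the broker's local log directory.
            ListOffsetsResultInfo earliestLocal = query(admin, tp, OffsetSpec.earliestLocal());
            // latestTiered(): highest offset that has been uploaded to remote storage.
            ListOffsetsResultInfo latestTiered = query(admin, tp, OffsetSpec.latestTiered());
            // forTimestamp(ts): first offset whose timestamp is >= ts; may be resolved from remote segments.
            ListOffsetsResultInfo byTime = query(admin, tp, OffsetSpec.forTimestamp(System.currentTimeMillis() - 60_000L));
            System.out.printf("earliest=%d earliestLocal=%d latestTiered=%d byTime=%d%n",
                    earliest.offset(), earliestLocal.offset(), latestTiered.offset(), byTime.offset());
        }
    }

    private static ListOffsetsResultInfo query(Admin admin, TopicPartition tp, OffsetSpec spec) throws Exception {
        return admin.listOffsets(Map.of(tp, spec)).partitionResult(tp).get();
    }
}

Each EpochEntry(epoch, offset) expected by the test above appears to correspond to the leaderEpoch() and offset() of the matching ListOffsetsResultInfo, with NO_PARTITION_LEADER_EPOCH meaning the response carried no epoch.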
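Also illustrative, and not Kafka code: a JDK-only sketch of the job-future/task-future shape that DelayedRemoteListOffsetsTest (earlier in this patch) drives through mocks. All names here are invented for the sketch; the point is that the submitted job completes the task future through a callback, while an expired request cancels the job, which mirrors the REQUEST_TIMED_OUT and jobFuture.cancel(...) behaviour asserted in testResponseOnRequestExpiration.

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncOffsetReadSketch {
    // Pairs the thread-pool job with the future that its callback completes.
    record Holder(Future<?> jobFuture, CompletableFuture<Optional<Long>> taskFuture) {}

    static Holder asyncOffsetRead(ExecutorService pool, long timestamp) {
        CompletableFuture<Optional<Long>> taskFuture = new CompletableFuture<>();
        Future<?> jobFuture = pool.submit(() -> {
            try {
                Thread.sleep(50);                            // stand-in for the remote index lookup
                taskFuture.complete(Optional.of(timestamp)); // callback completes the task future
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                taskFuture.completeExceptionally(e);
            }
        });
        return new Holder(jobFuture, taskFuture);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Holder holder = asyncOffsetRead(pool, 100L);
        try {
            // Shrink this timeout below the sleep above to exercise the expiration branch instead.
            System.out.println("result = " + holder.taskFuture().get(1, TimeUnit.SECONDS));
        } catch (TimeoutException e) {
            holder.jobFuture().cancel(true); // expiration path: cancel the still-pending job
            System.out.println("timed out");
        } finally {
            pool.shutdown();
        }
    }
}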