KAFKA-15859 Make RemoteListOffsets call an async operation (#16602)
This is part 2 of KIP-1075.

To find the offset for a given timestamp, the client uses the ListOffsets API. When remote storage is enabled for the topic, the broker has to fetch remote indexes such as the offset index and the time index to serve the query. A single ListOffsets request can also carry queries for multiple topics/partitions.
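
For context, this is the lookup the patch optimizes on the broker side; a minimal client-side sketch using the standard consumer API (broker address and topic name are placeholders):

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class ListOffsetsClientExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("tiered-topic", 0); // placeholder topic
            long timestamp = System.currentTimeMillis() - 24 * 60 * 60 * 1000L; // one day ago
            // offsetsForTimes sends a ListOffsets request; on a remote-storage-enabled
            // topic the broker may need to read remote indexes to answer it.
            Map<TopicPartition, OffsetAndTimestamp> offsets =
                    consumer.offsetsForTimes(Map.of(tp, timestamp));
            OffsetAndTimestamp result = offsets.get(tp);
            if (result != null) {
                System.out.printf("First offset at/after %d is %d%n", timestamp, result.offset());
            }
        }
    }
}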

The time taken to read the indexes from remote storage is non-deterministic, and the query is handled by the request-handler threads. If there are multiple LIST_OFFSETS queries and most of the request-handler threads are busy reading data from remote storage, other high-priority requests such as FETCH and PRODUCE may starve and be queued, leading to higher latency in producing/consuming messages.

In this patch, we introduce a delayed operation for the remote list-offsets call. If the timestamp has to be searched in remote storage, the request-handler threads pass the request on to the remote-log-reader threads, and the request is then handled asynchronously, freeing the request-handler threads to serve other requests.
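
Conceptually, the handoff works like the following self-contained sketch (illustrative names only; the actual patch uses the purgatory and remote-log-reader classes shown in the diff below):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncListOffsetsSketch {
    // Stand-in for the remote-log-reader thread pool.
    private static final ExecutorService REMOTE_READERS = Executors.newFixedThreadPool(2);

    // Stand-in for a slow, non-deterministic remote index read.
    static long findOffsetInRemoteStorage(long timestamp) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return 42L;
    }

    public static void main(String[] args) {
        long timestamp = System.currentTimeMillis();
        // The request-handler thread only submits the job and registers a callback;
        // it never blocks on the remote read itself.
        CompletableFuture<Long> offsetFuture = CompletableFuture.supplyAsync(
                () -> findOffsetInRemoteStorage(timestamp), REMOTE_READERS);
        offsetFuture.thenAccept(offset ->
                System.out.println("ListOffsets response completed with offset " + offset));
        // ... the request-handler thread is free to serve FETCH/PRODUCE here ...
        offsetFuture.join(); // demo only: keep the JVM alive until completion
        REMOTE_READERS.shutdown();
    }
}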

Covered the patch with unit and integration tests.

Reviewers: Satish Duggana <[email protected]>, Luke Chen <[email protected]>, Chia-Ping Tsai <[email protected]>
kamalcph authored Sep 15, 2024
1 parent e1f11c6 commit 344d8a6
Showing 24 changed files with 1,271 additions and 202 deletions.
28 changes: 28 additions & 0 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -18,8 +18,12 @@

import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.AsyncOffsetReadFutureHolder;
import kafka.log.UnifiedLog;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.DelayedRemoteListOffsets;
import kafka.server.StopPartition;
import kafka.server.TopicPartitionOperationKey;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
@@ -132,11 +136,13 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import scala.Option;
import scala.collection.JavaConverters;
import scala.util.Either;

import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
@@ -200,6 +206,7 @@ public class RemoteLogManager implements Closeable {

private volatile boolean remoteLogManagerConfigured = false;
private final Timer remoteReadTimer;
private DelayedOperationPurgatory<DelayedRemoteListOffsets> delayedRemoteListOffsetsPurgatory;

/**
* Creates RemoteLogManager instance with the given arguments.
@@ -263,6 +270,10 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
);
}

public void setDelayedOperationPurgatory(DelayedOperationPurgatory<DelayedRemoteListOffsets> delayedRemoteListOffsetsPurgatory) {
this.delayedRemoteListOffsetsPurgatory = delayedRemoteListOffsetsPurgatory;
}

public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
indexCache.resizeCacheSize(remoteLogIndexFileCacheSize);
}
@@ -620,6 +631,23 @@ private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ? Optional.empty() : Optional.of(leaderEpoch);
}

public AsyncOffsetReadFutureHolder<Either<Exception, Option<FileRecords.TimestampAndOffset>>> asyncOffsetRead(
TopicPartition topicPartition,
Long timestamp,
Long startingOffset,
LeaderEpochFileCache leaderEpochCache,
Supplier<Option<FileRecords.TimestampAndOffset>> searchLocalLog) {
CompletableFuture<Either<Exception, Option<FileRecords.TimestampAndOffset>>> taskFuture = new CompletableFuture<>();
Future<Void> jobFuture = remoteStorageReaderThreadPool.submit(
new RemoteLogOffsetReader(this, topicPartition, timestamp, startingOffset, leaderEpochCache, searchLocalLog, result -> {
TopicPartitionOperationKey key = new TopicPartitionOperationKey(topicPartition.topic(), topicPartition.partition());
taskFuture.complete(result);
delayedRemoteListOffsetsPurgatory.checkAndComplete(key);
})
);
return new AsyncOffsetReadFutureHolder<>(jobFuture, taskFuture);
}

/**
* Search the message offset in the remote storage based on timestamp and offset.
* <p>
79 changes: 79 additions & 0 deletions core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log.remote;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Supplier;

import scala.Option;
import scala.compat.java8.OptionConverters;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;

public class RemoteLogOffsetReader implements Callable<Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogOffsetReader.class);
private final RemoteLogManager rlm;
private final TopicPartition tp;
private final long timestamp;
private final long startingOffset;
private final LeaderEpochFileCache leaderEpochCache;
private final Supplier<Option<FileRecords.TimestampAndOffset>> searchInLocalLog;
private final Consumer<Either<Exception, Option<FileRecords.TimestampAndOffset>>> callback;

public RemoteLogOffsetReader(RemoteLogManager rlm,
TopicPartition tp,
long timestamp,
long startingOffset,
LeaderEpochFileCache leaderEpochCache,
Supplier<Option<FileRecords.TimestampAndOffset>> searchInLocalLog,
Consumer<Either<Exception, Option<FileRecords.TimestampAndOffset>>> callback) {
this.rlm = rlm;
this.tp = tp;
this.timestamp = timestamp;
this.startingOffset = startingOffset;
this.leaderEpochCache = leaderEpochCache;
this.searchInLocalLog = searchInLocalLog;
this.callback = callback;
}

@Override
public Void call() throws Exception {
Either<Exception, Option<FileRecords.TimestampAndOffset>> result;
try {
// If it is not found in remote storage, then search in the local storage starting with local log start offset.
Option<FileRecords.TimestampAndOffset> timestampAndOffsetOpt =
OptionConverters.toScala(rlm.findOffsetByTimestamp(tp, timestamp, startingOffset, leaderEpochCache))
.orElse(searchInLocalLog::get);
result = Right.apply(timestampAndOffsetOpt);
} catch (Exception e) {
// NOTE: All exceptions from the secondary storage are caught, not only KafkaException.
LOGGER.error("Error occurred while reading the remote log offset for {}", tp, e);
result = Left.apply(e);
}
callback.accept(result);
return null;
}
}
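
The reader funnels success and failure through a single callback carrying a scala.util.Either, which the Java code builds via Left.apply/Right.apply as above. A minimal sketch of consuming such a result from Java (assumes scala-library on the classpath; names are illustrative):

import scala.util.Either;
import scala.util.Left;
import scala.util.Right;

public class EitherCallbackSketch {
    static void onResult(Either<Exception, Long> result) {
        if (result.isRight()) {
            // Success: unpack the Right value (the unchecked cast is fine for a sketch).
            Long offset = ((Right<Exception, Long>) result).value();
            System.out.println("offset = " + offset);
        } else {
            Exception e = ((Left<Exception, Long>) result).value();
            System.err.println("remote lookup failed: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        onResult(Right.apply(42L));
        onResult(Left.apply(new Exception("remote storage unavailable")));
    }
}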
core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -27,6 +27,7 @@
import kafka.server.DelayedOperationPurgatory;
import kafka.server.DelayedProduce;
import kafka.server.DelayedRemoteFetch;
import kafka.server.DelayedRemoteListOffsets;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory.QuotaManagers;
@@ -66,6 +67,7 @@ public class ReplicaManagerBuilder {
private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> delayedDeleteRecordsPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedElectLeader>> delayedElectLeaderPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>> delayedRemoteFetchPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedRemoteListOffsets>> delayedRemoteListOffsetsPurgatory = Optional.empty();
private Optional<String> threadNamePrefix = Optional.empty();
private Long brokerEpoch = -1L;
private Optional<AddPartitionsToTxnManager> addPartitionsToTxnManager = Optional.empty();
@@ -210,6 +212,7 @@ public ReplicaManager build() {
OptionConverters.toScala(delayedDeleteRecordsPurgatory),
OptionConverters.toScala(delayedElectLeaderPurgatory),
OptionConverters.toScala(delayedRemoteFetchPurgatory),
OptionConverters.toScala(delayedRemoteListOffsetsPurgatory),
OptionConverters.toScala(threadNamePrefix),
() -> brokerEpoch,
OptionConverters.toScala(addPartitionsToTxnManager),
2 changes: 1 addition & 1 deletion core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -108,7 +108,7 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
// Isolation level is only required when reading from the latest offset hence use Option.empty() for now.
Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(),
Optional.empty(), true);
Optional.empty(), true).timestampAndOffsetOpt();
return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
}
}
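
Callers such as this now receive an OffsetResultHolder rather than a bare Option: a synchronous lookup carries an immediate result, while a remote lookup carries a pending future. A simplified stand-in for that branching (hypothetical names, assumes Java 16+ for records; not the actual Kafka classes):

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class ListOffsetsDispatchSketch {
    // Simplified stand-in for OffsetResultHolder: either an immediate offset or a
    // pending future, never both.
    record OffsetResult(Optional<Long> offset, Optional<CompletableFuture<Long>> pending) {}

    static void respond(OffsetResult result) {
        if (result.pending().isPresent()) {
            // Remote path: complete the client response when the remote read finishes,
            // analogous to a DelayedRemoteListOffsets waiting in the purgatory.
            result.pending().get().thenAccept(offset ->
                    System.out.println("completed asynchronously: offset " + offset));
        } else {
            // Local path: respond immediately on the request-handler thread.
            System.out.println("completed synchronously: offset " + result.offset().orElse(-1L));
        }
    }

    public static void main(String[] args) {
        respond(new OffsetResult(Optional.of(7L), Optional.empty()));
        CompletableFuture<Long> pending = new CompletableFuture<>();
        respond(new OffsetResult(Optional.empty(), Optional.of(pending)));
        pending.complete(42L); // triggers the asynchronous completion path
    }
}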
16 changes: 10 additions & 6 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -1576,7 +1576,7 @@ class Partition(val topicPartition: TopicPartition,
isolationLevel: Option[IsolationLevel],
currentLeaderEpoch: Optional[Integer],
fetchOnlyFromLeader: Boolean,
remoteLogManager: Option[RemoteLogManager] = None): Option[TimestampAndOffset] = inReadLock(leaderIsrUpdateLock) {
remoteLogManager: Option[RemoteLogManager] = None): OffsetResultHolder = inReadLock(leaderIsrUpdateLock) {
// decide whether to only fetch from leader
val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, fetchOnlyFromLeader)

@@ -1601,21 +1601,25 @@
s"high watermark (${localLog.highWatermark}) is lagging behind the " +
s"start offset from the beginning of this epoch ($epochStart)."))

def getOffsetByTimestamp: Option[TimestampAndOffset] = {
logManager.getLog(topicPartition).flatMap(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager))
def getOffsetByTimestamp: OffsetResultHolder = {
logManager.getLog(topicPartition)
.map(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager))
.getOrElse(OffsetResultHolder(timestampAndOffsetOpt = None))
}

// If we're in the lagging HW state after a leader election, throw OffsetNotAvailable for "latest" offset
// or for a timestamp lookup that is beyond the last fetchable offset.
timestamp match {
case ListOffsetsRequest.LATEST_TIMESTAMP =>
maybeOffsetsError.map(e => throw e)
.orElse(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch))))
.getOrElse(OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch)))))
case ListOffsetsRequest.EARLIEST_TIMESTAMP | ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP =>
getOffsetByTimestamp
case _ =>
getOffsetByTimestamp.filter(timestampAndOffset => timestampAndOffset.offset < lastFetchableOffset)
.orElse(maybeOffsetsError.map(e => throw e))
val offsetResultHolder = getOffsetByTimestamp
offsetResultHolder.maybeOffsetsError = maybeOffsetsError
offsetResultHolder.lastFetchableOffset = Some(lastFetchableOffset)
offsetResultHolder
}
}

38 changes: 38 additions & 0 deletions core/src/main/scala/kafka/log/OffsetResultHolder.scala
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log

import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset

import java.util.concurrent.{CompletableFuture, Future}

case class OffsetResultHolder(timestampAndOffsetOpt: Option[TimestampAndOffset],
futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None) {

var maybeOffsetsError: Option[ApiException] = None
var lastFetchableOffset: Option[Long] = None
}

/**
* A remote log offset read task future holder. It contains two futures:
* 1. JobFuture - Use this future to cancel the running job.
* 2. TaskFuture - Use this future to get the result of the job/computation.
*/
case class AsyncOffsetReadFutureHolder[T](jobFuture: Future[Void], taskFuture: CompletableFuture[T]) {

}
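
For illustration, a self-contained Java analogue of this two-future holder: jobFuture cancels a running read, taskFuture exposes its result (names and record syntax are illustrative; assumes Java 16+):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureHolderSketch {
    // Analogue of AsyncOffsetReadFutureHolder: jobFuture cancels the running job,
    // taskFuture delivers the result of the computation.
    record AsyncReadHolder<T>(Future<?> jobFuture, CompletableFuture<T> taskFuture) {}

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletableFuture<Long> taskFuture = new CompletableFuture<>();
        Future<?> jobFuture = pool.submit(() -> {
            try {
                Thread.sleep(50); // stand-in for a slow remote index read
                taskFuture.complete(42L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // job was cancelled mid-read
            }
        });
        AsyncReadHolder<Long> holder = new AsyncReadHolder<>(jobFuture, taskFuture);
        // A waiting operation either consumes the result ...
        System.out.println("offset = " + holder.taskFuture().get());
        // ... or, if the request expires first, cancels the running job instead:
        // holder.jobFuture().cancel(true);
        pool.shutdown();
    }
}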
37 changes: 21 additions & 16 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1263,7 +1263,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* None if no such message is found.
*/
@nowarn("cat=deprecation")
def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): Option[TimestampAndOffset] = {
def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): OffsetResultHolder = {
maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") {
debug(s"Searching offset for timestamp $targetTimestamp")

@@ -1285,7 +1285,7 @@
Optional.of[Integer](earliestEpochEntry.get().epoch)
} else Optional.empty[Integer]()

Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt))
OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt)))
} else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
val curLocalLogStartOffset = localLogStartOffset()

@@ -1297,15 +1297,15 @@
Optional.empty()
}

Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult))
OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult)))
} else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
val epoch = leaderEpochCache match {
case Some(cache) =>
val latestEpoch = cache.latestEpoch()
if (latestEpoch.isPresent) Optional.of[Integer](latestEpoch.getAsInt) else Optional.empty[Integer]()
case None => Optional.empty[Integer]()
}
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch))
OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch)))
} else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
if (remoteLogEnabled()) {
val curHighestRemoteOffset = highestOffsetInRemoteStorage()
@@ -1324,9 +1324,9 @@
Optional.empty()
}

Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult))
OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult)))
} else {
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1)))
OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1))))
}
} else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
@@ -1336,34 +1336,39 @@
val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
// lookup the position of batch to avoid extra I/O
val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
latestTimestampSegment.log.batchesFrom(position.position).asScala
val timestampAndOffsetOpt = latestTimestampSegment.log.batchesFrom(position.position).asScala
.find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
.flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _,
Optional.of[Integer](batch.partitionLeaderEpoch()).filter(_ >= 0))))
OffsetResultHolder(timestampAndOffsetOpt)
} else {
// We need to search the first segment whose largest timestamp is >= the target timestamp if there is one.
if (remoteLogEnabled()) {
if (remoteLogEnabled() && !isEmpty) {
if (remoteLogManager.isEmpty) {
throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled.")
}
if (recordVersion.value < RecordVersion.V2.value) {
throw new KafkaException("Tiered storage is supported only with versions supporting leader epochs, that means RecordVersion must be >= 2.")
}

val remoteOffset = remoteLogManager.get.findOffsetByTimestamp(topicPartition, targetTimestamp, logStartOffset, leaderEpochCache.get)
if (remoteOffset.isPresent) {
remoteOffset.asScala
} else {
// If it is not found in remote log storage, search in the local log storage from local log start offset.
searchOffsetInLocalLog(targetTimestamp, localLogStartOffset())
}
val asyncOffsetReadFutureHolder = remoteLogManager.get.asyncOffsetRead(topicPartition, targetTimestamp,
logStartOffset, leaderEpochCache.get, () => searchOffsetInLocalLog(targetTimestamp, localLogStartOffset()))
OffsetResultHolder(None, Some(asyncOffsetReadFutureHolder))
} else {
searchOffsetInLocalLog(targetTimestamp, logStartOffset)
OffsetResultHolder(searchOffsetInLocalLog(targetTimestamp, logStartOffset))
}
}
}
}

/**
* Checks if the log is empty.
* @return true if the log is empty, false otherwise.
*/
private[log] def isEmpty = {
logStartOffset == logEndOffset
}

private def searchOffsetInLocalLog(targetTimestamp: Long, startOffset: Long): Option[TimestampAndOffset] = {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.