Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server] FetchLogRequest support params: minFetchBytes and maxWaitMs to avoid frequently FetchLogRequest send to server #233

Merged
merged 3 commits into from
Jan 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static com.alibaba.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
Expand All @@ -59,6 +60,8 @@
public class MetadataUpdater {
private static final Logger LOG = LoggerFactory.getLogger(MetadataUpdater.class);

private static final int MAX_RETRY_TIMES = 5;

private final RpcClient rpcClient;
protected volatile Cluster cluster;

Expand Down Expand Up @@ -119,8 +122,24 @@ public TableDescriptor getTableDescriptorOrElseThrow(long tableId) {
public int leaderFor(TableBucket tableBucket) {
ServerNode serverNode = cluster.leaderFor(tableBucket);
if (serverNode == null) {
throw new FlussRuntimeException("Leader not found for table bucket: " + tableBucket);
for (int i = 0; i < MAX_RETRY_TIMES; i++) {
TablePath tablePath = cluster.getTablePathOrElseThrow(tableBucket.getTableId());
updateMetadata(Collections.singleton(tablePath), null, null);
serverNode = cluster.leaderFor(tableBucket);
if (serverNode != null) {
break;
}
}

if (serverNode == null) {
throw new FlussRuntimeException(
"Leader not found after retry "
+ MAX_RETRY_TIMES
+ " times for table bucket: "
+ tableBucket);
}
}

return serverNode.id();
}

Expand Down Expand Up @@ -239,7 +258,7 @@ private void updateMetadata(
}
} catch (Exception e) {
Throwable t = ExceptionUtils.stripExecutionException(e);
if (t instanceof RetriableException) {
if (t instanceof RetriableException || t instanceof TimeoutException) {
LOG.warn("Failed to update metadata, but the exception is re-triable.", t);
} else {
throw new FlussRuntimeException("Failed to update metadata", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public class LogFetcher implements Closeable {
private final RpcClient rpcClient;
private final int maxFetchBytes;
private final int maxBucketFetchBytes;
private final int minFetchBytes;
private final int maxFetchWaitMs;
private final boolean isCheckCrcs;
private final LogScannerStatus logScannerStatus;
private final LogFetchBuffer logFetchBuffer;
Expand Down Expand Up @@ -123,9 +125,17 @@ public LogFetcher(
this.projection = projection;
this.rpcClient = rpcClient;
this.logScannerStatus = logScannerStatus;
this.maxFetchBytes = (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES).getBytes();
this.maxFetchBytes =
(int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES).getBytes();
this.maxBucketFetchBytes =
(int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES_FOR_BUCKET).getBytes();
(int)
conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET)
.getBytes();
this.minFetchBytes =
(int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MIN_BYTES).getBytes();
this.maxFetchWaitMs =
(int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_WAIT_MAX_TIME).toMillis();

this.isCheckCrcs = conf.getBoolean(ConfigOptions.CLIENT_SCANNER_LOG_CHECK_CRC);
this.logFetchBuffer = new LogFetchBuffer();
this.nodesWithPendingFetchRequests = new HashSet<>();
Expand Down Expand Up @@ -419,7 +429,9 @@ private Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
FetchLogRequest fetchLogRequest =
new FetchLogRequest()
.setFollowerServerId(-1)
.setMaxBytes(maxFetchBytes);
.setMaxBytes(maxFetchBytes)
.setMinBytes(minFetchBytes)
.setMaxWaitMs(maxFetchWaitMs);
PbFetchLogReqForTable reqForTable =
new PbFetchLogReqForTable().setTableId(finalTableId);
if (readContext.isProjectionPushDowned()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Utils for metadata for client. */
public class MetadataUtils {
Expand All @@ -60,7 +62,7 @@ public class MetadataUtils {
*/
public static Cluster sendMetadataRequestAndRebuildCluster(
AdminReadOnlyGateway gateway, Set<TablePath> tablePaths)
throws ExecutionException, InterruptedException {
throws ExecutionException, InterruptedException, TimeoutException {
return sendMetadataRequestAndRebuildCluster(gateway, false, null, tablePaths, null, null);
}

Expand All @@ -76,7 +78,7 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
@Nullable Set<TablePath> tablePaths,
@Nullable Collection<PhysicalTablePath> tablePartitionNames,
@Nullable Collection<Long> tablePartitionIds)
throws ExecutionException, InterruptedException {
throws ExecutionException, InterruptedException, TimeoutException {
AdminReadOnlyGateway gateway =
GatewayClientProxy.createGatewayProxy(
() -> getOneAvailableTabletServerNode(cluster),
Expand All @@ -94,7 +96,7 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
@Nullable Set<TablePath> tablePaths,
@Nullable Collection<PhysicalTablePath> tablePartitions,
@Nullable Collection<Long> tablePartitionIds)
throws ExecutionException, InterruptedException {
throws ExecutionException, InterruptedException, TimeoutException {
MetadataRequest metadataRequest =
ClientRpcMessageUtils.makeMetadataRequest(
tablePaths, tablePartitions, tablePartitionIds);
Expand Down Expand Up @@ -151,7 +153,9 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
newPartitionIdByPath,
newTablePathToTableInfo);
})
.get();
.get(5, TimeUnit.SECONDS); // TODO currently, we don't have timeout logic in
// RpcClient, it will let the get() block forever. So we
// time out here
}

private static NewTableMetadata getTableMetadataToUpdate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,14 @@ public class ConfigOptions {
"The purge number (in number of requests) of the write operation manager, "
+ "the default value is 1000.");

public static final ConfigOption<Integer> LOG_REPLICA_FETCH_OPERATION_PURGE_NUMBER =
key("log.replica.fetch-operation-purge-number")
.intType()
.defaultValue(1000)
.withDescription(
"The purge number (in number of requests) of the fetch log operation manager, "
+ "the default value is 1000.");

public static final ConfigOption<Integer> LOG_REPLICA_FETCHER_NUMBER =
key("log.replica.fetcher-number")
.intType()
Expand All @@ -397,31 +405,54 @@ public class ConfigOptions {
+ "tablet server at the cost of higher CPU and memory utilization.");

public static final ConfigOption<Duration> LOG_REPLICA_FETCH_BACKOFF_INTERVAL =
key("log.replica.fetch-backoff-interval")
key("log.replica.fetch.backoff-interval")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription("The amount of time to sleep when fetch bucket error occurs.");
.withDescription("The amount of time to sleep when fetch bucket error occurs.")
.withFallbackKeys("log.replica.fetch-backoff-interval");

public static final ConfigOption<MemorySize> LOG_FETCH_MAX_BYTES =
key("log.fetch.max-bytes")
public static final ConfigOption<MemorySize> LOG_REPLICA_FETCH_MAX_BYTES =
key("log.replica.fetch.max-bytes")
.memoryType()
.defaultValue(MemorySize.parse("16mb"))
.withDescription(
"The maximum amount of data the server should return for a fetch request. "
+ "Records are fetched in batches for log scanner or follower, for one request batch, "
+ "and if the first record batch in the first non-empty bucket of the fetch is "
+ "larger than this value, the record batch will still be returned to ensure that "
+ "the fetch can make progress. As such, this is not a absolute maximum. Note that "
+ "the fetcher performs multiple fetches in parallel.");
"The maximum amount of data the server should return for a fetch request from follower. "
+ "Records are fetched in batches, and if the first record batch in the first "
+ "non-empty bucket of the fetch is larger than this value, the record batch "
+ "will still be returned to ensure that the fetch can make progress. As such, "
+ "this is not a absolute maximum. Note that the fetcher performs multiple fetches "
+ "in parallel.")
.withDeprecatedKeys("log.fetch.max-bytes");

public static final ConfigOption<MemorySize> LOG_FETCH_MAX_BYTES_FOR_BUCKET =
key("log.fetch.max-bytes-for-bucket")
public static final ConfigOption<MemorySize> LOG_REPLICA_FETCH_MAX_BYTES_FOR_BUCKET =
key("log.replica.fetch.max-bytes-for-bucket")
.memoryType()
.defaultValue(MemorySize.parse("1mb"))
.withDescription(
"The maximum amount of data the server should return for a table bucket in fetch request. "
+ "Records are fetched in batches for consumer or follower, for one request batch, "
+ "the max bytes size is config by this option.");
"The maximum amount of data the server should return for a table bucket in fetch request "
+ "from follower. Records are fetched in batches, the max bytes size is "
+ "config by this option.")
.withDeprecatedKeys("log.fetch.max-bytes-for-bucket");

public static final ConfigOption<Duration> LOG_REPLICA_FETCH_WAIT_MAX_TIME =
key("log.replica.fetch.wait-max-time")
.durationType()
.defaultValue(Duration.ofMillis(500))
.withDescription(
"The maximum time to wait for enough bytes to be available for a fetch log request "
+ "from follower to response. This value should always be less than the "
+ "'log.replica.max-lag-time' at all times to prevent frequent shrinking of ISR for "
+ "low throughput tables");

public static final ConfigOption<MemorySize> LOG_REPLICA_FETCH_MIN_BYTES =
key("log.replica.fetch.min-bytes")
.memoryType()
.defaultValue(MemorySize.parse("1b"))
.withDescription(
"The minimum bytes expected for each fetch log request from follower to response. "
+ "If not enough bytes, wait up to "
+ LOG_REPLICA_FETCH_WAIT_MAX_TIME.key()
+ " time to return.");

public static final ConfigOption<Integer> LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER =
key("log.replica.min-in-sync-replicas-number")
Expand Down Expand Up @@ -714,6 +745,44 @@ public class ConfigOptions {
+ "The Scanner will cache the records from each fetch request and returns "
+ "them incrementally from each poll.");

public static final ConfigOption<MemorySize> CLIENT_SCANNER_LOG_FETCH_MAX_BYTES =
key("client.scanner.log.fetch.max-bytes")
.memoryType()
.defaultValue(MemorySize.parse("16mb"))
.withDescription(
"The maximum amount of data the server should return for a fetch request from client. "
+ "Records are fetched in batches, and if the first record batch in the first "
+ "non-empty bucket of the fetch is larger than this value, the record batch "
+ "will still be returned to ensure that the fetch can make progress. As such, "
+ "this is not a absolute maximum.");

public static final ConfigOption<MemorySize> CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET =
key("client.scanner.log.fetch.max-bytes-for-bucket")
.memoryType()
.defaultValue(MemorySize.parse("1mb"))
.withDescription(
"The maximum amount of data the server should return for a table bucket in fetch request "
+ "from client. Records are fetched in batches, the max bytes size is config by "
+ "this option.");

public static final ConfigOption<Duration> CLIENT_SCANNER_LOG_FETCH_WAIT_MAX_TIME =
key("client.scanner.log.fetch.wait-max-time")
.durationType()
.defaultValue(Duration.ofMillis(500))
.withDescription(
"The maximum time to wait for enough bytes to be available for a fetch log "
+ "request from client to response.");

public static final ConfigOption<MemorySize> CLIENT_SCANNER_LOG_FETCH_MIN_BYTES =
key("client.scanner.log.fetch.min-bytes")
.memoryType()
.defaultValue(MemorySize.parse("1b"))
.withDescription(
"The minimum bytes expected for each fetch log request from client to response. "
+ "If not enough bytes, wait up to "
+ CLIENT_SCANNER_LOG_FETCH_WAIT_MAX_TIME.key()
+ " time to return.");

public static final ConfigOption<Integer> CLIENT_LOOKUP_QUEUE_SIZE =
key("client.lookup.queue-size")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ public class MetricNames {
public static final String REPLICA_LEADER_COUNT = "leaderCount";
public static final String REPLICA_COUNT = "replicaCount";
public static final String WRITE_ID_COUNT = "writerIdCount";
public static final String DELAYED_OPERATIONS_SIZE = "delayedOperationsSize";
public static final String DELAYED_WRITE_COUNT = "delayedWriteCount";
public static final String DELAYED_WRITE_EXPIRES_RATE = "delayedWriteExpiresPerSecond";
public static final String DELAYED_FETCH_COUNT = "delayedFetchCount";
public static final String DELAYED_FETCH_FROM_FOLLOWER_EXPIRES_RATE =
"delayedFetchFromFollowerExpiresPerSecond";
public static final String DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE =
"delayedFetchFromClientExpiresPerSecond";

// --------------------------------------------------------------------------------------------
// metrics for table
Expand Down
2 changes: 2 additions & 0 deletions fluss-rpc/src/main/proto/FlussApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ message FetchLogRequest {
required int32 follower_server_id = 1; // value -1 indicate the request from client.
required int32 max_bytes = 2;
repeated PbFetchLogReqForTable tables_req = 3;
optional int32 max_wait_ms = 4;
optional int32 min_bytes = 5;
}

message FetchLogResponse {
Expand Down
Loading