Commit
address jark's comments
swuferhong committed Jan 3, 2025
1 parent fbb4944 commit 2c4ef5d
Showing 21 changed files with 226 additions and 135 deletions.

@@ -51,6 +51,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static com.alibaba.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
@@ -59,6 +60,8 @@
public class MetadataUpdater {
private static final Logger LOG = LoggerFactory.getLogger(MetadataUpdater.class);

private static final int MAX_RETRY_TIMES = 5;

private final RpcClient rpcClient;
protected volatile Cluster cluster;

@@ -119,8 +122,24 @@ public TableDescriptor getTableDescriptorOrElseThrow(long tableId) {
public int leaderFor(TableBucket tableBucket) {
ServerNode serverNode = cluster.leaderFor(tableBucket);
if (serverNode == null) {
throw new FlussRuntimeException("Leader not found for table bucket: " + tableBucket);
for (int i = 0; i < MAX_RETRY_TIMES; i++) {
TablePath tablePath = cluster.getTablePathOrElseThrow(tableBucket.getTableId());
updateMetadata(Collections.singleton(tablePath), null, null);
serverNode = cluster.leaderFor(tableBucket);
if (serverNode != null) {
break;
}
}

if (serverNode == null) {
throw new FlussRuntimeException(
"Leader not found after retry "
+ MAX_RETRY_TIMES
+ " times for table bucket: "
+ tableBucket);
}
}

return serverNode.id();
}
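
For readability, a condensed re-assembly of how leaderFor reads after this change (same identifiers as in the hunk above, indentation restored; a sketch of the resulting control flow, not additional code in the commit):

    public int leaderFor(TableBucket tableBucket) {
        ServerNode serverNode = cluster.leaderFor(tableBucket);
        if (serverNode == null) {
            // The cached cluster metadata may be stale: refresh it a bounded number
            // of times and re-check the leader instead of failing on the first miss.
            for (int i = 0; i < MAX_RETRY_TIMES; i++) {
                TablePath tablePath = cluster.getTablePathOrElseThrow(tableBucket.getTableId());
                updateMetadata(Collections.singleton(tablePath), null, null);
                serverNode = cluster.leaderFor(tableBucket);
                if (serverNode != null) {
                    break;
                }
            }
            if (serverNode == null) {
                throw new FlussRuntimeException(
                        "Leader not found after retrying "
                                + MAX_RETRY_TIMES
                                + " times for table bucket: "
                                + tableBucket);
            }
        }
        return serverNode.id();
    }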

@@ -239,7 +258,7 @@ private void updateMetadata(
}
} catch (Exception e) {
Throwable t = ExceptionUtils.stripExecutionException(e);
if (t instanceof RetriableException) {
if (t instanceof RetriableException || t instanceof TimeoutException) {
LOG.warn("Failed to update metadata, but the exception is re-triable.", t);
} else {
throw new FlussRuntimeException("Failed to update metadata", t);

@@ -125,11 +125,16 @@ public LogFetcher(
this.projection = projection;
this.rpcClient = rpcClient;
this.logScannerStatus = logScannerStatus;
this.maxFetchBytes = (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES).getBytes();
this.maxFetchBytes =
(int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES).getBytes();
this.maxBucketFetchBytes =
(int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES_FOR_BUCKET).getBytes();
this.minFetchBytes = (int) conf.get(ConfigOptions.LOG_FETCH_MIN_BYTES).getBytes();
this.maxFetchWaitMs = (int) conf.get(ConfigOptions.LOG_FETCH_WAIT_MAX_TIME).toMillis();
(int)
conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET)
.getBytes();
this.minFetchBytes =
(int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MIN_BYTES).getBytes();
this.maxFetchWaitMs =
(int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_WAIT_MAX_TIME).toMillis();

this.isCheckCrcs = conf.getBoolean(ConfigOptions.CLIENT_SCANNER_LOG_CHECK_CRC);
this.logFetchBuffer = new LogFetchBuffer();
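
Because removed and added assignments are interleaved above, here is how the four fetch-size fields read after the change, re-assembled from the added lines (conf is the Configuration handed to the LogFetcher constructor, as in the surrounding code):

    this.maxFetchBytes =
            (int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES).getBytes();
    this.maxBucketFetchBytes =
            (int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET).getBytes();
    this.minFetchBytes =
            (int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MIN_BYTES).getBytes();
    this.maxFetchWaitMs =
            (int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_WAIT_MAX_TIME).toMillis();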

@@ -47,6 +47,8 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Utils for metadata for client. */
public class MetadataUtils {
@@ -60,7 +62,7 @@ public class MetadataUtils {
*/
public static Cluster sendMetadataRequestAndRebuildCluster(
AdminReadOnlyGateway gateway, Set<TablePath> tablePaths)
throws ExecutionException, InterruptedException {
throws ExecutionException, InterruptedException, TimeoutException {
return sendMetadataRequestAndRebuildCluster(gateway, false, null, tablePaths, null, null);
}

@@ -76,7 +78,7 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
@Nullable Set<TablePath> tablePaths,
@Nullable Collection<PhysicalTablePath> tablePartitionNames,
@Nullable Collection<Long> tablePartitionIds)
throws ExecutionException, InterruptedException {
throws ExecutionException, InterruptedException, TimeoutException {
AdminReadOnlyGateway gateway =
GatewayClientProxy.createGatewayProxy(
() -> getOneAvailableTabletServerNode(cluster),
@@ -94,7 +96,7 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
@Nullable Set<TablePath> tablePaths,
@Nullable Collection<PhysicalTablePath> tablePartitions,
@Nullable Collection<Long> tablePartitionIds)
throws ExecutionException, InterruptedException {
throws ExecutionException, InterruptedException, TimeoutException {
MetadataRequest metadataRequest =
ClientRpcMessageUtils.makeMetadataRequest(
tablePaths, tablePartitions, tablePartitionIds);
@@ -151,7 +153,9 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
newPartitionIdByPath,
newTablePathToTableInfo);
})
.get();
.get(5, TimeUnit.SECONDS); // TODO currently RpcClient has no timeout logic, so get()
// could block forever; we bound the wait here instead.
}

private static NewTableMetadata getTableMetadataToUpdate(
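
The bounded get above is what introduces TimeoutException into the throws clauses earlier in this file. A minimal, self-contained illustration of the pattern with a plain CompletableFuture (not the Fluss RPC types; the names here exist only for the example):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BoundedGetExample {
        public static void main(String[] args)
                throws ExecutionException, InterruptedException, TimeoutException {
            // A future that is never completed stands in for an RPC whose client has
            // no built-in timeout. A plain get() would block forever; get(timeout, unit)
            // converts the indefinite wait into a TimeoutException after 5 seconds.
            CompletableFuture<String> response = new CompletableFuture<>();
            String value = response.get(5, TimeUnit.SECONDS); // throws TimeoutException
            System.out.println(value);
        }
    }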

@@ -410,35 +410,34 @@ public class ConfigOptions {
.defaultValue(Duration.ofSeconds(1))
.withDescription("The amount of time to sleep when fetch bucket error occurs.");

public static final ConfigOption<MemorySize> LOG_FETCH_MAX_BYTES =
key("log.fetch.max-bytes")
public static final ConfigOption<MemorySize> LOG_REPLICA_FETCH_MAX_BYTES =
key("log.replica.fetch-max-bytes")
.memoryType()
.defaultValue(MemorySize.parse("16mb"))
.withDescription(
"The maximum amount of data the server should return for a fetch request. "
+ "Records are fetched in batches for log scanner or follower, for one request batch, "
+ "and if the first record batch in the first non-empty bucket of the fetch is "
+ "larger than this value, the record batch will still be returned to ensure that "
+ "the fetch can make progress. As such, this is not a absolute maximum. Note that "
+ "the fetcher performs multiple fetches in parallel.");
"The maximum amount of data the server should return for a fetch request from follower. "
+ "Records are fetched in batches, and if the first record batch in the first "
+ "non-empty bucket of the fetch is larger than this value, the record batch "
+ "will still be returned to ensure that the fetch can make progress. As such, "
+ "this is not a absolute maximum. Note that the fetcher performs multiple fetches "
+ "in parallel.");

public static final ConfigOption<MemorySize> LOG_FETCH_MAX_BYTES_FOR_BUCKET =
key("log.fetch.max-bytes-for-bucket")
.memoryType()
.defaultValue(MemorySize.parse("1mb"))
.withDescription(
"The maximum amount of data the server should return for a table bucket in fetch request. "
+ "Records are fetched in batches for consumer or follower, for one request batch, "
+ "the max bytes size is config by this option.");
"The maximum amount of data the server should return for a table bucket in fetch request "
+ "from follower. Records are fetched in batches, the max bytes size is "
+ "config by this option.");

public static final ConfigOption<Duration> LOG_FETCH_WAIT_MAX_TIME =
key("log.fetch.wait-max-time")
.durationType()
.defaultValue(Duration.ofMillis(500))
.withDescription(
"The maximum time to wait for enough bytes to be available for a fetch log response "
+ "(including fetch log request from the follower or client). "
+ "This value should always be less than the "
"The maximum time to wait for enough bytes to be available for a fetch log request "
+ "from follower to response. This value should always be less than the "
+ "'log.replica.max-lag-time' at all times to prevent frequent shrinking of ISR for "
+ "low throughput tables");

Expand All @@ -447,8 +446,8 @@ public class ConfigOptions {
.memoryType()
.defaultValue(MemorySize.parse("1b"))
.withDescription(
"The minimum bytes expected for each fetch log response (including fetch "
+ "log request from the follower or client). If not enough bytes, wait up to "
"The minimum bytes expected for each fetch log request from follower to response. "
+ "If not enough bytes, wait up to "
+ LOG_FETCH_WAIT_MAX_TIME.key()
+ " time to return.");

@@ -743,6 +742,44 @@
+ "The Scanner will cache the records from each fetch request and returns "
+ "them incrementally from each poll.");

public static final ConfigOption<MemorySize> CLIENT_SCANNER_LOG_FETCH_MAX_BYTES =
key("client.scanner.log.fetch-max-bytes")
.memoryType()
.defaultValue(MemorySize.parse("16mb"))
.withDescription(
"The maximum amount of data the server should return for a fetch request from client. "
+ "Records are fetched in batches, and if the first record batch in the first "
+ "non-empty bucket of the fetch is larger than this value, the record batch "
+ "will still be returned to ensure that the fetch can make progress. As such, "
+ "this is not a absolute maximum.");

public static final ConfigOption<MemorySize> CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET =
key("client.scanner.log.fetch-max-bytes-for-bucket")
.memoryType()
.defaultValue(MemorySize.parse("1mb"))
.withDescription(
"The maximum amount of data the server should return for a table bucket in fetch request "
+ "from client. Records are fetched in batches, the max bytes size is config by "
+ "this option.");

public static final ConfigOption<Duration> CLIENT_SCANNER_LOG_FETCH_WAIT_MAX_TIME =
key("client.scanner.log.fetch-wait-max-time")
.durationType()
.defaultValue(Duration.ofMillis(500))
.withDescription(
"The maximum time to wait for enough bytes to be available for a fetch log "
+ "request from client to response.");

public static final ConfigOption<MemorySize> CLIENT_SCANNER_LOG_FETCH_MIN_BYTES =
key("client.scanner.log.fetch-min-bytes")
.memoryType()
.defaultValue(MemorySize.parse("1b"))
.withDescription(
"The minimum bytes expected for each fetch log request from client to response. "
+ "If not enough bytes, wait up to "
+ CLIENT_SCANNER_LOG_FETCH_WAIT_MAX_TIME.key()
+ " time to return.");

public static final ConfigOption<Integer> CLIENT_LOOKUP_QUEUE_SIZE =
key("client.lookup.queue-size")
.intType()
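
As a usage sketch, the new client-side keys could be overridden before building a client, for example as below. Configuration#setString is assumed here for illustration and is not part of this diff; only the option keys and defaults come from the commit.

    // Hypothetical client-side overrides of the new scanner fetch options.
    Configuration conf = new Configuration();
    conf.setString("client.scanner.log.fetch-max-bytes", "32mb");           // default 16mb
    conf.setString("client.scanner.log.fetch-max-bytes-for-bucket", "2mb"); // default 1mb
    conf.setString("client.scanner.log.fetch-wait-max-time", "1s");         // default 500ms
    conf.setString("client.scanner.log.fetch-min-bytes", "1kb");            // default 1b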

@@ -48,13 +48,13 @@ public class MetricNames {
public static final String REPLICA_LEADER_COUNT = "leaderCount";
public static final String REPLICA_COUNT = "replicaCount";
public static final String WRITE_ID_COUNT = "writerIdCount";
public static final String DELAYED_WRITE_SIZE = "delayedWriteSize";
public static final String DELAYED_WRITE_EXPIRATION_RATE = "delayedWriteExpirationPerSecond";
public static final String DELAYED_FETCH_LOG_SIZE = "delayedFetchLogSize";
public static final String DELAYED_FETCH_LOG_FROM_FOLLOWER_EXPIRATION_RATE =
"delayedFetchLogFromFollowerExpirationPerSecond";
public static final String DELAYED_FETCH_LOG_FROM_CLIENT_EXPIRATION_RATE =
"delayedFetchLogFromClientExpirationPerSecond";
public static final String DELAYED_WRITE_COUNT = "delayedWriteCount";
public static final String DELAYED_WRITE_EXPIRES_RATE = "delayedWriteExpiresPerSecond";
public static final String DELAYED_FETCH_COUNT = "delayedFetchCount";
public static final String DELAYED_FETCH_FROM_FOLLOWER_EXPIRES_RATE =
"delayedFetchFromFollowerExpiresPerSecond";
public static final String DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE =
"delayedFetchFromClientExpiresPerSecond";

// --------------------------------------------------------------------------------------------
// metrics for table

@@ -67,6 +67,10 @@ public int getRelativePositionInSegment() {
return relativePositionInSegment;
}

/**
* Compute the number of bytes from this offset to the given offset, provided both are on the
* same segment and this offset precedes the given offset.
*/
public int positionDiff(LogOffsetMetadata that) {
if (messageOffsetOnly()) {
throw new FlussRuntimeException(

@@ -56,8 +56,8 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
int result = (int) (logStartOffset ^ (logStartOffset >>> 32));
result = 31 * result + (int) (localLogStartOffset ^ (localLogStartOffset >>> 32));
int result = Long.hashCode(logStartOffset);
result = 31 * result + Long.hashCode(localLogStartOffset);
result = 31 * result + logEndOffset.hashCode();
result = 31 * result + highWatermark.hashCode();
return result;
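
The replacement is behavior-preserving: Long.hashCode(value) is specified as (int) (value ^ (value >>> 32)), exactly the expression it replaces. A standalone check (not part of the diff):

    public class LongHashCodeCheck {
        public static void main(String[] args) {
            long[] samples = {0L, 1L, -1L, 42L, Long.MIN_VALUE, Long.MAX_VALUE};
            for (long v : samples) {
                // Both expressions must agree for every input, so switching to
                // Long.hashCode only improves readability, not behavior.
                int manual = (int) (v ^ (v >>> 32));
                System.out.println(v + " -> " + (manual == Long.hashCode(v)));
            }
        }
    }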

@@ -44,8 +44,8 @@ public class TabletServerMetricGroup extends AbstractMetricGroup {
private final Counter replicationBytesIn;
private final Counter replicationBytesOut;
private final Counter delayedWriteExpireCount;
private final Counter delayedFetchLogFromFollowerExpireCount;
private final Counter delayedFetchLogFromClientExpireCount;
private final Counter delayedFetchFromFollowerExpireCount;
private final Counter delayedFetchFromClientExpireCount;

public TabletServerMetricGroup(
MetricRegistry registry, String clusterId, String hostname, int serverId) {
@@ -60,15 +60,15 @@ public TabletServerMetricGroup(
meter(MetricNames.REPLICATION_OUT_RATE, new MeterView(replicationBytesOut));

delayedWriteExpireCount = new ThreadSafeSimpleCounter();
meter(MetricNames.DELAYED_WRITE_EXPIRATION_RATE, new MeterView(delayedWriteExpireCount));
delayedFetchLogFromFollowerExpireCount = new ThreadSafeSimpleCounter();
meter(MetricNames.DELAYED_WRITE_EXPIRES_RATE, new MeterView(delayedWriteExpireCount));
delayedFetchFromFollowerExpireCount = new ThreadSafeSimpleCounter();
meter(
MetricNames.DELAYED_FETCH_LOG_FROM_FOLLOWER_EXPIRATION_RATE,
new MeterView(delayedFetchLogFromFollowerExpireCount));
delayedFetchLogFromClientExpireCount = new ThreadSafeSimpleCounter();
MetricNames.DELAYED_FETCH_FROM_FOLLOWER_EXPIRES_RATE,
new MeterView(delayedFetchFromFollowerExpireCount));
delayedFetchFromClientExpireCount = new ThreadSafeSimpleCounter();
meter(
MetricNames.DELAYED_FETCH_LOG_FROM_CLIENT_EXPIRATION_RATE,
new MeterView(delayedFetchLogFromClientExpireCount));
MetricNames.DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE,
new MeterView(delayedFetchFromClientExpireCount));
}

@Override
@@ -95,12 +95,12 @@ public Counter delayedWriteExpireCount() {
return delayedWriteExpireCount;
}

public Counter delayedFetchLogFromFollowerExpireCount() {
return delayedFetchLogFromFollowerExpireCount;
public Counter delayedFetchFromFollowerExpireCount() {
return delayedFetchFromFollowerExpireCount;
}

public Counter delayedFetchLogFromClientExpireCount() {
return delayedFetchLogFromClientExpireCount;
public Counter delayedFetchFromClientExpireCount() {
return delayedFetchFromClientExpireCount;
}

// ------------------------------------------------------------------------

@@ -793,8 +793,14 @@ public LogAppendInfo appendRecordsToLeader(MemoryLogRecords memoryLogRecords, in
// TODO WRITE a leader epoch.
LogAppendInfo appendInfo = logTablet.appendAsLeader(memoryLogRecords);

// we may need to increment high watermark.
maybeIncrementLeaderHW(logTablet, System.currentTimeMillis());
// we may need to increment the high watermark if the ISR is down to 1 or the
// replica count is 1.
boolean hwIncreased =
maybeIncrementLeaderHW(logTablet, System.currentTimeMillis());

if (hwIncreased) {
tryCompleteDelayedOperations();
}

return appendInfo;
});
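
Re-assembled from the added lines, the leader-append path now reacts when the high watermark advances instead of only advancing it (identifiers as in the hunk above):

    // TODO WRITE a leader epoch.
    LogAppendInfo appendInfo = logTablet.appendAsLeader(memoryLogRecords);

    // The high watermark can move forward here when the ISR is down to 1 or the
    // replica count is 1.
    boolean hwIncreased = maybeIncrementLeaderHW(logTablet, System.currentTimeMillis());

    if (hwIncreased) {
        // Completing delayed operations lets fetch/write requests that are parked
        // waiting for more data or more acks observe the new high watermark now,
        // rather than waiting for their timeout.
        tryCompleteDelayedOperations();
    }

    return appendInfo;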