Skip to content

Commit

Permalink
address jark's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
swuferhong committed Dec 30, 2024
1 parent 88105c4 commit a9bb612
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ public class MetricNames {
public static final String REPLICA_LEADER_COUNT = "leaderCount";
public static final String REPLICA_COUNT = "replicaCount";
public static final String WRITE_ID_COUNT = "writerIdCount";
public static final String DELAYED_WRITE_SIZE = "delayedWriteSize";
public static final String DELAYED_WRITE_EXPIRATION_RATE = "delayedWriteExpirationPerSecond";
public static final String DELAYED_FETCH_LOG_SIZE = "delayedFetchLogSize";
public static final String DELAYED_FETCH_LOG_FROM_FOLLOWER_EXPIRATION_RATE =
"delayedFetchLogFromFollowerExpirationPerSecond";
public static final String DELAYED_FETCH_LOG_FROM_CLIENT_EXPIRATION_RATE =
"delayedFetchLogFromClientExpirationPerSecond";
public static final String DELAYED_WRITE_COUNT = "delayedWriteCount";
public static final String DELAYED_WRITE_EXPIRES_RATE = "delayedWriteExpiresPerSecond";
public static final String DELAYED_FETCH_COUNT = "delayedFetchCount";
public static final String DELAYED_FETCH_FROM_FOLLOWER_EXPIRES_RATE =
"delayedFetchFromFollowerExpiresPerSecond";
public static final String DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE =
"delayedFetchFromClientExpiresPerSecond";

// --------------------------------------------------------------------------------------------
// metrics for table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public int getRelativePositionInSegment() {
return relativePositionInSegment;
}

/**
 * Compute the number of bytes from this offset to the given offset, provided they are on the
 * same segment and this offset precedes the given offset.
 */
public int positionDiff(LogOffsetMetadata that) {
if (messageOffsetOnly()) {
throw new FlussRuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
int result = (int) (logStartOffset ^ (logStartOffset >>> 32));
result = 31 * result + (int) (localLogStartOffset ^ (localLogStartOffset >>> 32));
int result = Long.hashCode(logStartOffset);
result = 31 * result + Long.hashCode(localLogStartOffset);
result = 31 * result + logEndOffset.hashCode();
result = 31 * result + highWatermark.hashCode();
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public class TabletServerMetricGroup extends AbstractMetricGroup {
private final Counter replicationBytesIn;
private final Counter replicationBytesOut;
private final Counter delayedWriteExpireCount;
private final Counter delayedFetchLogFromFollowerExpireCount;
private final Counter delayedFetchLogFromClientExpireCount;
private final Counter delayedFetchFromFollowerExpireCount;
private final Counter delayedFetchFromClientExpireCount;

public TabletServerMetricGroup(
MetricRegistry registry, String clusterId, String hostname, int serverId) {
Expand All @@ -60,15 +60,15 @@ public TabletServerMetricGroup(
meter(MetricNames.REPLICATION_OUT_RATE, new MeterView(replicationBytesOut));

delayedWriteExpireCount = new ThreadSafeSimpleCounter();
meter(MetricNames.DELAYED_WRITE_EXPIRATION_RATE, new MeterView(delayedWriteExpireCount));
delayedFetchLogFromFollowerExpireCount = new ThreadSafeSimpleCounter();
meter(MetricNames.DELAYED_WRITE_EXPIRES_RATE, new MeterView(delayedWriteExpireCount));
delayedFetchFromFollowerExpireCount = new ThreadSafeSimpleCounter();
meter(
MetricNames.DELAYED_FETCH_LOG_FROM_FOLLOWER_EXPIRATION_RATE,
new MeterView(delayedFetchLogFromFollowerExpireCount));
delayedFetchLogFromClientExpireCount = new ThreadSafeSimpleCounter();
MetricNames.DELAYED_FETCH_FROM_FOLLOWER_EXPIRES_RATE,
new MeterView(delayedFetchFromFollowerExpireCount));
delayedFetchFromClientExpireCount = new ThreadSafeSimpleCounter();
meter(
MetricNames.DELAYED_FETCH_LOG_FROM_CLIENT_EXPIRATION_RATE,
new MeterView(delayedFetchLogFromClientExpireCount));
MetricNames.DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE,
new MeterView(delayedFetchFromClientExpireCount));
}

@Override
Expand All @@ -95,12 +95,12 @@ public Counter delayedWriteExpireCount() {
return delayedWriteExpireCount;
}

public Counter delayedFetchLogFromFollowerExpireCount() {
return delayedFetchLogFromFollowerExpireCount;
public Counter delayedFetchFromFollowerExpireCount() {
return delayedFetchFromFollowerExpireCount;
}

public Counter delayedFetchLogFromClientExpireCount() {
return delayedFetchLogFromClientExpireCount;
public Counter delayedFetchFromClientExpireCount() {
return delayedFetchFromClientExpireCount;
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,8 +793,14 @@ public LogAppendInfo appendRecordsToLeader(MemoryLogRecords memoryLogRecords, in
// TODO WRITE a leader epoch.
LogAppendInfo appendInfo = logTablet.appendAsLeader(memoryLogRecords);

// we may need to increment high watermark.
maybeIncrementLeaderHW(logTablet, System.currentTimeMillis());
// we may need to increment the high watermark if the ISR could shrink to 1 or the
// replica count is 1.
boolean hwIncreased =
maybeIncrementLeaderHW(logTablet, System.currentTimeMillis());

if (hwIncreased) {
tryCompleteDelayedOperations();
}

return appendInfo;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,9 @@ private void registerMetrics() {
() -> onlineReplicas().filter(Replica::isLeader).count());
serverMetricGroup.gauge(MetricNames.REPLICA_COUNT, allReplicas::size);
serverMetricGroup.gauge(MetricNames.WRITE_ID_COUNT, this::writerIdCount);
serverMetricGroup.gauge(MetricNames.DELAYED_WRITE_SIZE, delayedWriteManager::numDelayed);
serverMetricGroup.gauge(MetricNames.DELAYED_WRITE_COUNT, delayedWriteManager::numDelayed);
serverMetricGroup.gauge(
MetricNames.DELAYED_FETCH_LOG_SIZE, delayedFetchLogManager::numDelayed);
MetricNames.DELAYED_FETCH_COUNT, delayedFetchLogManager::numDelayed);
}

private Stream<Replica> onlineReplicas() {
Expand Down Expand Up @@ -392,15 +392,15 @@ public void fetchLogRecords(
Map<TableBucket, FetchData> bucketFetchInfo,
Consumer<Map<TableBucket, FetchLogResultForBucket>> responseCallback) {
long startTime = System.currentTimeMillis();
Map<TableBucket, LogReadStatus> logFetchResults = readFromLog(params, bucketFetchInfo);
Map<TableBucket, LogReadResult> logReadResults = readFromLog(params, bucketFetchInfo);
if (LOG.isTraceEnabled()) {
LOG.trace(
"Fetch log records from local log in {} ms",
System.currentTimeMillis() - startTime);
}

// maybe do delay fetch log operation.
maybeAddDelayedFetchLog(params, bucketFetchInfo, logFetchResults, responseCallback);
maybeAddDelayedFetchLog(params, bucketFetchInfo, logReadResults, responseCallback);
}

/**
Expand Down Expand Up @@ -749,7 +749,7 @@ private void makeFollowers(
.collect(Collectors.toSet()));

replicasBecomeFollower.forEach(
replica -> completeDelayedWriteAndFetchLogOperations(replica.getTableBucket()));
replica -> completeDelayedOperations(replica.getTableBucket()));

LOG.info(
"Stopped fetchers as part of become follower request for {} replicas",
Expand Down Expand Up @@ -918,9 +918,9 @@ public void limitScan(
responseCallback.accept(limitScanResultForBucket);
}

public Map<TableBucket, LogReadStatus> readFromLog(
public Map<TableBucket, LogReadResult> readFromLog(
FetchParams fetchParams, Map<TableBucket, FetchData> bucketFetchInfo) {
Map<TableBucket, LogReadStatus> logFetchResult = new HashMap<>();
Map<TableBucket, LogReadResult> logReadResult = new HashMap<>();
boolean isFromFollower = fetchParams.isFromFollower();
int limitBytes = fetchParams.maxFetchBytes();
for (Map.Entry<TableBucket, FetchData> entry : bucketFetchInfo.entrySet()) {
Expand Down Expand Up @@ -956,9 +956,9 @@ public Map<TableBucket, LogReadStatus> readFromLog(
}
limitBytes = Math.max(0, limitBytes - recordBatchSize);

logFetchResult.put(
logReadResult.put(
tb,
new LogReadStatus(
new LogReadResult(
new FetchLogResultForBucket(
tb, fetchedData.getRecords(), readInfo.getHighWatermark()),
fetchedData.getFetchOffsetMetadata()));
Expand Down Expand Up @@ -988,11 +988,11 @@ public Map<TableBucket, LogReadStatus> readFromLog(
} else {
result = new FetchLogResultForBucket(tb, ApiError.fromThrowable(e));
}
logFetchResult.put(
tb, new LogReadStatus(result, LogOffsetMetadata.UNKNOWN_OFFSET_METADATA));
logReadResult.put(
tb, new LogReadResult(result, LogOffsetMetadata.UNKNOWN_OFFSET_METADATA));
}
}
return logFetchResult;
return logReadResult;
}

private FetchLogResultForBucket handleFetchOutOfRangeException(
Expand Down Expand Up @@ -1158,17 +1158,17 @@ private <T extends WriteResultForBucket> void maybeAddDelayedWrite(
private void maybeAddDelayedFetchLog(
FetchParams params,
Map<TableBucket, FetchData> bucketFetchInfo,
Map<TableBucket, LogReadStatus> logFetchResults,
Map<TableBucket, LogReadResult> logReadResults,
Consumer<Map<TableBucket, FetchLogResultForBucket>> responseCallback) {
long bytesReadable = 0;
boolean errorReadingData = false;
boolean hasFetchFromLocal = false;
Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap = new HashMap<>();
for (Map.Entry<TableBucket, LogReadStatus> logFetchResult : logFetchResults.entrySet()) {
TableBucket tb = logFetchResult.getKey();
LogReadStatus logReadStatus = logFetchResult.getValue();
for (Map.Entry<TableBucket, LogReadResult> logReadResultEntry : logReadResults.entrySet()) {
TableBucket tb = logReadResultEntry.getKey();
LogReadResult logReadResult = logReadResultEntry.getValue();
FetchLogResultForBucket fetchLogResultForBucket =
logReadStatus.getFetchLogResultForBucket();
logReadResult.getFetchLogResultForBucket();
if (fetchLogResultForBucket.failed()) {
errorReadingData = true;
break;
Expand All @@ -1183,7 +1183,7 @@ private void maybeAddDelayedFetchLog(
tb,
new FetchBucketStatus(
bucketFetchInfo.get(tb),
logReadStatus.getLogOffsetMetadata(),
logReadResult.getLogOffsetMetadata(),
fetchLogResultForBucket));
}

Expand All @@ -1193,7 +1193,7 @@ private void maybeAddDelayedFetchLog(
|| bytesReadable >= params.minFetchBytes()
|| errorReadingData) {
responseCallback.accept(
logFetchResults.entrySet().stream()
logReadResults.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
Expand Down Expand Up @@ -1221,7 +1221,7 @@ private void maybeAddDelayedFetchLog(
}
}

private void completeDelayedWriteAndFetchLogOperations(TableBucket tableBucket) {
private void completeDelayedOperations(TableBucket tableBucket) {
DelayedTableBucketKey delayedTableBucketKey = new DelayedTableBucketKey(tableBucket);
delayedWriteManager.checkAndComplete(delayedTableBucketKey);
delayedFetchLogManager.checkAndComplete(delayedTableBucketKey);
Expand Down Expand Up @@ -1337,7 +1337,7 @@ private StopReplicaResultForBucket stopReplica(

// If we were the leader, we may have some operations still waiting for completion.
// We force completion to prevent them from timing out.
completeDelayedWriteAndFetchLogOperations(tb);
completeDelayedOperations(tb);

return new StopReplicaResultForBucket(tb);
}
Expand Down Expand Up @@ -1527,12 +1527,12 @@ public void shutdown() throws InterruptedException {
checkpointHighWatermarks();
}

/** The status of reading log. */
public static final class LogReadStatus {
/** The result of reading log. */
public static final class LogReadResult {
private final FetchLogResultForBucket fetchLogResultForBucket;
private final LogOffsetMetadata logOffsetMetadata;

public LogReadStatus(
public LogReadResult(
FetchLogResultForBucket fetchLogResultForBucket,
LogOffsetMetadata logOffsetMetadata) {
this.fetchLogResultForBucket = fetchLogResultForBucket;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import com.alibaba.fluss.server.metrics.group.TabletServerMetricGroup;
import com.alibaba.fluss.server.replica.Replica;
import com.alibaba.fluss.server.replica.ReplicaManager;
import com.alibaba.fluss.server.replica.ReplicaManager.LogReadStatus;
import com.alibaba.fluss.server.replica.ReplicaManager.LogReadResult;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -91,9 +91,9 @@ public void onComplete() {
}

// re-fetch data.
Map<TableBucket, LogReadStatus> reFetchResult =
Map<TableBucket, LogReadResult> reReadResult =
replicaManager.readFromLog(params, reFetchBuckets);
reFetchResult.forEach((key, value) -> result.put(key, value.getFetchLogResultForBucket()));
reReadResult.forEach((key, value) -> result.put(key, value.getFetchLogResultForBucket()));
responseCallback.accept(result);
}

Expand All @@ -103,7 +103,7 @@ public void onComplete() {
* <ul>
* <li>Case A: The server is no longer the leader for some buckets it tries to fetch
* <li>Case B: The replica is no longer available on this server
* <li>Case C: This server doesn't know of some buckets ot tries to fetch
* <li>Case C: This server doesn't know of some buckets it tries to fetch
* <li>Case D: The fetch offset locates not on the last segment of the log
* <li>Case E: The accumulated bytes from all the fetching buckets exceeds the minimum bytes
* </ul>
Expand Down Expand Up @@ -195,9 +195,9 @@ public boolean tryComplete() {
@Override
public void onExpiration() {
if (params.isFromFollower()) {
serverMetricGroup.delayedFetchLogFromFollowerExpireCount().inc();
serverMetricGroup.delayedFetchFromFollowerExpireCount().inc();
} else {
serverMetricGroup.delayedFetchLogFromClientExpireCount().inc();
serverMetricGroup.delayedFetchFromClientExpireCount().inc();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ void testFetchLog() throws Exception {
FetchLogResultForBucket resultForBucket = result.get(tb);
assertThat(resultForBucket.getTableBucket()).isEqualTo(tb);
assertThat(resultForBucket.getHighWatermark()).isEqualTo(0L);
assertThat(resultForBucket.records()).isNull();
assertThat(resultForBucket.records().sizeInBytes()).isEqualTo(0);

// produce one batch to this bucket.
CompletableFuture<List<ProduceLogResultForBucket>> future = new CompletableFuture<>();
Expand Down
Loading

0 comments on commit a9bb612

Please sign in to comment.