Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class RemoteStorageMetrics {
private static final String REMOTE_COPY_LAG_SEGMENTS = "RemoteCopyLagSegments";
private static final String REMOTE_DELETE_LAG_BYTES = "RemoteDeleteLagBytes";
private static final String REMOTE_DELETE_LAG_SEGMENTS = "RemoteDeleteLagSegments";
private static final String RETENTION_SIZE_IN_PERCENT = "RetentionSizeInPercent";
private static final String LOCAL_RETENTION_SIZE_IN_PERCENT = "LocalRetentionSizeInPercent";
private static final String REMOTE_LOG_READER_TASK_QUEUE_SIZE = REMOTE_LOG_READER_METRICS_NAME_PREFIX + TASK_QUEUE_SIZE;
private static final String REMOTE_LOG_READER_AVG_IDLE_PERCENT = REMOTE_LOG_READER_METRICS_NAME_PREFIX + AVG_IDLE_PERCENT;
private static final String REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS = REMOTE_LOG_READER_METRICS_NAME_PREFIX + "FetchRateAndTimeMs";
Expand Down Expand Up @@ -103,6 +105,11 @@ public class RemoteStorageMetrics {
public static final MetricName REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC = getMetricName(
"kafka.log.remote", "RemoteLogManager", REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS);

public static final MetricName RETENTION_SIZE_IN_PERCENT_METRIC = getMetricName(
"kafka.log.remote", "RemoteLogManager", RETENTION_SIZE_IN_PERCENT);
public static final MetricName LOCAL_RETENTION_SIZE_IN_PERCENT_METRIC = getMetricName(
"kafka.log.remote", "RemoteLogManager", LOCAL_RETENTION_SIZE_IN_PERCENT);

public static Set<MetricName> allMetrics() {
Set<MetricName> metrics = new HashSet<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand All @@ -133,8 +134,10 @@
import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.LOCAL_RETENTION_SIZE_IN_PERCENT_METRIC;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.RETENTION_SIZE_IN_PERCENT_METRIC;

/**
* This class is responsible for
Expand Down Expand Up @@ -1138,20 +1141,54 @@ private Path toPathIfExists(File file) {
class RLMExpirationTask extends RLMTask {
private final Logger logger;
private volatile boolean isAllSegmentsValid = false;
private volatile boolean metricsRegistered = false;
private final Map<String, String> metricTags = new HashMap<>();
private final AtomicInteger retentionSizeInPercentValue = new AtomicInteger(0);
private final AtomicInteger localRetentionSizeInPercentValue = new AtomicInteger(0);

int retentionSizeInPercent() {
return retentionSizeInPercentValue.get();
}

int localRetentionSizeInPercent() {
return localRetentionSizeInPercentValue.get();
}

public RLMExpirationTask(TopicIdPartition topicIdPartition) {
super(topicIdPartition);
this.logger = getLogContext().logger(RLMExpirationTask.class);
metricTags.put("topic", topicIdPartition.topic());
metricTags.put("partition", Integer.toString(topicIdPartition.partition()));
}

private void registerMetrics() {
if (!metricsRegistered && !isCancelled()) {
metricsGroup.newGauge(RETENTION_SIZE_IN_PERCENT_METRIC.getName(), retentionSizeInPercentValue::get, metricTags);
metricsGroup.newGauge(LOCAL_RETENTION_SIZE_IN_PERCENT_METRIC.getName(), localRetentionSizeInPercentValue::get, metricTags);
metricsRegistered = true;
}
}

@Override
protected void execute(UnifiedLog log) throws InterruptedException, RemoteStorageException, ExecutionException {
// Register metrics on first execution (after task is safely scheduled)
registerMetrics();
cleanupExpiredRemoteLogSegments();
}

@Override
public void cancel() {
isAllSegmentsValid = false;
// Reset metrics to 0 immediately when task is cancelled to prevent stale values
retentionSizeInPercentValue.set(0);
localRetentionSizeInPercentValue.set(0);

// Remove metrics if they were registered
if (metricsRegistered) {
metricsGroup.removeMetric(RETENTION_SIZE_IN_PERCENT_METRIC.getName(), metricTags);
metricsGroup.removeMetric(LOCAL_RETENTION_SIZE_IN_PERCENT_METRIC.getName(), metricTags);
metricsRegistered = false;
}
super.cancel();
}

Expand Down Expand Up @@ -1332,6 +1369,7 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
if (stats.metadataCount == 0) {
updateMetadataCountAndLogSizeWith(0, 0);
logger.debug("No remote log segments available on remote storage for partition: {}", topicIdPartition);
calculateSizeInPercent(log.size(), log.config().retentionSize, log.size(), log.config().localRetentionBytes());
return;
}
updateMetadataCountAndLogSizeWith(stats.metadataCount, stats.sizeInBytes);
Expand All @@ -1347,7 +1385,8 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
long logStartOffset = log.logStartOffset();
long logEndOffset = log.logEndOffset();
Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize,
log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets, stats.copyFinishedSegmentsSizeInBytes);
log.onlyLocalLogSegmentsSize(), log.size(), logEndOffset, epochWithOffsets, log.config().localRetentionBytes(),
stats.copyFinishedSegmentsSizeInBytes);
Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs);

RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
Expand Down Expand Up @@ -1482,12 +1521,31 @@ private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) {
: Optional.empty();
}

private void calculateSizeInPercent(long totalSize,
long retentionSize,
long localLogSegmentsSize,
long localRetentionBytes) {
int sizePercentage = retentionSize > 0 ? (int) ((totalSize * 100) / retentionSize) : 0;
retentionSizeInPercentValue.set(sizePercentage);

// Calculate local size percentage only if local retention is configured
int localSizePercentage = localRetentionBytes > 0 ? (int) ((localLogSegmentsSize * 100) / localRetentionBytes) : 0;
localRetentionSizeInPercentValue.set(localSizePercentage);
}

Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize,
long onlyLocalLogSegmentsSize,
long localLogSegmentsSize,
long logEndOffset,
NavigableMap<Integer, Long> epochEntries,
long localRetentionBytes,
long fullCopyFinishedSegmentsSizeInBytes) throws RemoteStorageException {
if (retentionSize < 0 || (onlyLocalLogSegmentsSize + fullCopyFinishedSegmentsSizeInBytes) <= retentionSize) {
if (retentionSize < 0) {
return Optional.empty();
}
long totalEstimateSize = onlyLocalLogSegmentsSize + fullCopyFinishedSegmentsSizeInBytes;
if (totalEstimateSize <= retentionSize) {
calculateSizeInPercent(totalEstimateSize, retentionSize, localLogSegmentsSize, localRetentionBytes);
return Optional.empty();
}
// compute valid remote-log size in bytes for the current partition if the size of the partition exceeds
Expand Down Expand Up @@ -1533,6 +1591,7 @@ Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize,
// This is the total size of segments in local log that have their base-offset > local-log-start-offset
// and size of the segments in remote storage which have their end-offset < local-log-start-offset.
long totalSize = onlyLocalLogSegmentsSize + remoteLogSizeBytes;
calculateSizeInPercent(totalSize, retentionSize, localLogSegmentsSize, localRetentionBytes);
if (totalSize > retentionSize) {
long remainingBreachedSize = totalSize - retentionSize;
RetentionSizeData retentionSizeData = new RetentionSizeData(retentionSize, remainingBreachedSize);
Expand Down
Loading