Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@

package org.apache.amoro.server.scheduler.inline;

import static org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS;

import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.optimizing.evaluation.MetadataBasedEvaluationEvent;
import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.table.MixedTable;

Expand All @@ -56,28 +61,38 @@ protected boolean enabled(TableRuntime tableRuntime) {
@Override
protected long getNextExecutingTime(TableRuntime tableRuntime) {
DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime;

if (defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled()) {
long newInterval = defaultTableRuntime.getLatestRefreshInterval();
if (newInterval > 0) {
return newInterval;
}
}

return Math.min(
defaultTableRuntime.getOptimizingConfig().getMinorLeastInterval() * 4L / 5, interval);
}

private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTable table) {
private boolean tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTable table) {
// only evaluate pending input when optimizing is enabled and in idle state
OptimizingConfig optimizingConfig = tableRuntime.getOptimizingConfig();
if (optimizingConfig.isEnabled()
&& tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {
boolean optimizingEnabled = optimizingConfig.isEnabled();
if (optimizingEnabled && tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {

if (optimizingConfig.isMetadataBasedTriggerEnabled()
&& !MetadataBasedEvaluationEvent.isEvaluatingNecessary(
optimizingConfig, table, tableRuntime.getLastPlanTime())) {
logger.debug(
"{} optimizing is not necessary due to metadata based trigger",
tableRuntime.getTableIdentifier());
return;
// indicates no optimization demand now
return false;
}

AbstractOptimizingEvaluator evaluator =
IcebergTableUtil.createOptimizingEvaluator(tableRuntime, table, maxPendingPartitions);
if (evaluator.isNecessary()) {
boolean evaluatorIsNecessary = evaluator.isNecessary();
if (evaluatorIsNecessary) {
AbstractOptimizingEvaluator.PendingInput pendingInput =
evaluator.getOptimizingPendingInput();
logger.debug(
Expand All @@ -88,7 +103,21 @@ private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTa
} else {
tableRuntime.optimizingNotNecessary();
}

tableRuntime.setTableSummary(evaluator.getPendingInput());
return evaluatorIsNecessary;
} else if (!optimizingEnabled) {
logger.debug(
"{} optimizing is not enabled, skip evaluating pending input",
tableRuntime.getTableIdentifier());
// indicates no optimization demand now
return false;
} else {
logger.debug(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add some comments to describle the "semantics" of the return value, and why would this case would be true?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From below, the name is needOptimizing, looks like whethere we need to do an opitmizing, but in this branch, we don't need to optimizing in current loop? as there is an ongoing optimizing process

"{} optimizing is processing or is in preparation", tableRuntime.getTableIdentifier());
// indicates optimization demand exists (preparation or processing),
// even though we don't trigger a new evaluation in this loop.
return true;
}
}

Expand Down Expand Up @@ -122,16 +151,148 @@ public void execute(TableRuntime tableRuntime) {
AmoroTable<?> table = loadTable(tableRuntime);
defaultTableRuntime.refresh(table);
MixedTable mixedTable = (MixedTable) table.originalTable();
// Check if there is any optimizing demand now.
boolean hasOptimizingDemand = false;
if ((mixedTable.isKeyedTable()
&& (lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId()
|| lastOptimizedChangeSnapshotId
!= defaultTableRuntime.getCurrentChangeSnapshotId()))
|| (mixedTable.isUnkeyedTable()
&& lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId())) {
tryEvaluatingPendingInput(defaultTableRuntime, mixedTable);
hasOptimizingDemand = tryEvaluatingPendingInput(defaultTableRuntime, mixedTable);
} else {
logger.debug("{} optimizing is not necessary", defaultTableRuntime.getTableIdentifier());
}

// Update adaptive interval according to evaluated result.
if (defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled()) {
defaultTableRuntime.setLatestEvaluatedNeedOptimizing(hasOptimizingDemand);
long newInterval = getAdaptiveExecutingInterval(defaultTableRuntime);
defaultTableRuntime.setLatestRefreshInterval(newInterval);
}
} catch (Throwable throwable) {
logger.error("Refreshing table {} failed.", tableRuntime.getTableIdentifier(), throwable);
}
}

/**
* Calculate adaptive execution interval based on table optimization status.
*
* <p>Uses AIMD (Additive Increase Multiplicative Decrease) algorithm inspired by TCP congestion
* control:
*
* <ul>
* <li>If table does not need to be optimized: additive increase - gradually extend interval to
* reduce resource consumption
* <li>If table needs optimization: multiplicative decrease - rapidly reduce interval for quick
* response
* </ul>
*
* <p>Interval is bounded by [interval_min, interval_max] and kept in memory only (resets to
* interval_min on restart).
*
* @param tableRuntime The table runtime information containing current status and configuration
* @return The next execution interval in milliseconds
*/
@VisibleForTesting
public long getAdaptiveExecutingInterval(DefaultTableRuntime tableRuntime) {
final long minInterval = interval;
final long maxInterval =
tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxIntervalMs();

if (maxInterval <= minInterval) {
tableRuntime
.getOptimizingConfig()
.setRefreshTableAdaptiveMaxIntervalMs(AmoroServiceConstants.INVALID_TIME);
logger.warn(
"Invalid adaptive refresh configuration for table {}: {} = {}ms is not greater than {} = {}ms. Setting {} to default value 0 to disable dynamic refresh logic.",
tableRuntime.getTableIdentifier(),
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS,
maxInterval,
AmoroManagementConf.REFRESH_TABLES_INTERVAL.key(),
minInterval,
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS);

return AmoroServiceConstants.INVALID_TIME;
}

long currentInterval = tableRuntime.getLatestRefreshInterval();

// Initialize interval on first run or after restart
if (currentInterval == 0) {
currentInterval = minInterval;
}

// Determine whether table needs optimization
boolean needOptimizing = tableRuntime.getLatestEvaluatedNeedOptimizing();

long nextInterval;
if (needOptimizing) {
nextInterval = decreaseInterval(currentInterval, minInterval);
logger.debug(
"Table {} needs optimization, decreasing interval from {}ms to {}ms",
tableRuntime.getTableIdentifier(),
currentInterval,
nextInterval);
} else {
nextInterval = increaseInterval(tableRuntime, currentInterval, maxInterval);
logger.debug(
"Table {} does not need optimization, increasing interval from {}ms to {}ms",
tableRuntime.getTableIdentifier(),
currentInterval,
nextInterval);
}

return nextInterval;
}

/**
* Decrease interval when table needs optimization.
*
* <p>Uses multiplicative decrease (halving) inspired by TCP Fast Recovery algorithm for rapid
* response to table health issues.
*
* @param currentInterval Current refresh interval in milliseconds
* @param minInterval Minimum allowed interval in milliseconds
* @return New interval after decrease.
*/
private long decreaseInterval(long currentInterval, long minInterval) {
long newInterval = currentInterval / 2;
long boundedInterval = Math.max(newInterval, minInterval);
if (newInterval < minInterval) {
logger.debug(
"Interval reached minimum boundary: attempted {}ms, capped at {}ms",
newInterval,
minInterval);
}

return boundedInterval;
}

/**
* Increase interval when table does not need optimization.
*
* <p>Uses additive increase inspired by TCP Congestion Avoidance algorithm for gradual and stable
* growth.
*
* @param tableRuntime The table runtime information containing configuration
* @param currentInterval Current refresh interval in milliseconds
* @param maxInterval Maximum allowed interval in milliseconds
* @return New interval after increase.
*/
private long increaseInterval(
DefaultTableRuntime tableRuntime, long currentInterval, long maxInterval) {
long step = tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveIncreaseStepMs();
long newInterval = currentInterval + step;
long boundedInterval = Math.min(newInterval, maxInterval);
if (newInterval > maxInterval) {
logger.debug(
"Interval reached maximum boundary: currentInterval is {}ms, attempted {}ms, capped at {}ms",
currentInterval,
newInterval,
maxInterval);
}

return boundedInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public class DefaultTableRuntime extends AbstractTableRuntime
private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics;
private final TableSummaryMetrics tableSummaryMetrics;
private volatile long lastPlanTime;
private volatile long latestRefreshInterval = AmoroServiceConstants.INVALID_TIME;
private volatile boolean latestEvaluatedNeedOptimizing = true;
private volatile OptimizingProcess optimizingProcess;
private final List<TaskRuntime.TaskQuota> taskQuotas = new CopyOnWriteArrayList<>();

Expand Down Expand Up @@ -181,6 +183,22 @@ public void setLastPlanTime(long lastPlanTime) {
this.lastPlanTime = lastPlanTime;
}

public long getLatestRefreshInterval() {
return latestRefreshInterval;
}

public void setLatestRefreshInterval(long latestRefreshInterval) {
this.latestRefreshInterval = latestRefreshInterval;
}

public boolean getLatestEvaluatedNeedOptimizing() {
return this.latestEvaluatedNeedOptimizing;
}

public void setLatestEvaluatedNeedOptimizing(boolean latestEvaluatedNeedOptimizing) {
this.latestEvaluatedNeedOptimizing = latestEvaluatedNeedOptimizing;
}

public OptimizingStatus getOptimizingStatus() {
return OptimizingStatus.ofCode(getStatusCode());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,17 @@ public static OptimizingConfig parseOptimizingConfig(Map<String, String> propert
PropertyUtil.propertyAsLong(
properties,
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE,
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT));
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT))
.setRefreshTableAdaptiveMaxIntervalMs(
PropertyUtil.propertyAsLong(
properties,
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS,
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS_DEFAULT))
.setRefreshTableAdaptiveIncreaseStepMs(
PropertyUtil.propertyAsLong(
properties,
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS,
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS_DEFAULT));
}

/**
Expand Down
Loading