diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
index 18071a2cdd..9f389138e4 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
@@ -18,6 +18,8 @@
package org.apache.amoro.server.scheduler.inline;
+import static org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS;
+
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.OptimizingConfig;
@@ -25,12 +27,15 @@
import org.apache.amoro.optimizing.evaluation.MetadataBasedEvaluationEvent;
import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
import org.apache.amoro.process.ProcessStatus;
+import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.utils.IcebergTableUtil;
+import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.table.MixedTable;
@@ -56,15 +61,23 @@ protected boolean enabled(TableRuntime tableRuntime) {
@Override
protected long getNextExecutingTime(TableRuntime tableRuntime) {
DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime;
+
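+ // When adaptive refresh is enabled, reuse the interval computed during the last refresh. A
+ // non-positive value means no adaptive interval is available yet (e.g. right after a restart),
+ // so fall back to the default logic below.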
+ if (defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled()) {
+ long newInterval = defaultTableRuntime.getLatestRefreshInterval();
+ if (newInterval > 0) {
+ return newInterval;
+ }
+ }
+
return Math.min(
defaultTableRuntime.getOptimizingConfig().getMinorLeastInterval() * 4L / 5, interval);
}
- private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTable table) {
+ private boolean tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTable table) {
// only evaluate pending input when optimizing is enabled and in idle state
OptimizingConfig optimizingConfig = tableRuntime.getOptimizingConfig();
- if (optimizingConfig.isEnabled()
- && tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {
+ boolean optimizingEnabled = optimizingConfig.isEnabled();
+ if (optimizingEnabled && tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {
if (optimizingConfig.isMetadataBasedTriggerEnabled()
&& !MetadataBasedEvaluationEvent.isEvaluatingNecessary(
@@ -72,12 +85,14 @@ private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTa
logger.debug(
"{} optimizing is not necessary due to metadata based trigger",
tableRuntime.getTableIdentifier());
- return;
+ // indicates no optimization demand now
+ return false;
}
AbstractOptimizingEvaluator evaluator =
IcebergTableUtil.createOptimizingEvaluator(tableRuntime, table, maxPendingPartitions);
- if (evaluator.isNecessary()) {
+ boolean evaluatorIsNecessary = evaluator.isNecessary();
+ if (evaluatorIsNecessary) {
AbstractOptimizingEvaluator.PendingInput pendingInput =
evaluator.getOptimizingPendingInput();
logger.debug(
@@ -88,7 +103,21 @@ private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTa
} else {
tableRuntime.optimizingNotNecessary();
}
+
tableRuntime.setTableSummary(evaluator.getPendingInput());
+ return evaluatorIsNecessary;
+ } else if (!optimizingEnabled) {
+ logger.debug(
+ "{} optimizing is not enabled, skip evaluating pending input",
+ tableRuntime.getTableIdentifier());
+ // indicates no optimization demand now
+ return false;
+ } else {
+ logger.debug(
+ "{} optimizing is processing or is in preparation", tableRuntime.getTableIdentifier());
+ // indicates optimization demand exists (preparation or processing),
+ // even though we don't trigger a new evaluation in this loop.
+ return true;
}
}
@@ -122,16 +151,148 @@ public void execute(TableRuntime tableRuntime) {
AmoroTable<?> table = loadTable(tableRuntime);
defaultTableRuntime.refresh(table);
MixedTable mixedTable = (MixedTable) table.originalTable();
+ // Check if there is any optimizing demand now.
+ boolean hasOptimizingDemand = false;
if ((mixedTable.isKeyedTable()
&& (lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId()
|| lastOptimizedChangeSnapshotId
!= defaultTableRuntime.getCurrentChangeSnapshotId()))
|| (mixedTable.isUnkeyedTable()
&& lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId())) {
- tryEvaluatingPendingInput(defaultTableRuntime, mixedTable);
+ hasOptimizingDemand = tryEvaluatingPendingInput(defaultTableRuntime, mixedTable);
+ } else {
+ logger.debug("{} optimizing is not necessary", defaultTableRuntime.getTableIdentifier());
+ }
+
+ // Update adaptive interval according to evaluated result.
+ if (defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled()) {
+ defaultTableRuntime.setLatestEvaluatedNeedOptimizing(hasOptimizingDemand);
+ long newInterval = getAdaptiveExecutingInterval(defaultTableRuntime);
+ defaultTableRuntime.setLatestRefreshInterval(newInterval);
}
} catch (Throwable throwable) {
logger.error("Refreshing table {} failed.", tableRuntime.getTableIdentifier(), throwable);
}
}
+
+ /**
+ * Calculate adaptive execution interval based on table optimization status.
+ *
+ * <p>Uses AIMD (Additive Increase Multiplicative Decrease) algorithm inspired by TCP congestion
+ * control:
+ *
+ * - If table does not need to be optimized: additive increase - gradually extend interval to
+ *   reduce resource consumption
+ * - If table needs optimization: multiplicative decrease - rapidly reduce interval for quick
+ *   response
+ *
+ * Interval is bounded by [interval_min, interval_max] and kept in memory only (resets to
+ * interval_min on restart).
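+ *
+ * <p>For example, with interval_min = 60s and an increase step of 30s (illustrative values), an
+ * idle table's refresh interval grows 60s -> 90s -> 120s, while a table that turns out to need
+ * optimizing is halved back toward interval_min, e.g. 120s -> 60s.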
+ *
+ * @param tableRuntime The table runtime information containing current status and configuration
+ * @return The next execution interval in milliseconds
+ */
+ @VisibleForTesting
+ public long getAdaptiveExecutingInterval(DefaultTableRuntime tableRuntime) {
+ final long minInterval = interval;
+ final long maxInterval =
+ tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxIntervalMs();
+
+ if (maxInterval <= minInterval) {
+ tableRuntime
+ .getOptimizingConfig()
+ .setRefreshTableAdaptiveMaxIntervalMs(AmoroServiceConstants.INVALID_TIME);
+ logger.warn(
+ "Invalid adaptive refresh configuration for table {}: {} = {}ms is not greater than {} = {}ms. Setting {} to default value 0 to disable dynamic refresh logic.",
+ tableRuntime.getTableIdentifier(),
+ SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS,
+ maxInterval,
+ AmoroManagementConf.REFRESH_TABLES_INTERVAL.key(),
+ minInterval,
+ SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS);
+
+ return AmoroServiceConstants.INVALID_TIME;
+ }
+
+ long currentInterval = tableRuntime.getLatestRefreshInterval();
+
+ // Initialize interval on first run or after restart
+ if (currentInterval == 0) {
+ currentInterval = minInterval;
+ }
+
+ // Determine whether table needs optimization
+ boolean needOptimizing = tableRuntime.getLatestEvaluatedNeedOptimizing();
+
+ long nextInterval;
+ if (needOptimizing) {
+ nextInterval = decreaseInterval(currentInterval, minInterval);
+ logger.debug(
+ "Table {} needs optimization, decreasing interval from {}ms to {}ms",
+ tableRuntime.getTableIdentifier(),
+ currentInterval,
+ nextInterval);
+ } else {
+ nextInterval = increaseInterval(tableRuntime, currentInterval, maxInterval);
+ logger.debug(
+ "Table {} does not need optimization, increasing interval from {}ms to {}ms",
+ tableRuntime.getTableIdentifier(),
+ currentInterval,
+ nextInterval);
+ }
+
+ return nextInterval;
+ }
+
+ /**
+ * Decrease interval when table needs optimization.
+ *
+ * <p>Uses multiplicative decrease (halving) inspired by TCP Fast Recovery algorithm for rapid
+ * response to table health issues.
+ *
+ * @param currentInterval Current refresh interval in milliseconds
+ * @param minInterval Minimum allowed interval in milliseconds
+ * @return New interval after decrease.
+ */
+ private long decreaseInterval(long currentInterval, long minInterval) {
+ long newInterval = currentInterval / 2;
+ long boundedInterval = Math.max(newInterval, minInterval);
+ if (newInterval < minInterval) {
+ logger.debug(
+ "Interval reached minimum boundary: attempted {}ms, capped at {}ms",
+ newInterval,
+ minInterval);
+ }
+
+ return boundedInterval;
+ }
+
+ /**
+ * Increase interval when table does not need optimization.
+ *
+ * <p>Uses additive increase inspired by TCP Congestion Avoidance algorithm for gradual and stable
+ * growth.
+ *
+ * @param tableRuntime The table runtime information containing configuration
+ * @param currentInterval Current refresh interval in milliseconds
+ * @param maxInterval Maximum allowed interval in milliseconds
+ * @return New interval after increase.
+ */
+ private long increaseInterval(
+ DefaultTableRuntime tableRuntime, long currentInterval, long maxInterval) {
+ long step = tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveIncreaseStepMs();
+ long newInterval = currentInterval + step;
+ long boundedInterval = Math.min(newInterval, maxInterval);
+ if (newInterval > maxInterval) {
+ logger.debug(
+ "Interval reached maximum boundary: currentInterval is {}ms, attempted {}ms, capped at {}ms",
+ currentInterval,
+ newInterval,
+ maxInterval);
+ }
+
+ return boundedInterval;
+ }
}
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index c1506218d2..6b294d3bae 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -100,6 +100,8 @@ public class DefaultTableRuntime extends AbstractTableRuntime
private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics;
private final TableSummaryMetrics tableSummaryMetrics;
private volatile long lastPlanTime;
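+ // Adaptive refresh state, kept in memory only; it resets after an AMS restart.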
+ private volatile long latestRefreshInterval = AmoroServiceConstants.INVALID_TIME;
+ private volatile boolean latestEvaluatedNeedOptimizing = true;
private volatile OptimizingProcess optimizingProcess;
private final List taskQuotas = new CopyOnWriteArrayList<>();
@@ -181,6 +183,22 @@ public void setLastPlanTime(long lastPlanTime) {
this.lastPlanTime = lastPlanTime;
}
+ public long getLatestRefreshInterval() {
+ return latestRefreshInterval;
+ }
+
+ public void setLatestRefreshInterval(long latestRefreshInterval) {
+ this.latestRefreshInterval = latestRefreshInterval;
+ }
+
+ public boolean getLatestEvaluatedNeedOptimizing() {
+ return this.latestEvaluatedNeedOptimizing;
+ }
+
+ public void setLatestEvaluatedNeedOptimizing(boolean latestEvaluatedNeedOptimizing) {
+ this.latestEvaluatedNeedOptimizing = latestEvaluatedNeedOptimizing;
+ }
+
public OptimizingStatus getOptimizingStatus() {
return OptimizingStatus.ofCode(getStatusCode());
}
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
index c891a24f4d..1601166db4 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
@@ -341,7 +341,17 @@ public static OptimizingConfig parseOptimizingConfig(Map<String, String> propert
PropertyUtil.propertyAsLong(
properties,
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE,
- TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT));
+ TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT))
+ .setRefreshTableAdaptiveMaxIntervalMs(
+ PropertyUtil.propertyAsLong(
+ properties,
+ TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS,
+ TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS_DEFAULT))
+ .setRefreshTableAdaptiveIncreaseStepMs(
+ PropertyUtil.propertyAsLong(
+ properties,
+ TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS,
+ TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS_DEFAULT));
}
/**
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
new file mode 100644
index 0000000000..5d5a0fb6d3
--- /dev/null
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.scheduler.inline;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.TestedCatalogs;
+import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.config.OptimizingConfig;
+import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
+import org.apache.amoro.hive.catalog.HiveTableTestHelper;
+import org.apache.amoro.server.AmoroServiceConstants;
+import org.apache.amoro.server.table.AMSTableTestBase;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.table.TableRuntimeStore;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestTableRuntimeRefreshExecutor extends AMSTableTestBase {
+ @Parameterized.Parameters(name = "{0}, {1}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(true, true)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(false, true)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(false, false)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(true, false)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(true, true)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(false, true)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(false, false)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(true, false)
+ },
+ };
+ }
+
+ public TestTableRuntimeRefreshExecutor(
+ CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
+ super(catalogTestHelper, tableTestHelper);
+ }
+
+ private static final long INTERVAL = 60000L; // 1 minute
+ private static final long MAX_INTERVAL = 300000L; // 5 minutes
+ private static final long STEP = 30000L; // 30s
+ private static final int MAX_PENDING_PARTITIONS = 1;
+ private static final int MINOR_LEAST_INTERVAL = 3600000; // 1h
+
+ /**
+ * A test helper class that allows configuration updates. Reuses the same TableRuntime instance
+ * across different test scenarios.
+ */
+ private static class TestTableRuntime extends DefaultTableRuntime {
+ private OptimizingConfig testOptimizingConfig;
+
+ TestTableRuntime(TableRuntimeStore store, OptimizingConfig optimizingConfig) {
+ super(store);
+ this.testOptimizingConfig = optimizingConfig;
+ }
+
+ /** Update the optimizing config without creating a new instance */
+ void updateOptimizingConfig(OptimizingConfig newConfig) {
+ this.testOptimizingConfig = newConfig;
+ }
+
+ @Override
+ public OptimizingConfig getOptimizingConfig() {
+ return this.testOptimizingConfig;
+ }
+ }
+
+ /** Create OptimizingConfig with specified parameters */
+ private OptimizingConfig createOptimizingConfig(long maxInterval, long step) {
+ OptimizingConfig config = new OptimizingConfig();
+ config.setRefreshTableAdaptiveMaxIntervalMs(maxInterval);
+ config.setMinorLeastInterval(MINOR_LEAST_INTERVAL);
+ config.setRefreshTableAdaptiveIncreaseStepMs(step);
+ return config;
+ }
+
+ @Test
+ public void testAdaptiveRefreshIntervalScenarios() {
+ createDatabase();
+ createTable();
+
+ // Get the original table runtime
+ DefaultTableRuntime baseRuntime =
+ (DefaultTableRuntime) tableService().getRuntime(serverTableIdentifier().getId());
+ TableRuntimeRefreshExecutor executor =
+ new TableRuntimeRefreshExecutor(tableService(), 1, INTERVAL, MAX_PENDING_PARTITIONS);
+
+ // Create a tableRuntime instance with initial config
+ OptimizingConfig initialConfig = createOptimizingConfig(MAX_INTERVAL, STEP);
+ TestTableRuntime tableRuntime = new TestTableRuntime(baseRuntime.store(), initialConfig);
+
+ // Test 1: Healthy table (does not need optimizing) - interval should increase by STEP
+ // Initial state: needOptimizing=false, latestRefreshInterval=0 (will use default INTERVAL)
+ tableRuntime.setLatestEvaluatedNeedOptimizing(false);
+ tableRuntime.setLatestRefreshInterval(0);
+ long adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(tableRuntime);
+ long expectedInterval = INTERVAL + STEP;
+ Assert.assertEquals(expectedInterval, adaptiveExecutingInterval);
+
+ // Test 2: Minimum boundary - interval should not drop below INTERVAL
+ // Unhealthy table (needs optimizing) - interval should be halved
+ // The current interval is INTERVAL + STEP, so the new interval is capped at INTERVAL
+ // rather than (INTERVAL + STEP) / 2
+ tableRuntime.setLatestEvaluatedNeedOptimizing(true);
+ tableRuntime.setLatestRefreshInterval(adaptiveExecutingInterval);
+ adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(tableRuntime);
+ expectedInterval = INTERVAL;
+ Assert.assertEquals(expectedInterval, adaptiveExecutingInterval);
+
+ // Test 3: Healthy table with larger step value - interval should increase by 8 * STEP
+ tableRuntime.setLatestEvaluatedNeedOptimizing(false);
+ tableRuntime.setLatestRefreshInterval(adaptiveExecutingInterval);
+ tableRuntime.updateOptimizingConfig(createOptimizingConfig(MAX_INTERVAL, 8 * STEP));
+ adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(tableRuntime);
+ expectedInterval = expectedInterval + 8 * STEP;
+ Assert.assertEquals(expectedInterval, adaptiveExecutingInterval);
+
+ // Test 4: Maximum boundary - interval should not exceed MAX_INTERVAL
+ tableRuntime.setLatestRefreshInterval(adaptiveExecutingInterval);
+ tableRuntime.updateOptimizingConfig(createOptimizingConfig(MAX_INTERVAL, STEP));
+ // The current interval is INTERVAL + 8 * STEP, so the new interval is capped at MAX_INTERVAL
+ // rather than INTERVAL + 9 * STEP
+ adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(tableRuntime);
+ Assert.assertEquals(MAX_INTERVAL, adaptiveExecutingInterval);
+
+ // Test 5: maxInterval must be greater than minInterval
+ // If maxInterval <= minInterval, the returned interval and the configured maxInterval are
+ // reset to the default value 0 (INVALID_TIME), which disables adaptive refresh
+ long maxInterval = INTERVAL - 1000;
+ tableRuntime.updateOptimizingConfig(createOptimizingConfig(maxInterval, STEP));
+ tableRuntime.setLatestRefreshInterval(adaptiveExecutingInterval);
+ adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(tableRuntime);
+ Assert.assertEquals(AmoroServiceConstants.INVALID_TIME, adaptiveExecutingInterval);
+ Assert.assertEquals(
+ AmoroServiceConstants.INVALID_TIME,
+ tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxIntervalMs());
+
+ dropTable();
+ dropDatabase();
+ }
+
+ @Test
+ public void testGetNextExecutingTime() {
+ createDatabase();
+ createTable();
+
+ // Get the original table runtime
+ DefaultTableRuntime baseRuntime =
+ (DefaultTableRuntime) tableService().getRuntime(serverTableIdentifier().getId());
+ TableRuntimeRefreshExecutor executor =
+ new TableRuntimeRefreshExecutor(tableService(), 1, INTERVAL, MAX_PENDING_PARTITIONS);
+
+ // Create a tableRuntime instance with adaptive interval enabled
+ OptimizingConfig configWithAdaptiveEnabled = createOptimizingConfig(MAX_INTERVAL, STEP);
+ TestTableRuntime tableRuntime =
+ new TestTableRuntime(baseRuntime.store(), configWithAdaptiveEnabled);
+
+ // Test 1: getNextExecutingTime with adaptive interval enabled and positive
+ // latestRefreshInterval
+ // Set a positive latestRefreshInterval to enable adaptive behavior
+ tableRuntime.setLatestRefreshInterval(MAX_INTERVAL);
+ long nextExecutingTime = executor.getNextExecutingTime(tableRuntime);
+ Assert.assertEquals(MAX_INTERVAL, nextExecutingTime);
+
+ // Test 2: getNextExecutingTime with adaptive interval enabled but latestRefreshInterval is 0
+ // Should fall back to min(minorLeastInterval * 4/5, INTERVAL)
+ tableRuntime.setLatestRefreshInterval(0);
+ nextExecutingTime = executor.getNextExecutingTime(tableRuntime);
+ long expectedFallbackInterval = Math.min(MINOR_LEAST_INTERVAL * 4L / 5, INTERVAL);
+ Assert.assertEquals(expectedFallbackInterval, nextExecutingTime);
+
+ dropTable();
+ dropDatabase();
+ }
+}
diff --git a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
index 0c743ac6bd..8fd2ab6a56 100644
--- a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
+++ b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
@@ -97,6 +97,12 @@ public class OptimizingConfig {
// self-optimizing.evaluation.fallback-interval
private long evaluationFallbackInterval;
+ // self-optimizing.refresh-table.adaptive.max-interval-ms
+ private long refreshTableAdaptiveMaxIntervalMs;
+
+ // self-optimizing.refresh-table.adaptive.increase-step-ms
+ private long refreshTableAdaptiveIncreaseStepMs;
+
public OptimizingConfig() {}
public boolean isEnabled() {
@@ -318,6 +324,30 @@ public OptimizingConfig setEvaluationMseTolerance(long evaluationMseTolerance) {
return this;
}
+ public boolean isRefreshTableAdaptiveEnabled() {
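+ // A positive max interval means adaptive refresh is enabled for this table.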
+ return refreshTableAdaptiveMaxIntervalMs > 0;
+ }
+
+ public long getRefreshTableAdaptiveMaxIntervalMs() {
+ return refreshTableAdaptiveMaxIntervalMs;
+ }
+
+ public OptimizingConfig setRefreshTableAdaptiveMaxIntervalMs(
+ long refreshTableAdaptiveMaxIntervalMs) {
+ this.refreshTableAdaptiveMaxIntervalMs = refreshTableAdaptiveMaxIntervalMs;
+ return this;
+ }
+
+ public long getRefreshTableAdaptiveIncreaseStepMs() {
+ return refreshTableAdaptiveIncreaseStepMs;
+ }
+
+ public OptimizingConfig setRefreshTableAdaptiveIncreaseStepMs(
+ long refreshTableAdaptiveIncreaseStepMs) {
+ this.refreshTableAdaptiveIncreaseStepMs = refreshTableAdaptiveIncreaseStepMs;
+ return this;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -350,7 +380,10 @@ public boolean equals(Object o) {
&& Objects.equal(optimizerGroup, that.optimizerGroup)
&& Objects.equal(minPlanInterval, that.minPlanInterval)
&& Objects.equal(evaluationMseTolerance, that.evaluationMseTolerance)
- && Objects.equal(evaluationFallbackInterval, that.evaluationFallbackInterval);
+ && Objects.equal(evaluationFallbackInterval, that.evaluationFallbackInterval)
+ && Objects.equal(refreshTableAdaptiveMaxIntervalMs, that.refreshTableAdaptiveMaxIntervalMs)
+ && Objects.equal(
+ refreshTableAdaptiveIncreaseStepMs, that.refreshTableAdaptiveIncreaseStepMs);
}
@Override
@@ -379,7 +412,9 @@ public int hashCode() {
hiveRefreshInterval,
minPlanInterval,
evaluationMseTolerance,
- evaluationFallbackInterval);
+ evaluationFallbackInterval,
+ refreshTableAdaptiveMaxIntervalMs,
+ refreshTableAdaptiveIncreaseStepMs);
}
@Override
@@ -407,6 +442,8 @@ public String toString() {
.add("hiveRefreshInterval", hiveRefreshInterval)
.add("evaluationMseTolerance", evaluationMseTolerance)
.add("evaluationFallbackInterval", evaluationFallbackInterval)
+ .add("refreshTableAdaptiveMaxIntervalMs", refreshTableAdaptiveMaxIntervalMs)
+ .add("refreshTableAdaptiveIncreaseStepMs", refreshTableAdaptiveIncreaseStepMs)
.toString();
}
}
diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
index 0d804cdec7..015c68d480 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
@@ -160,6 +160,15 @@ private TableProperties() {}
public static final String SNAPSHOT_MIN_COUNT = "snapshot.keep.min-count";
public static final int SNAPSHOT_MIN_COUNT_DEFAULT = 1;
+ public static final String SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS =
+ "self-optimizing.refresh-table.adaptive.max-interval-ms";
+ public static final long SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS_DEFAULT =
+ 0; // disabled
+ public static final String SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS =
+ "self-optimizing.refresh-table.adaptive.increase-step-ms";
+ public static final long SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS_DEFAULT =
+ 30000; // 30s
+
/**
* The retention period for snapshots created by Flink checkpoints. Snapshots older than this
* duration may be cleaned up. Avoid keeping the last flink checkpoint snapshot for too long, as
diff --git a/docs/user-guides/configurations.md b/docs/user-guides/configurations.md
index 1895812cea..253c7a6e0b 100644
--- a/docs/user-guides/configurations.md
+++ b/docs/user-guides/configurations.md
@@ -43,25 +43,27 @@ modified through [Alter Table](../using-tables/#modify-table) operations.
Self-optimizing configurations are applicable to both Iceberg Format and Mixed streaming Format.
-| Key | Default | Description |
-|-----------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| self-optimizing.enabled | true | Enables Self-optimizing |
-| self-optimizing.allow-partial-commit | false | Whether to allow partial commit when self-optimizing fails or process is cancelled |
-| self-optimizing.group | default | Optimizer group for Self-optimizing |
-| self-optimizing.quota | 0.5 | Quota for Self-optimizing, indicating the optimizer resources the table can take up |
-| self-optimizing.execute.num-retries | 5 | Number of retries after failure of Self-optimizing |
-| self-optimizing.target-size | 134217728(128MB) | Target size for Self-optimizing |
-| self-optimizing.max-file-count | 10000 | Maximum number of files processed by a Self-optimizing process |
-| self-optimizing.max-task-size-bytes | 134217728(128MB) | Maximum file size bytes in a single task for splitting tasks |
-| self-optimizing.fragment-ratio | 8 | The fragment file size threshold. We could divide self-optimizing.target-size by this ratio to get the actual fragment file size |
-| self-optimizing.min-target-size-ratio | 0.75 | The undersized segment file size threshold. Segment files under this threshold will be considered for rewriting |
-| self-optimizing.minor.trigger.file-count | 12 | The minimum number of files to trigger minor optimizing is determined by the sum of fragment file count and equality delete file count |
-| self-optimizing.minor.trigger.interval | 3600000(1 hour) | The time interval in milliseconds to trigger minor optimizing |
-| self-optimizing.major.trigger.duplicate-ratio | 0.1 | The ratio of duplicate data of segment files to trigger major optimizing |
-| self-optimizing.full.trigger.interval | -1(closed) | The time interval in milliseconds to trigger full optimizing |
-| self-optimizing.full.rewrite-all-files | true | Whether full optimizing rewrites all files or skips files that do not need to be optimized |
-| self-optimizing.min-plan-interval | 60000 | The minimum time interval between two self-optimizing planning action |
-| self-optimizing.filter | NULL | Filter conditions for self-optimizing, using SQL conditional expressions, without supporting any functions. For the timestamp column condition, the ISO date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'. |
+| Key | Default | Description |
+|---------------------------------------------------------|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| self-optimizing.enabled | true | Enables Self-optimizing |
+| self-optimizing.allow-partial-commit | false | Whether to allow partial commit when self-optimizing fails or process is cancelled |
+| self-optimizing.group | default | Optimizer group for Self-optimizing |
+| self-optimizing.quota | 0.5 | Quota for Self-optimizing, indicating the optimizer resources the table can take up |
+| self-optimizing.execute.num-retries | 5 | Number of retries after failure of Self-optimizing |
+| self-optimizing.target-size | 134217728(128MB) | Target size for Self-optimizing |
+| self-optimizing.max-file-count | 10000 | Maximum number of files processed by a Self-optimizing process |
+| self-optimizing.max-task-size-bytes | 134217728(128MB) | Maximum file size bytes in a single task for splitting tasks |
+| self-optimizing.fragment-ratio | 8 | The fragment file size threshold. We could divide self-optimizing.target-size by this ratio to get the actual fragment file size |
+| self-optimizing.min-target-size-ratio | 0.75 | The undersized segment file size threshold. Segment files under this threshold will be considered for rewriting |
+| self-optimizing.minor.trigger.file-count | 12 | The minimum number of files to trigger minor optimizing is determined by the sum of fragment file count and equality delete file count |
+| self-optimizing.minor.trigger.interval | 3600000(1 hour) | The time interval in milliseconds to trigger minor optimizing |
+| self-optimizing.major.trigger.duplicate-ratio | 0.1 | The ratio of duplicate data of segment files to trigger major optimizing |
+| self-optimizing.full.trigger.interval | -1(closed) | The time interval in milliseconds to trigger full optimizing |
+| self-optimizing.full.rewrite-all-files | true | Whether full optimizing rewrites all files or skips files that do not need to be optimized |
+| self-optimizing.min-plan-interval | 60000 | The minimum time interval between two self-optimizing planning action |
+| self-optimizing.filter | NULL | Filter conditions for self-optimizing, using SQL conditional expressions, without supporting any functions. For the timestamp column condition, the ISO date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'. |
+| self-optimizing.refresh-table.adaptive.max-interval-ms   | 0                | The maximum time interval in milliseconds for adaptively refreshing table metadata. 0 disables adaptive refresh. When enabled, the value must be greater than 'refresh-tables.interval' and may exceed 'self-optimizing.minor.trigger.interval' * 4/5. An invalid value is reset to 0. |
+| self-optimizing.refresh-table.adaptive.increase-step-ms  | 30000(30s)       | The additive step in milliseconds by which the adaptive table refresh interval is increased when the table does not need optimizing                                                                                                                                                    |
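+
+For example, adaptive metadata refresh could be enabled for a table through [Alter Table](../using-tables/#modify-table); the table name and values below are illustrative:
+
+```sql
+ALTER TABLE db_name.table_name SET TBLPROPERTIES (
+    'self-optimizing.refresh-table.adaptive.max-interval-ms' = '600000',
+    'self-optimizing.refresh-table.adaptive.increase-step-ms' = '30000'
+);
+```
+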
## Data-cleaning configurations