From 222e4bc4c975db2beb1c5b1a029f96e5e8db52a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=87=E9=A2=86?= Date: Wed, 14 Jan 2026 14:02:52 +0800 Subject: [PATCH 1/5] Add support for dynamic refresh interval for table metadata refreshing in the TableRuntimeRefreshExecutor. --- .../inline/TableRuntimeRefreshExecutor.java | 140 +++++++++- .../server/table/DefaultTableRuntime.java | 21 ++ .../server/table/TableConfigurations.java | 17 +- .../TestTableRuntimeRefreshExecutor.java | 254 ++++++++++++++++++ .../apache/amoro/config/OptimizingConfig.java | 50 +++- .../apache/amoro/table/TableProperties.java | 12 + docs/user-guides/configurations.md | 41 +-- 7 files changed, 510 insertions(+), 25 deletions(-) create mode 100644 amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java index 18071a2cdd..709713e5a3 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java @@ -56,6 +56,14 @@ protected boolean enabled(TableRuntime tableRuntime) { @Override protected long getNextExecutingTime(TableRuntime tableRuntime) { DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime; + + if (defaultTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveEnabled()) { + long newInterval = defaultTableRuntime.getLatestRefreshInterval(); + if (newInterval > 0) { + return newInterval; + } + } + return Math.min( defaultTableRuntime.getOptimizingConfig().getMinorLeastInterval() * 4L / 5, interval); } @@ -63,12 +71,14 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) { private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, 
MixedTable table) { // only evaluate pending input when optimizing is enabled and in idle state OptimizingConfig optimizingConfig = tableRuntime.getOptimizingConfig(); - if (optimizingConfig.isEnabled() - && tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) { + boolean optimizingEnabled = optimizingConfig.isEnabled(); + if (optimizingEnabled && tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) { if (optimizingConfig.isMetadataBasedTriggerEnabled() && !MetadataBasedEvaluationEvent.isEvaluatingNecessary( optimizingConfig, table, tableRuntime.getLastPlanTime())) { + tableRuntime.setLatestEvaluatedNeedOptimizing(false); + logger.debug( "{} optimizing is not necessary due to metadata based trigger", tableRuntime.getTableIdentifier()); @@ -87,8 +97,17 @@ private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTa tableRuntime.setPendingInput(pendingInput); } else { tableRuntime.optimizingNotNecessary(); + tableRuntime.setTableSummary(evaluator.getPendingInput()); } - tableRuntime.setTableSummary(evaluator.getPendingInput()); + } else if (!optimizingEnabled) { + tableRuntime.setLatestEvaluatedNeedOptimizing(false); + logger.debug( + "{} optimizing is not enabled, skip evaluating pending input", + tableRuntime.getTableIdentifier()); + } else { + tableRuntime.setLatestEvaluatedNeedOptimizing(true); + logger.debug( + "{} optimizing is processing or is in preparation", tableRuntime.getTableIdentifier()); } } @@ -129,9 +148,124 @@ public void execute(TableRuntime tableRuntime) { || (mixedTable.isUnkeyedTable() && lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId())) { tryEvaluatingPendingInput(defaultTableRuntime, mixedTable); + } else { + logger.debug("{} optimizing is not necessary", defaultTableRuntime.getTableIdentifier()); + defaultTableRuntime.setLatestEvaluatedNeedOptimizing(false); + } + + // Update adaptive interval according to evaluating result. 
+ if (defaultTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveEnabled()) { + long newInterval = getAdaptiveExecutingInterval(defaultTableRuntime); + defaultTableRuntime.setLatestRefreshInterval(newInterval); } + } catch (Throwable throwable) { logger.error("Refreshing table {} failed.", tableRuntime.getTableIdentifier(), throwable); } } + + /** + * Calculate adaptive execution interval based on table optimization status. + * + *

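<p>Uses AIMD (Additive Increase Multiplicative Decrease) algorithm inspired by TCP congestion + * control: + * + *
<ul> + * <li>Table needs optimizing: halve the interval (multiplicative decrease) + * <li>Table does not need optimizing: add the configured step (additive increase) + * </ul>
+ * + *
<p>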
Interval is bounded by [interval_min, interval_max] and kept in memory only (resets to + * interval_min on restart). + * + * @param tableRuntime The table runtime information containing current status and configuration + * @return The next execution interval in milliseconds + */ + private long getAdaptiveExecutingInterval(DefaultTableRuntime tableRuntime) { + final long minInterval = interval; + final long maxInterval = + tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxInterval(); + long currentInterval = tableRuntime.getLatestRefreshInterval(); + + // Initialize interval on first run or after restart + if (currentInterval == 0) { + currentInterval = minInterval; + } + + // Determine whether table needs optimization + boolean needOptimizing = tableRuntime.getLatestEvaluatedNeedOptimizing(); + + long nextInterval; + if (needOptimizing) { + nextInterval = decreaseInterval(currentInterval, minInterval); + logger.debug( + "Table {} needs optimization, decreasing interval from {}ms to {}ms", + tableRuntime.getTableIdentifier(), + currentInterval, + nextInterval); + } else { + nextInterval = increaseInterval(tableRuntime, currentInterval, maxInterval); + logger.debug( + "Table {} does not need optimization, increasing interval from {}ms to {}ms", + tableRuntime.getTableIdentifier(), + currentInterval, + nextInterval); + } + + return nextInterval; + } + + /** + * Decrease interval when table needs optimization. + * + *
<p>
Uses multiplicative decrease (halving) inspired by TCP Fast Recovery algorithm for rapid + * response to table health issues. + * + * @param currentInterval Current refresh interval in milliseconds + * @param minInterval Minimum allowed interval in milliseconds + * @return New interval after decrease. + */ + private long decreaseInterval(long currentInterval, long minInterval) { + long newInterval = currentInterval / 2; + long boundedInterval = Math.max(newInterval, minInterval); + + if (newInterval < minInterval) { + logger.debug( + "Interval reached minimum boundary: attempted {}ms, capped at {}ms", + newInterval, + minInterval); + } + + return boundedInterval; + } + + /** + * Increase interval when table does not need optimization. + * + *
<p>
Uses additive increase inspired by TCP Congestion Avoidance algorithm for gradual and stable + * growth. + * + * @param tableRuntime The table runtime information containing configuration + * @param currentInterval Current refresh interval in milliseconds + * @param maxInterval Maximum allowed interval in milliseconds + * @return New interval after increase. + */ + private long increaseInterval( + DefaultTableRuntime tableRuntime, long currentInterval, long maxInterval) { + long step = tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveIncreaseStep(); + long newInterval = currentInterval + step; + long boundedInterval = Math.min(newInterval, maxInterval); + + if (newInterval > maxInterval) { + logger.debug( + "Interval reached maximum boundary: currentInterval is {}ms, attempted {}ms, capped at {}ms", + currentInterval, + newInterval, + maxInterval); + } + + return boundedInterval; + } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java index c1506218d2..60841207b1 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java @@ -100,6 +100,8 @@ public class DefaultTableRuntime extends AbstractTableRuntime private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics; private final TableSummaryMetrics tableSummaryMetrics; private volatile long lastPlanTime; + private volatile long latestRefreshInterval = AmoroServiceConstants.INVALID_TIME; + private volatile boolean latestEvaluatedNeedOptimizing = true; private volatile OptimizingProcess optimizingProcess; private final List taskQuotas = new CopyOnWriteArrayList<>(); @@ -181,6 +183,22 @@ public void setLastPlanTime(long lastPlanTime) { this.lastPlanTime = lastPlanTime; } + public long getLatestRefreshInterval() { + return latestRefreshInterval; + } + + public void 
setLatestRefreshInterval(long latestRefreshInterval) { + this.latestRefreshInterval = latestRefreshInterval; + } + + public boolean getLatestEvaluatedNeedOptimizing() { + return this.latestEvaluatedNeedOptimizing; + } + + public void setLatestEvaluatedNeedOptimizing(boolean latestEvaluatedNeedOptimizing) { + this.latestEvaluatedNeedOptimizing = latestEvaluatedNeedOptimizing; + } + public OptimizingStatus getOptimizingStatus() { return OptimizingStatus.ofCode(getStatusCode()); } @@ -288,6 +306,7 @@ public void setPendingInput(AbstractOptimizingEvaluator.PendingInput pendingInpu summary.setTotalFileCount(pendingFileCount); }) .commit(); + this.latestEvaluatedNeedOptimizing = true; } public void setTableSummary(AbstractOptimizingEvaluator.PendingInput tableSummary) { @@ -462,6 +481,8 @@ public void optimizingNotNecessary() { }) .commit(); } + + this.latestEvaluatedNeedOptimizing = false; } public void beginCommitting() { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java index c891a24f4d..44253368c8 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java @@ -341,7 +341,22 @@ public static OptimizingConfig parseOptimizingConfig(Map propert PropertyUtil.propertyAsLong( properties, TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE, - TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT)); + TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT)) + .setRefreshTableAdaptiveEnabled( + PropertyUtil.propertyAsBoolean( + properties, + TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_ENABLED, + TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_ENABLED_DEFAULT)) + .setRefreshTableAdaptiveMaxInterval( + PropertyUtil.propertyAsLong( + properties, + 
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL, + TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_DEFAULT)) + .setRefreshTableAdaptiveIncreaseStep( + PropertyUtil.propertyAsLong( + properties, + TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP, + TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_DEFAULT)); } /** diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java new file mode 100644 index 0000000000..bbb64c3aa2 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.server.scheduler.inline; + +import static org.mockito.Mockito.verify; + +import org.apache.amoro.BasicTableTestHelper; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableTestHelper; +import org.apache.amoro.TestedCatalogs; +import org.apache.amoro.catalog.CatalogTestHelper; +import org.apache.amoro.config.OptimizingConfig; +import org.apache.amoro.hive.catalog.HiveCatalogTestHelper; +import org.apache.amoro.hive.catalog.HiveTableTestHelper; +import org.apache.amoro.server.table.AMSTableTestBase; +import org.apache.amoro.server.table.DefaultTableRuntime; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mockito; + +@RunWith(Parameterized.class) +public class TestTableRuntimeRefreshExecutor extends AMSTableTestBase { + @Parameterized.Parameters(name = "{0}, {1}") + public static Object[] parameters() { + return new Object[][] { + { + TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG), + new BasicTableTestHelper(true, true) + }, + { + TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG), + new BasicTableTestHelper(false, true) + }, + { + TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG), + new BasicTableTestHelper(false, false) + }, + { + TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG), + new BasicTableTestHelper(true, false) + }, + { + new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()), + new HiveTableTestHelper(true, true) + }, + { + new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()), + new HiveTableTestHelper(false, true) + }, + { + new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()), + new HiveTableTestHelper(false, false) + }, + { + new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()), + new HiveTableTestHelper(true, false) + }, + }; + } + + public TestTableRuntimeRefreshExecutor( + CatalogTestHelper catalogTestHelper, 
TableTestHelper tableTestHelper) { + super(catalogTestHelper, tableTestHelper); + } + + private static final long INTERVAL = 60000L; // 1 minute + private static final long MAX_INTERVAL = 300000L; // 5 minutes + private static final long STEP = 30000L; // 30s + private static final int MAX_PENDING_PARTITIONS = 1; + private static final int MINOR_LEAST_INTERVAL = 3600000; // 1h + private static final long INITIAL_SNAPSHOTID = 0L; + + // Create mock DefaultTableRuntime for adaptive interval tests + private DefaultTableRuntime getMockTableRuntimeWithAdaptiveInterval( + DefaultTableRuntime tableRuntime, + boolean needOptimizing, + boolean adaptiveEnabled, + long currentInterval, + long step) { + DefaultTableRuntime mockTableRuntime = Mockito.mock(DefaultTableRuntime.class); + Mockito.when(mockTableRuntime.getTableIdentifier()) + .thenReturn(tableRuntime.getTableIdentifier()); + Mockito.when(mockTableRuntime.getLastOptimizedSnapshotId()) + .thenReturn(tableRuntime.getLastOptimizedSnapshotId()); + Mockito.when(mockTableRuntime.getLastOptimizedChangeSnapshotId()) + .thenReturn(tableRuntime.getLastOptimizedChangeSnapshotId()); + Mockito.when(mockTableRuntime.getCurrentSnapshotId()).thenReturn(INITIAL_SNAPSHOTID); + Mockito.when(mockTableRuntime.getCurrentChangeSnapshotId()).thenReturn(INITIAL_SNAPSHOTID); + Mockito.when(mockTableRuntime.getLatestRefreshInterval()).thenReturn(currentInterval); + Mockito.when(mockTableRuntime.getLatestEvaluatedNeedOptimizing()).thenReturn(needOptimizing); + + OptimizingConfig mockOptimizingConfig = Mockito.mock(OptimizingConfig.class); + Mockito.when(mockOptimizingConfig.getRefreshTableAdaptiveEnabled()).thenReturn(adaptiveEnabled); + Mockito.when(mockOptimizingConfig.getRefreshTableAdaptiveMaxInterval()) + .thenReturn(MAX_INTERVAL); + Mockito.when(mockOptimizingConfig.getMinorLeastInterval()).thenReturn(MINOR_LEAST_INTERVAL); + Mockito.when(mockOptimizingConfig.getRefreshTableAdaptiveIncreaseStep()).thenReturn(step); + 
Mockito.when(mockTableRuntime.getOptimizingConfig()).thenReturn(mockOptimizingConfig); + + return mockTableRuntime; + } + + // Overloaded method with default step factor + private DefaultTableRuntime getMockTableRuntimeWithAdaptiveInterval( + DefaultTableRuntime tableRuntime, + boolean needOptimizing, + boolean adaptiveEnabled, + long currentInterval) { + return getMockTableRuntimeWithAdaptiveInterval( + tableRuntime, needOptimizing, adaptiveEnabled, currentInterval, STEP); + } + + @Test + public void testAdaptiveIntervalScenarios() { + createDatabase(); + createTable(); + + DefaultTableRuntime tableRuntime = + (DefaultTableRuntime) tableService().getRuntime(serverTableIdentifier().getId()); + TableRuntimeRefreshExecutor executor = + new TableRuntimeRefreshExecutor(tableService(), 1, INTERVAL, MAX_PENDING_PARTITIONS); + + // Test healthy table (not need optimizing) - interval should increase + DefaultTableRuntime mockTableRuntime = + getMockTableRuntimeWithAdaptiveInterval(tableRuntime, false, true, INTERVAL); + + // Initial interval is INTERVAL, should increase by 30000 + long expectedInterval = INTERVAL + STEP; + executor.execute(mockTableRuntime); + + // Verify that setLatestRefreshInterval was called with the expected value + verify(mockTableRuntime, Mockito.times(1)).setLatestRefreshInterval(expectedInterval); + + // Test unhealthy table (need optimizing) - interval should decrease + mockTableRuntime = + getMockTableRuntimeWithAdaptiveInterval(tableRuntime, true, true, INTERVAL * 2); + + // Current interval is INTERVAL * 2, should be halved + expectedInterval = (INTERVAL * 2) / 2; + executor.execute(mockTableRuntime); + + // Verify that setLatestRefreshInterval was called with the expected value + verify(mockTableRuntime, Mockito.times(1)).setLatestRefreshInterval(expectedInterval); + + // Test when adaptive interval is disabled + mockTableRuntime = getMockTableRuntimeWithAdaptiveInterval(tableRuntime, false, false, 0); + + executor.execute(mockTableRuntime); 
+ + // Verify that setLatestRefreshInterval was never called + verify(mockTableRuntime, Mockito.never()).setLatestRefreshInterval(Mockito.anyLong()); + + // Test maximum boundary - interval should not exceed MAX_INTERVAL + mockTableRuntime = + getMockTableRuntimeWithAdaptiveInterval(tableRuntime, false, true, MAX_INTERVAL - 1000); + + executor.execute(mockTableRuntime); + + // Verify interval is set to MAX_INTERVAL + verify(mockTableRuntime, Mockito.times(1)).setLatestRefreshInterval(MAX_INTERVAL); + + // Test minimum boundary - interval should not go below INTERVAL + mockTableRuntime = getMockTableRuntimeWithAdaptiveInterval(tableRuntime, true, true, INTERVAL); + + executor.execute(mockTableRuntime); + + // Verify interval remains at INTERVAL (minimum) + verify(mockTableRuntime, Mockito.times(1)).setLatestRefreshInterval(INTERVAL); + + // Test initialization case (currentInterval is 0) + mockTableRuntime = getMockTableRuntimeWithAdaptiveInterval(tableRuntime, false, true, 0); + + // Initial interval is 0, should be initialized to INTERVAL, then increased by 30000 + expectedInterval = INTERVAL + STEP; + executor.execute(mockTableRuntime); + + // Verify interval was correctly initialized and increased + verify(mockTableRuntime, Mockito.times(1)).setLatestRefreshInterval(expectedInterval); + + // Test with different step values + long newStep = 60000L; + mockTableRuntime = + getMockTableRuntimeWithAdaptiveInterval(tableRuntime, false, true, INTERVAL, newStep); + + // Initial interval is INTERVAL, should increase by newStep + expectedInterval = INTERVAL + newStep; + executor.execute(mockTableRuntime); + + // Verify interval increased with correct step factor + verify(mockTableRuntime, Mockito.times(1)).setLatestRefreshInterval(expectedInterval); + + dropTable(); + dropDatabase(); + } + + @Test + public void testGetNextExecutingTime() { + createDatabase(); + createTable(); + + // Test getNextExecutingTime with adaptive interval enabled + DefaultTableRuntime 
tableRuntime = + (DefaultTableRuntime) tableService().getRuntime(serverTableIdentifier().getId()); + DefaultTableRuntime mockTableRuntime = + getMockTableRuntimeWithAdaptiveInterval(tableRuntime, false, true, MAX_INTERVAL); + + TableRuntimeRefreshExecutor executor = + new TableRuntimeRefreshExecutor(tableService(), 1, INTERVAL, MAX_PENDING_PARTITIONS); + + // Should use adaptive interval rather than default interval + long nextExecutingTime = executor.getNextExecutingTime(mockTableRuntime); + + // Verify returned time equals adaptive interval + Assert.assertEquals(MAX_INTERVAL, nextExecutingTime); + + // Test getNextExecutingTime with adaptive interval disabled + Mockito.when(mockTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveEnabled()) + .thenReturn(false); + nextExecutingTime = executor.getNextExecutingTime(mockTableRuntime); + + // Should use default interval rather than adaptive interval + Assert.assertEquals(INTERVAL, nextExecutingTime); + + // Test getNextExecutingTime with zero adaptive interval (fallback to default) + Mockito.when(mockTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveEnabled()) + .thenReturn(true); + Mockito.when(mockTableRuntime.getLatestRefreshInterval()).thenReturn(0L); + nextExecutingTime = executor.getNextExecutingTime(mockTableRuntime); + + // Should fallback to default interval when adaptive interval is 0 + Assert.assertEquals(INTERVAL, nextExecutingTime); + + dropTable(); + dropDatabase(); + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java index 0c743ac6bd..b2ef9b64dc 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java @@ -97,6 +97,15 @@ public class OptimizingConfig { // self-optimizing.evaluation.fallback-interval private long evaluationFallbackInterval; + // 
self-optimizing.refresh-table.adaptive.enabled + private boolean refreshTableAdaptiveEnabled; + + // self-optimizing.refresh-table.adaptive.max-interval + private long refreshTableAdaptiveMaxInterval; + + // self-optimizing.refresh-table.adaptive.increase-step + private long refreshTableAdaptiveIncreaseStep; + public OptimizingConfig() {} public boolean isEnabled() { @@ -318,6 +327,34 @@ public OptimizingConfig setEvaluationMseTolerance(long evaluationMseTolerance) { return this; } + public boolean getRefreshTableAdaptiveEnabled() { + return refreshTableAdaptiveEnabled; + } + + public OptimizingConfig setRefreshTableAdaptiveEnabled(boolean refreshTableAdaptiveEnabled) { + this.refreshTableAdaptiveEnabled = refreshTableAdaptiveEnabled; + return this; + } + + public long getRefreshTableAdaptiveMaxInterval() { + return refreshTableAdaptiveMaxInterval; + } + + public OptimizingConfig setRefreshTableAdaptiveMaxInterval(long refreshTableAdaptiveMaxInterval) { + this.refreshTableAdaptiveMaxInterval = refreshTableAdaptiveMaxInterval; + return this; + } + + public long getRefreshTableAdaptiveIncreaseStep() { + return refreshTableAdaptiveIncreaseStep; + } + + public OptimizingConfig setRefreshTableAdaptiveIncreaseStep( + long refreshTableAdaptiveIncreaseStep) { + this.refreshTableAdaptiveIncreaseStep = refreshTableAdaptiveIncreaseStep; + return this; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -350,7 +387,10 @@ public boolean equals(Object o) { && Objects.equal(optimizerGroup, that.optimizerGroup) && Objects.equal(minPlanInterval, that.minPlanInterval) && Objects.equal(evaluationMseTolerance, that.evaluationMseTolerance) - && Objects.equal(evaluationFallbackInterval, that.evaluationFallbackInterval); + && Objects.equal(evaluationFallbackInterval, that.evaluationFallbackInterval) + && refreshTableAdaptiveEnabled == that.refreshTableAdaptiveEnabled + && Objects.equal(refreshTableAdaptiveMaxInterval, that.refreshTableAdaptiveMaxInterval) + && 
Objects.equal(refreshTableAdaptiveIncreaseStep, that.refreshTableAdaptiveIncreaseStep); } @Override @@ -379,7 +419,10 @@ public int hashCode() { hiveRefreshInterval, minPlanInterval, evaluationMseTolerance, - evaluationFallbackInterval); + evaluationFallbackInterval, + refreshTableAdaptiveEnabled, + refreshTableAdaptiveMaxInterval, + refreshTableAdaptiveIncreaseStep); } @Override @@ -407,6 +450,9 @@ public String toString() { .add("hiveRefreshInterval", hiveRefreshInterval) .add("evaluationMseTolerance", evaluationMseTolerance) .add("evaluationFallbackInterval", evaluationFallbackInterval) + .add("refreshTableAdaptiveEnabled", refreshTableAdaptiveEnabled) + .add("refreshTableAdaptiveMaxInterval", refreshTableAdaptiveMaxInterval) + .add("refreshTableAdaptiveIncreaseStep", refreshTableAdaptiveIncreaseStep) .toString(); } } diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java index 0d804cdec7..763e201482 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java @@ -160,6 +160,18 @@ private TableProperties() {} public static final String SNAPSHOT_MIN_COUNT = "snapshot.keep.min-count"; public static final int SNAPSHOT_MIN_COUNT_DEFAULT = 1; + public static final String SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_ENABLED = + "self-optimizing.refresh-table.adaptive.enabled"; + public static final boolean SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_ENABLED_DEFAULT = false; + public static final String SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL = + "self-optimizing.refresh-table.adaptive.max-interval"; + public static final long SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_DEFAULT = + 3600000; // 1 hour + public static final String SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP = + 
"self-optimizing.refresh-table.adaptive.increase-step"; + public static final long SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_DEFAULT = + 30000; // 30s + /** * The retention period for snapshots created by Flink checkpoints. Snapshots older than this * duration may be cleaned up. Avoid keeping the last flink checkpoint snapshot for too long, as diff --git a/docs/user-guides/configurations.md b/docs/user-guides/configurations.md index 1895812cea..9ecddd9d22 100644 --- a/docs/user-guides/configurations.md +++ b/docs/user-guides/configurations.md @@ -43,25 +43,28 @@ modified through [Alter Table](../using-tables/#modify-table) operations. Self-optimizing configurations are applicable to both Iceberg Format and Mixed streaming Format. -| Key | Default | Description | -|-----------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| self-optimizing.enabled | true | Enables Self-optimizing | -| self-optimizing.allow-partial-commit | false | Whether to allow partial commit when self-optimizing fails or process is cancelled | -| self-optimizing.group | default | Optimizer group for Self-optimizing | -| self-optimizing.quota | 0.5 | Quota for Self-optimizing, indicating the optimizer resources the table can take up | -| self-optimizing.execute.num-retries | 5 | Number of retries after failure of Self-optimizing | -| self-optimizing.target-size | 134217728(128MB) | Target size for Self-optimizing | -| self-optimizing.max-file-count | 10000 | Maximum number of files processed by a Self-optimizing process | -| self-optimizing.max-task-size-bytes | 134217728(128MB) | Maximum file size bytes in a single task for splitting tasks | -| self-optimizing.fragment-ratio | 8 | The fragment file size threshold. 
We could divide self-optimizing.target-size by this ratio to get the actual fragment file size | -| self-optimizing.min-target-size-ratio | 0.75 | The undersized segment file size threshold. Segment files under this threshold will be considered for rewriting | -| self-optimizing.minor.trigger.file-count | 12 | The minimum number of files to trigger minor optimizing is determined by the sum of fragment file count and equality delete file count | -| self-optimizing.minor.trigger.interval | 3600000(1 hour) | The time interval in milliseconds to trigger minor optimizing | -| self-optimizing.major.trigger.duplicate-ratio | 0.1 | The ratio of duplicate data of segment files to trigger major optimizing | -| self-optimizing.full.trigger.interval | -1(closed) | The time interval in milliseconds to trigger full optimizing | -| self-optimizing.full.rewrite-all-files | true | Whether full optimizing rewrites all files or skips files that do not need to be optimized | -| self-optimizing.min-plan-interval | 60000 | The minimum time interval between two self-optimizing planning action | -| self-optimizing.filter | NULL | Filter conditions for self-optimizing, using SQL conditional expressions, without supporting any functions. For the timestamp column condition, the ISO date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'. 
| +| Key | Default | Description | +|------------------------------------------------------|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| self-optimizing.enabled | true | Enables Self-optimizing | +| self-optimizing.allow-partial-commit | false | Whether to allow partial commit when self-optimizing fails or process is cancelled | +| self-optimizing.group | default | Optimizer group for Self-optimizing | +| self-optimizing.quota | 0.5 | Quota for Self-optimizing, indicating the optimizer resources the table can take up | +| self-optimizing.execute.num-retries | 5 | Number of retries after failure of Self-optimizing | +| self-optimizing.target-size | 134217728(128MB) | Target size for Self-optimizing | +| self-optimizing.max-file-count | 10000 | Maximum number of files processed by a Self-optimizing process | +| self-optimizing.max-task-size-bytes | 134217728(128MB) | Maximum file size bytes in a single task for splitting tasks | +| self-optimizing.fragment-ratio | 8 | The fragment file size threshold. We could divide self-optimizing.target-size by this ratio to get the actual fragment file size | +| self-optimizing.min-target-size-ratio | 0.75 | The undersized segment file size threshold. 
Segment files under this threshold will be considered for rewriting | +| self-optimizing.minor.trigger.file-count | 12 | The minimum number of files to trigger minor optimizing is determined by the sum of fragment file count and equality delete file count | +| self-optimizing.minor.trigger.interval | 3600000(1 hour) | The time interval in milliseconds to trigger minor optimizing | +| self-optimizing.major.trigger.duplicate-ratio | 0.1 | The ratio of duplicate data of segment files to trigger major optimizing | +| self-optimizing.full.trigger.interval | -1(closed) | The time interval in milliseconds to trigger full optimizing | +| self-optimizing.full.rewrite-all-files | true | Whether full optimizing rewrites all files or skips files that do not need to be optimized | +| self-optimizing.min-plan-interval | 60000 | The minimum time interval between two self-optimizing planning action | +| self-optimizing.filter | NULL | Filter conditions for self-optimizing, using SQL conditional expressions, without supporting any functions. For the timestamp column condition, the ISO date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'. 
| +| self-optimizing.refresh-table.adaptive.enabled | false | Whether to enable adaptive refresh interval for refreshing table metadata | +| self-optimizing.refresh-table.adaptive.max-interval | 3600000(1 hour) | The maximum time interval in milliseconds to refresh table metadata | +| self-optimizing.refresh-table.adaptive.increase-step | 30000(30s) | The time interval increase step in milliseconds to refres table metadata | ## Data-cleaning configurations From f17c15269552d2fcfa21c2c1b88e0bdab0516f12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=87=E9=A2=86?= Date: Wed, 14 Jan 2026 16:01:05 +0800 Subject: [PATCH 2/5] fixup --- .../inline/TableRuntimeRefreshExecutor.java | 36 ++-- .../server/table/DefaultTableRuntime.java | 3 - .../server/table/TableConfigurations.java | 17 +- .../TestTableRuntimeRefreshExecutor.java | 190 +++++++----------- .../apache/amoro/config/OptimizingConfig.java | 55 ++--- .../apache/amoro/table/TableProperties.java | 17 +- docs/user-guides/configurations.md | 43 ++-- 7 files changed, 145 insertions(+), 216 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java index 709713e5a3..2b9dcbc6ba 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java @@ -31,6 +31,7 @@ import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.server.table.TableService; import org.apache.amoro.server.utils.IcebergTableUtil; +import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.table.MixedTable; @@ -57,7 +58,7 @@ protected boolean enabled(TableRuntime tableRuntime) { protected 
long getNextExecutingTime(TableRuntime tableRuntime) { DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime; - if (defaultTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveEnabled()) { + if (defaultTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxIntervalMs() > 0) { long newInterval = defaultTableRuntime.getLatestRefreshInterval(); if (newInterval > 0) { return newInterval; @@ -68,7 +69,7 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) { defaultTableRuntime.getOptimizingConfig().getMinorLeastInterval() * 4L / 5, interval); } - private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTable table) { + private boolean tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTable table) { // only evaluate pending input when optimizing is enabled and in idle state OptimizingConfig optimizingConfig = tableRuntime.getOptimizingConfig(); boolean optimizingEnabled = optimizingConfig.isEnabled(); @@ -77,17 +78,16 @@ private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTa if (optimizingConfig.isMetadataBasedTriggerEnabled() && !MetadataBasedEvaluationEvent.isEvaluatingNecessary( optimizingConfig, table, tableRuntime.getLastPlanTime())) { - tableRuntime.setLatestEvaluatedNeedOptimizing(false); - logger.debug( "{} optimizing is not necessary due to metadata based trigger", tableRuntime.getTableIdentifier()); - return; + return false; } AbstractOptimizingEvaluator evaluator = IcebergTableUtil.createOptimizingEvaluator(tableRuntime, table, maxPendingPartitions); - if (evaluator.isNecessary()) { + boolean evaluatorIsNecessary = evaluator.isNecessary(); + if (evaluatorIsNecessary) { AbstractOptimizingEvaluator.PendingInput pendingInput = evaluator.getOptimizingPendingInput(); logger.debug( @@ -97,17 +97,19 @@ private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTa tableRuntime.setPendingInput(pendingInput); } else { 
tableRuntime.optimizingNotNecessary(); - tableRuntime.setTableSummary(evaluator.getPendingInput()); } + + tableRuntime.setTableSummary(evaluator.getPendingInput()); + return evaluatorIsNecessary; } else if (!optimizingEnabled) { - tableRuntime.setLatestEvaluatedNeedOptimizing(false); logger.debug( "{} optimizing is not enabled, skip evaluating pending input", tableRuntime.getTableIdentifier()); + return false; } else { - tableRuntime.setLatestEvaluatedNeedOptimizing(true); logger.debug( "{} optimizing is processing or is in preparation", tableRuntime.getTableIdentifier()); + return true; } } @@ -141,20 +143,21 @@ public void execute(TableRuntime tableRuntime) { AmoroTable table = loadTable(tableRuntime); defaultTableRuntime.refresh(table); MixedTable mixedTable = (MixedTable) table.originalTable(); + boolean needOptimizing = false; if ((mixedTable.isKeyedTable() && (lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId() || lastOptimizedChangeSnapshotId != defaultTableRuntime.getCurrentChangeSnapshotId())) || (mixedTable.isUnkeyedTable() && lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId())) { - tryEvaluatingPendingInput(defaultTableRuntime, mixedTable); + needOptimizing = tryEvaluatingPendingInput(defaultTableRuntime, mixedTable); } else { logger.debug("{} optimizing is not necessary", defaultTableRuntime.getTableIdentifier()); - defaultTableRuntime.setLatestEvaluatedNeedOptimizing(false); } - // Update adaptive interval according to evaluating result. - if (defaultTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveEnabled()) { + // Update adaptive interval according to evaluated result. 
+ if (defaultTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxIntervalMs() > 0) { + defaultTableRuntime.setLatestEvaluatedNeedOptimizing(needOptimizing); long newInterval = getAdaptiveExecutingInterval(defaultTableRuntime); defaultTableRuntime.setLatestRefreshInterval(newInterval); } @@ -183,10 +186,11 @@ public void execute(TableRuntime tableRuntime) { * @param tableRuntime The table runtime information containing current status and configuration * @return The next execution interval in milliseconds */ - private long getAdaptiveExecutingInterval(DefaultTableRuntime tableRuntime) { + @VisibleForTesting + public long getAdaptiveExecutingInterval(DefaultTableRuntime tableRuntime) { final long minInterval = interval; final long maxInterval = - tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxInterval(); + tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxIntervalMs(); long currentInterval = tableRuntime.getLatestRefreshInterval(); // Initialize interval on first run or after restart @@ -254,7 +258,7 @@ private long decreaseInterval(long currentInterval, long minInterval) { */ private long increaseInterval( DefaultTableRuntime tableRuntime, long currentInterval, long maxInterval) { - long step = tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveIncreaseStep(); + long step = tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveIncreaseStepMs(); long newInterval = currentInterval + step; long boundedInterval = Math.min(newInterval, maxInterval); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java index 60841207b1..6b294d3bae 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java @@ -306,7 +306,6 @@ public void setPendingInput(AbstractOptimizingEvaluator.PendingInput pendingInpu 
summary.setTotalFileCount(pendingFileCount); }) .commit(); - this.latestEvaluatedNeedOptimizing = true; } public void setTableSummary(AbstractOptimizingEvaluator.PendingInput tableSummary) { @@ -481,8 +480,6 @@ public void optimizingNotNecessary() { }) .commit(); } - - this.latestEvaluatedNeedOptimizing = false; } public void beginCommitting() { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java index 44253368c8..1601166db4 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java @@ -342,21 +342,16 @@ public static OptimizingConfig parseOptimizingConfig(Map propert properties, TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE, TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT)) - .setRefreshTableAdaptiveEnabled( - PropertyUtil.propertyAsBoolean( - properties, - TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_ENABLED, - TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_ENABLED_DEFAULT)) - .setRefreshTableAdaptiveMaxInterval( + .setRefreshTableAdaptiveMaxIntervalMs( PropertyUtil.propertyAsLong( properties, - TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL, - TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_DEFAULT)) - .setRefreshTableAdaptiveIncreaseStep( + TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS, + TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS_DEFAULT)) + .setRefreshTableAdaptiveIncreaseStepMs( PropertyUtil.propertyAsLong( properties, - TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP, - TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_DEFAULT)); + TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS, + 
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS_DEFAULT)); } /** diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java index bbb64c3aa2..c2818cc5e1 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java @@ -18,8 +18,6 @@ package org.apache.amoro.server.scheduler.inline; -import static org.mockito.Mockito.verify; - import org.apache.amoro.BasicTableTestHelper; import org.apache.amoro.TableFormat; import org.apache.amoro.TableTestHelper; @@ -34,7 +32,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.mockito.Mockito; @RunWith(Parameterized.class) public class TestTableRuntimeRefreshExecutor extends AMSTableTestBase { @@ -83,53 +80,48 @@ public TestTableRuntimeRefreshExecutor( private static final long INTERVAL = 60000L; // 1 minute private static final long MAX_INTERVAL = 300000L; // 5 minutes + private static final long DEFAULT_MAX_INTERVAL = -1L; private static final long STEP = 30000L; // 30s private static final int MAX_PENDING_PARTITIONS = 1; private static final int MINOR_LEAST_INTERVAL = 3600000; // 1h - private static final long INITIAL_SNAPSHOTID = 0L; - // Create mock DefaultTableRuntime for adaptive interval tests - private DefaultTableRuntime getMockTableRuntimeWithAdaptiveInterval( - DefaultTableRuntime tableRuntime, - boolean needOptimizing, - boolean adaptiveEnabled, - long currentInterval, - long step) { - DefaultTableRuntime mockTableRuntime = Mockito.mock(DefaultTableRuntime.class); - Mockito.when(mockTableRuntime.getTableIdentifier()) - .thenReturn(tableRuntime.getTableIdentifier()); - 
Mockito.when(mockTableRuntime.getLastOptimizedSnapshotId()) - .thenReturn(tableRuntime.getLastOptimizedSnapshotId()); - Mockito.when(mockTableRuntime.getLastOptimizedChangeSnapshotId()) - .thenReturn(tableRuntime.getLastOptimizedChangeSnapshotId()); - Mockito.when(mockTableRuntime.getCurrentSnapshotId()).thenReturn(INITIAL_SNAPSHOTID); - Mockito.when(mockTableRuntime.getCurrentChangeSnapshotId()).thenReturn(INITIAL_SNAPSHOTID); - Mockito.when(mockTableRuntime.getLatestRefreshInterval()).thenReturn(currentInterval); - Mockito.when(mockTableRuntime.getLatestEvaluatedNeedOptimizing()).thenReturn(needOptimizing); - - OptimizingConfig mockOptimizingConfig = Mockito.mock(OptimizingConfig.class); - Mockito.when(mockOptimizingConfig.getRefreshTableAdaptiveEnabled()).thenReturn(adaptiveEnabled); - Mockito.when(mockOptimizingConfig.getRefreshTableAdaptiveMaxInterval()) - .thenReturn(MAX_INTERVAL); - Mockito.when(mockOptimizingConfig.getMinorLeastInterval()).thenReturn(MINOR_LEAST_INTERVAL); - Mockito.when(mockOptimizingConfig.getRefreshTableAdaptiveIncreaseStep()).thenReturn(step); - Mockito.when(mockTableRuntime.getOptimizingConfig()).thenReturn(mockOptimizingConfig); - - return mockTableRuntime; + // Create DefaultTableRuntime for adaptive interval tests + private DefaultTableRuntime buildTableRuntimeWithAdaptiveRefresh( + DefaultTableRuntime tableRuntime, boolean needOptimizing, long interval, long step) { + return buildTableRuntimeWithConfig(tableRuntime, needOptimizing, interval, MAX_INTERVAL, step); } - // Overloaded method with default step factor - private DefaultTableRuntime getMockTableRuntimeWithAdaptiveInterval( + private DefaultTableRuntime buildTableRuntimeWithAdaptiveMaxInterval( + DefaultTableRuntime tableRuntime, long interval, long maxInterval) { + return buildTableRuntimeWithConfig(tableRuntime, true, interval, maxInterval, STEP); + } + + private DefaultTableRuntime buildTableRuntimeWithConfig( DefaultTableRuntime tableRuntime, boolean needOptimizing, - 
boolean adaptiveEnabled, - long currentInterval) { - return getMockTableRuntimeWithAdaptiveInterval( - tableRuntime, needOptimizing, adaptiveEnabled, currentInterval, STEP); + long interval, + long maxInterval, + long step) { + OptimizingConfig optimizingConfig = new OptimizingConfig(); + optimizingConfig.setRefreshTableAdaptiveMaxIntervalMs(maxInterval); + optimizingConfig.setMinorLeastInterval(MINOR_LEAST_INTERVAL); + optimizingConfig.setRefreshTableAdaptiveIncreaseStepMs(step); + + DefaultTableRuntime newRuntime = + new DefaultTableRuntime(tableRuntime.store()) { + @Override + public OptimizingConfig getOptimizingConfig() { + return optimizingConfig; + } + }; + newRuntime.setLatestEvaluatedNeedOptimizing(needOptimizing); + newRuntime.setLatestRefreshInterval(interval); + + return newRuntime; } @Test - public void testAdaptiveIntervalScenarios() { + public void testAdaptiveRefreshIntervalScenarios() { createDatabase(); createTable(); @@ -139,73 +131,37 @@ public void testAdaptiveIntervalScenarios() { new TableRuntimeRefreshExecutor(tableService(), 1, INTERVAL, MAX_PENDING_PARTITIONS); // Test healthy table (not need optimizing) - interval should increase - DefaultTableRuntime mockTableRuntime = - getMockTableRuntimeWithAdaptiveInterval(tableRuntime, false, true, INTERVAL); - - // Initial interval is INTERVAL, should increase by 30000 + DefaultTableRuntime newTableRuntime = + buildTableRuntimeWithAdaptiveRefresh(tableRuntime, false, 0, STEP); + long adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(newTableRuntime); long expectedInterval = INTERVAL + STEP; - executor.execute(mockTableRuntime); - - // Verify that setLatestRefreshInterval was called with the expected value - verify(mockTableRuntime, Mockito.times(1)).setLatestRefreshInterval(expectedInterval); - - // Test unhealthy table (need optimizing) - interval should decrease - mockTableRuntime = - getMockTableRuntimeWithAdaptiveInterval(tableRuntime, true, true, INTERVAL * 2); - - // Current 
interval is INTERVAL * 2, should be halved - expectedInterval = (INTERVAL * 2) / 2; - executor.execute(mockTableRuntime); - - // Verify that setLatestRefreshInterval was called with the expected value - verify(mockTableRuntime, Mockito.times(1)).setLatestRefreshInterval(expectedInterval); - - // Test when adaptive interval is disabled - mockTableRuntime = getMockTableRuntimeWithAdaptiveInterval(tableRuntime, false, false, 0); - - executor.execute(mockTableRuntime); - - // Verify that setLatestRefreshInterval was never called - verify(mockTableRuntime, Mockito.never()).setLatestRefreshInterval(Mockito.anyLong()); + Assert.assertEquals(expectedInterval, adaptiveExecutingInterval); + + // Test minimum boundary - interval should not below INTERVAL + // The unhealthy table (need optimizing) - interval should decrease half + // The currentInterval is INTERVAL + STEP, the latest interval is INTERVAL not (INTERVAL + STEP) + // / 2 + newTableRuntime = + buildTableRuntimeWithAdaptiveRefresh( + newTableRuntime, true, adaptiveExecutingInterval, STEP); + adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(newTableRuntime); + expectedInterval = INTERVAL; + Assert.assertEquals(expectedInterval, adaptiveExecutingInterval); + + // Test healthy table (not need optimizing) with different step values + newTableRuntime = + buildTableRuntimeWithAdaptiveRefresh( + newTableRuntime, false, adaptiveExecutingInterval, 8 * STEP); + adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(newTableRuntime); + expectedInterval = expectedInterval + 8 * STEP; + Assert.assertEquals(expectedInterval, adaptiveExecutingInterval); // Test maximum boundary - interval should not exceed MAX_INTERVAL - mockTableRuntime = - getMockTableRuntimeWithAdaptiveInterval(tableRuntime, false, true, MAX_INTERVAL - 1000); - - executor.execute(mockTableRuntime); - - // Verify interval is set to MAX_INTERVAL - verify(mockTableRuntime, Mockito.times(1)).setLatestRefreshInterval(MAX_INTERVAL); - - 
// Test minimum boundary - interval should not go below INTERVAL - mockTableRuntime = getMockTableRuntimeWithAdaptiveInterval(tableRuntime, true, true, INTERVAL); - - executor.execute(mockTableRuntime); - - // Verify interval remains at INTERVAL (minimum) - verify(mockTableRuntime, Mockito.times(1)).setLatestRefreshInterval(INTERVAL); - - // Test initialization case (currentInterval is 0) - mockTableRuntime = getMockTableRuntimeWithAdaptiveInterval(tableRuntime, false, true, 0); - - // Initial interval is 0, should be initialized to INTERVAL, then increased by 30000 - expectedInterval = INTERVAL + STEP; - executor.execute(mockTableRuntime); - - // Verify interval was correctly initialized and increased - verify(mockTableRuntime, Mockito.times(1)).setLatestRefreshInterval(expectedInterval); - - // Test with different step values - long newStep = 60000L; - mockTableRuntime = - getMockTableRuntimeWithAdaptiveInterval(tableRuntime, false, true, INTERVAL, newStep); - - // Initial interval is INTERVAL, should increase by newStep - expectedInterval = INTERVAL + newStep; - executor.execute(mockTableRuntime); - - // Verify interval increased with correct step factor - verify(mockTableRuntime, Mockito.times(1)).setLatestRefreshInterval(expectedInterval); + newTableRuntime = + buildTableRuntimeWithAdaptiveRefresh( + newTableRuntime, false, adaptiveExecutingInterval, STEP); + adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(newTableRuntime); + Assert.assertEquals(MAX_INTERVAL, adaptiveExecutingInterval); dropTable(); dropDatabase(); @@ -216,36 +172,30 @@ public void testGetNextExecutingTime() { createDatabase(); createTable(); - // Test getNextExecutingTime with adaptive interval enabled DefaultTableRuntime tableRuntime = (DefaultTableRuntime) tableService().getRuntime(serverTableIdentifier().getId()); - DefaultTableRuntime mockTableRuntime = - getMockTableRuntimeWithAdaptiveInterval(tableRuntime, false, true, MAX_INTERVAL); - TableRuntimeRefreshExecutor 
executor = new TableRuntimeRefreshExecutor(tableService(), 1, INTERVAL, MAX_PENDING_PARTITIONS); + // Test getNextExecutingTime with adaptive interval enabled + DefaultTableRuntime newTableRuntime = + buildTableRuntimeWithAdaptiveMaxInterval(tableRuntime, MAX_INTERVAL, MAX_INTERVAL); // Should use adaptive interval rather than default interval - long nextExecutingTime = executor.getNextExecutingTime(mockTableRuntime); - - // Verify returned time equals adaptive interval + long nextExecutingTime = executor.getNextExecutingTime(newTableRuntime); Assert.assertEquals(MAX_INTERVAL, nextExecutingTime); // Test getNextExecutingTime with adaptive interval disabled - Mockito.when(mockTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveEnabled()) - .thenReturn(false); - nextExecutingTime = executor.getNextExecutingTime(mockTableRuntime); - + newTableRuntime = + buildTableRuntimeWithAdaptiveMaxInterval( + newTableRuntime, MAX_INTERVAL, DEFAULT_MAX_INTERVAL); // Should use default interval rather than adaptive interval + nextExecutingTime = executor.getNextExecutingTime(newTableRuntime); Assert.assertEquals(INTERVAL, nextExecutingTime); - // Test getNextExecutingTime with zero adaptive interval (fallback to default) - Mockito.when(mockTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveEnabled()) - .thenReturn(true); - Mockito.when(mockTableRuntime.getLatestRefreshInterval()).thenReturn(0L); - nextExecutingTime = executor.getNextExecutingTime(mockTableRuntime); - + // Test getNextExecutingTime with zero adaptive interval + newTableRuntime = buildTableRuntimeWithAdaptiveMaxInterval(newTableRuntime, 0, MAX_INTERVAL); // Should fallback to default interval when adaptive interval is 0 + nextExecutingTime = executor.getNextExecutingTime(newTableRuntime); Assert.assertEquals(INTERVAL, nextExecutingTime); dropTable(); diff --git a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java 
b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java index b2ef9b64dc..715f749b3c 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java @@ -97,14 +97,11 @@ public class OptimizingConfig { // self-optimizing.evaluation.fallback-interval private long evaluationFallbackInterval; - // self-optimizing.refresh-table.adaptive.enabled - private boolean refreshTableAdaptiveEnabled; + // self-optimizing.refresh-table.adaptive.max-interval-ms + private long refreshTableAdaptiveMaxIntervalMs; - // self-optimizing.refresh-table.adaptive.max-interval - private long refreshTableAdaptiveMaxInterval; - - // self-optimizing.refresh-table.adaptive.increase-step - private long refreshTableAdaptiveIncreaseStep; + // self-optimizing.refresh-table.adaptive.increase-step-ms + private long refreshTableAdaptiveIncreaseStepMs; public OptimizingConfig() {} @@ -327,31 +324,23 @@ public OptimizingConfig setEvaluationMseTolerance(long evaluationMseTolerance) { return this; } - public boolean getRefreshTableAdaptiveEnabled() { - return refreshTableAdaptiveEnabled; - } - - public OptimizingConfig setRefreshTableAdaptiveEnabled(boolean refreshTableAdaptiveEnabled) { - this.refreshTableAdaptiveEnabled = refreshTableAdaptiveEnabled; - return this; - } - - public long getRefreshTableAdaptiveMaxInterval() { - return refreshTableAdaptiveMaxInterval; + public long getRefreshTableAdaptiveMaxIntervalMs() { + return refreshTableAdaptiveMaxIntervalMs; } - public OptimizingConfig setRefreshTableAdaptiveMaxInterval(long refreshTableAdaptiveMaxInterval) { - this.refreshTableAdaptiveMaxInterval = refreshTableAdaptiveMaxInterval; + public OptimizingConfig setRefreshTableAdaptiveMaxIntervalMs( + long refreshTableAdaptiveMaxIntervalMs) { + this.refreshTableAdaptiveMaxIntervalMs = refreshTableAdaptiveMaxIntervalMs; return this; } - public long getRefreshTableAdaptiveIncreaseStep() { - 
return refreshTableAdaptiveIncreaseStep; + public long getRefreshTableAdaptiveIncreaseStepMs() { + return refreshTableAdaptiveIncreaseStepMs; } - public OptimizingConfig setRefreshTableAdaptiveIncreaseStep( - long refreshTableAdaptiveIncreaseStep) { - this.refreshTableAdaptiveIncreaseStep = refreshTableAdaptiveIncreaseStep; + public OptimizingConfig setRefreshTableAdaptiveIncreaseStepMs( + long refreshTableAdaptiveIncreaseStepMs) { + this.refreshTableAdaptiveIncreaseStepMs = refreshTableAdaptiveIncreaseStepMs; return this; } @@ -388,9 +377,9 @@ public boolean equals(Object o) { && Objects.equal(minPlanInterval, that.minPlanInterval) && Objects.equal(evaluationMseTolerance, that.evaluationMseTolerance) && Objects.equal(evaluationFallbackInterval, that.evaluationFallbackInterval) - && refreshTableAdaptiveEnabled == that.refreshTableAdaptiveEnabled - && Objects.equal(refreshTableAdaptiveMaxInterval, that.refreshTableAdaptiveMaxInterval) - && Objects.equal(refreshTableAdaptiveIncreaseStep, that.refreshTableAdaptiveIncreaseStep); + && Objects.equal(refreshTableAdaptiveMaxIntervalMs, that.refreshTableAdaptiveMaxIntervalMs) + && Objects.equal( + refreshTableAdaptiveIncreaseStepMs, that.refreshTableAdaptiveIncreaseStepMs); } @Override @@ -420,9 +409,8 @@ public int hashCode() { minPlanInterval, evaluationMseTolerance, evaluationFallbackInterval, - refreshTableAdaptiveEnabled, - refreshTableAdaptiveMaxInterval, - refreshTableAdaptiveIncreaseStep); + refreshTableAdaptiveMaxIntervalMs, + refreshTableAdaptiveIncreaseStepMs); } @Override @@ -450,9 +438,8 @@ public String toString() { .add("hiveRefreshInterval", hiveRefreshInterval) .add("evaluationMseTolerance", evaluationMseTolerance) .add("evaluationFallbackInterval", evaluationFallbackInterval) - .add("refreshTableAdaptiveEnabled", refreshTableAdaptiveEnabled) - .add("refreshTableAdaptiveMaxInterval", refreshTableAdaptiveMaxInterval) - .add("refreshTableAdaptiveIncreaseStep", refreshTableAdaptiveIncreaseStep) + 
.add("refreshTableAdaptiveMaxIntervalMs", refreshTableAdaptiveMaxIntervalMs) + .add("refreshTableAdaptiveIncreaseStepMs", refreshTableAdaptiveIncreaseStepMs) .toString(); } } diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java index 763e201482..97f7e4a13e 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java @@ -160,16 +160,13 @@ private TableProperties() {} public static final String SNAPSHOT_MIN_COUNT = "snapshot.keep.min-count"; public static final int SNAPSHOT_MIN_COUNT_DEFAULT = 1; - public static final String SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_ENABLED = - "self-optimizing.refresh-table.adaptive.enabled"; - public static final boolean SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_ENABLED_DEFAULT = false; - public static final String SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL = - "self-optimizing.refresh-table.adaptive.max-interval"; - public static final long SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_DEFAULT = - 3600000; // 1 hour - public static final String SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP = - "self-optimizing.refresh-table.adaptive.increase-step"; - public static final long SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_DEFAULT = + public static final String SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS = + "self-optimizing.refresh-table.adaptive.max-interval-ms"; + public static final long SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS_DEFAULT = + -1; // disabled + public static final String SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS = + "self-optimizing.refresh-table.adaptive.increase-step-ms"; + public static final long SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS_DEFAULT = 30000; // 30s /** diff --git 
a/docs/user-guides/configurations.md b/docs/user-guides/configurations.md index 9ecddd9d22..9e92faec0c 100644 --- a/docs/user-guides/configurations.md +++ b/docs/user-guides/configurations.md @@ -43,28 +43,27 @@ modified through [Alter Table](../using-tables/#modify-table) operations. Self-optimizing configurations are applicable to both Iceberg Format and Mixed streaming Format. -| Key | Default | Description | -|------------------------------------------------------|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| self-optimizing.enabled | true | Enables Self-optimizing | -| self-optimizing.allow-partial-commit | false | Whether to allow partial commit when self-optimizing fails or process is cancelled | -| self-optimizing.group | default | Optimizer group for Self-optimizing | -| self-optimizing.quota | 0.5 | Quota for Self-optimizing, indicating the optimizer resources the table can take up | -| self-optimizing.execute.num-retries | 5 | Number of retries after failure of Self-optimizing | -| self-optimizing.target-size | 134217728(128MB) | Target size for Self-optimizing | -| self-optimizing.max-file-count | 10000 | Maximum number of files processed by a Self-optimizing process | -| self-optimizing.max-task-size-bytes | 134217728(128MB) | Maximum file size bytes in a single task for splitting tasks | -| self-optimizing.fragment-ratio | 8 | The fragment file size threshold. We could divide self-optimizing.target-size by this ratio to get the actual fragment file size | -| self-optimizing.min-target-size-ratio | 0.75 | The undersized segment file size threshold. 
Segment files under this threshold will be considered for rewriting | -| self-optimizing.minor.trigger.file-count | 12 | The minimum number of files to trigger minor optimizing is determined by the sum of fragment file count and equality delete file count | -| self-optimizing.minor.trigger.interval | 3600000(1 hour) | The time interval in milliseconds to trigger minor optimizing | -| self-optimizing.major.trigger.duplicate-ratio | 0.1 | The ratio of duplicate data of segment files to trigger major optimizing | -| self-optimizing.full.trigger.interval | -1(closed) | The time interval in milliseconds to trigger full optimizing | -| self-optimizing.full.rewrite-all-files | true | Whether full optimizing rewrites all files or skips files that do not need to be optimized | -| self-optimizing.min-plan-interval | 60000 | The minimum time interval between two self-optimizing planning action | -| self-optimizing.filter | NULL | Filter conditions for self-optimizing, using SQL conditional expressions, without supporting any functions. For the timestamp column condition, the ISO date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'. 
| -| self-optimizing.refresh-table.adaptive.enabled | false | Whether to enable adaptive refresh interval for refreshing table metadata | -| self-optimizing.refresh-table.adaptive.max-interval | 3600000(1 hour) | The maximum time interval in milliseconds to refresh table metadata | -| self-optimizing.refresh-table.adaptive.increase-step | 30000(30s) | The time interval increase step in milliseconds to refres table metadata | +| Key | Default | Description | +|---------------------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| self-optimizing.enabled | true | Enables Self-optimizing | +| self-optimizing.allow-partial-commit | false | Whether to allow partial commit when self-optimizing fails or process is cancelled | +| self-optimizing.group | default | Optimizer group for Self-optimizing | +| self-optimizing.quota | 0.5 | Quota for Self-optimizing, indicating the optimizer resources the table can take up | +| self-optimizing.execute.num-retries | 5 | Number of retries after failure of Self-optimizing | +| self-optimizing.target-size | 134217728(128MB) | Target size for Self-optimizing | +| self-optimizing.max-file-count | 10000 | Maximum number of files processed by a Self-optimizing process | +| self-optimizing.max-task-size-bytes | 134217728(128MB) | Maximum file size bytes in a single task for splitting tasks | +| self-optimizing.fragment-ratio | 8 | The fragment file size threshold. We could divide self-optimizing.target-size by this ratio to get the actual fragment file size | +| self-optimizing.min-target-size-ratio | 0.75 | The undersized segment file size threshold. 
Segment files under this threshold will be considered for rewriting | +| self-optimizing.minor.trigger.file-count | 12 | The minimum number of files to trigger minor optimizing is determined by the sum of fragment file count and equality delete file count | +| self-optimizing.minor.trigger.interval | 3600000(1 hour) | The time interval in milliseconds to trigger minor optimizing | +| self-optimizing.major.trigger.duplicate-ratio | 0.1 | The ratio of duplicate data of segment files to trigger major optimizing | +| self-optimizing.full.trigger.interval | -1(closed) | The time interval in milliseconds to trigger full optimizing | +| self-optimizing.full.rewrite-all-files | true | Whether full optimizing rewrites all files or skips files that do not need to be optimized | +| self-optimizing.min-plan-interval | 60000 | The minimum time interval between two self-optimizing planning action | +| self-optimizing.filter | NULL | Filter conditions for self-optimizing, using SQL conditional expressions, without supporting any functions. For the timestamp column condition, the ISO date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'. | +| self-optimizing.refresh-table.adaptive.max-interval-ms | -1 | The maximum time interval in milliseconds to refresh table metadata. The default value is -1, which disables the adaptive refresh. 
| +| self-optimizing.refresh-table.adaptive.increase-step-ms | 30000(30s) | The time interval increase step in milliseconds to refresh table metadata | ## Data-cleaning configurations From b9877f19d2f67907d06d77ab3718601ae9310e8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=87=E9=A2=86?= Date: Tue, 3 Feb 2026 16:10:56 +0800 Subject: [PATCH 3/5] fixup --- .../inline/TableRuntimeRefreshExecutor.java | 18 +- .../TestTableRuntimeRefreshExecutor.java | 155 +++++++++--------- .../apache/amoro/config/OptimizingConfig.java | 8 + .../apache/amoro/table/TableProperties.java | 2 +- docs/user-guides/configurations.md | 40 ++--- 5 files changed, 120 insertions(+), 103 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java index 2b9dcbc6ba..b6b7e4ba63 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java @@ -58,7 +58,7 @@ protected boolean enabled(TableRuntime tableRuntime) { protected long getNextExecutingTime(TableRuntime tableRuntime) { DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime; - if (defaultTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxIntervalMs() > 0) { + if (defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled(interval)) { long newInterval = defaultTableRuntime.getLatestRefreshInterval(); if (newInterval > 0) { return newInterval; @@ -81,6 +81,7 @@ private boolean tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, Mixe logger.debug( "{} optimizing is not necessary due to metadata based trigger", tableRuntime.getTableIdentifier()); + // indicates no optimization demand now return false; } @@ -105,10 +106,13 @@ private boolean 
tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, Mixe logger.debug( "{} optimizing is not enabled, skip evaluating pending input", tableRuntime.getTableIdentifier()); + // indicates no optimization demand now return false; } else { logger.debug( "{} optimizing is processing or is in preparation", tableRuntime.getTableIdentifier()); + // indicates optimization demand exists (preparation or processing), + // even though we don't trigger a new evaluation in this loop. return true; } } @@ -143,25 +147,25 @@ public void execute(TableRuntime tableRuntime) { AmoroTable table = loadTable(tableRuntime); defaultTableRuntime.refresh(table); MixedTable mixedTable = (MixedTable) table.originalTable(); - boolean needOptimizing = false; + // Check if there is any optimizing demand now. + boolean hasOptimizingDemand = false; if ((mixedTable.isKeyedTable() && (lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId() || lastOptimizedChangeSnapshotId != defaultTableRuntime.getCurrentChangeSnapshotId())) || (mixedTable.isUnkeyedTable() && lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId())) { - needOptimizing = tryEvaluatingPendingInput(defaultTableRuntime, mixedTable); + hasOptimizingDemand = tryEvaluatingPendingInput(defaultTableRuntime, mixedTable); } else { logger.debug("{} optimizing is not necessary", defaultTableRuntime.getTableIdentifier()); } // Update adaptive interval according to evaluated result. 
- if (defaultTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxIntervalMs() > 0) { - defaultTableRuntime.setLatestEvaluatedNeedOptimizing(needOptimizing); + if (defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled(interval)) { + defaultTableRuntime.setLatestEvaluatedNeedOptimizing(hasOptimizingDemand); long newInterval = getAdaptiveExecutingInterval(defaultTableRuntime); defaultTableRuntime.setLatestRefreshInterval(newInterval); } - } catch (Throwable throwable) { logger.error("Refreshing table {} failed.", tableRuntime.getTableIdentifier(), throwable); } @@ -234,7 +238,6 @@ public long getAdaptiveExecutingInterval(DefaultTableRuntime tableRuntime) { private long decreaseInterval(long currentInterval, long minInterval) { long newInterval = currentInterval / 2; long boundedInterval = Math.max(newInterval, minInterval); - if (newInterval < minInterval) { logger.debug( "Interval reached minimum boundary: attempted {}ms, capped at {}ms", @@ -261,7 +264,6 @@ private long increaseInterval( long step = tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveIncreaseStepMs(); long newInterval = currentInterval + step; long boundedInterval = Math.min(newInterval, maxInterval); - if (newInterval > maxInterval) { logger.debug( "Interval reached maximum boundary: currentInterval is {}ms, attempted {}ms, capped at {}ms", diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java index c2818cc5e1..0c0c09e2b3 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java @@ -28,6 +28,7 @@ import org.apache.amoro.hive.catalog.HiveTableTestHelper; import org.apache.amoro.server.table.AMSTableTestBase; import 
org.apache.amoro.server.table.DefaultTableRuntime; +import org.apache.amoro.table.TableRuntimeStore; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -80,44 +81,40 @@ public TestTableRuntimeRefreshExecutor( private static final long INTERVAL = 60000L; // 1 minute private static final long MAX_INTERVAL = 300000L; // 5 minutes - private static final long DEFAULT_MAX_INTERVAL = -1L; private static final long STEP = 30000L; // 30s private static final int MAX_PENDING_PARTITIONS = 1; private static final int MINOR_LEAST_INTERVAL = 3600000; // 1h - // Create DefaultTableRuntime for adaptive interval tests - private DefaultTableRuntime buildTableRuntimeWithAdaptiveRefresh( - DefaultTableRuntime tableRuntime, boolean needOptimizing, long interval, long step) { - return buildTableRuntimeWithConfig(tableRuntime, needOptimizing, interval, MAX_INTERVAL, step); + /** + * A test helper class that allows configuration updates. Reuses the same TableRuntime instance + * across different test scenarios. 
+ */ + private static class TestTableRuntime extends DefaultTableRuntime { + private OptimizingConfig testOptimizingConfig; + + TestTableRuntime(TableRuntimeStore store, OptimizingConfig optimizingConfig) { + super(store); + this.testOptimizingConfig = optimizingConfig; + } + + /** Update the optimizing config without creating a new instance */ + void updateOptimizingConfig(OptimizingConfig newConfig) { + this.testOptimizingConfig = newConfig; + } + + @Override + public OptimizingConfig getOptimizingConfig() { + return this.testOptimizingConfig; + } } - private DefaultTableRuntime buildTableRuntimeWithAdaptiveMaxInterval( - DefaultTableRuntime tableRuntime, long interval, long maxInterval) { - return buildTableRuntimeWithConfig(tableRuntime, true, interval, maxInterval, STEP); - } - - private DefaultTableRuntime buildTableRuntimeWithConfig( - DefaultTableRuntime tableRuntime, - boolean needOptimizing, - long interval, - long maxInterval, - long step) { - OptimizingConfig optimizingConfig = new OptimizingConfig(); - optimizingConfig.setRefreshTableAdaptiveMaxIntervalMs(maxInterval); - optimizingConfig.setMinorLeastInterval(MINOR_LEAST_INTERVAL); - optimizingConfig.setRefreshTableAdaptiveIncreaseStepMs(step); - - DefaultTableRuntime newRuntime = - new DefaultTableRuntime(tableRuntime.store()) { - @Override - public OptimizingConfig getOptimizingConfig() { - return optimizingConfig; - } - }; - newRuntime.setLatestEvaluatedNeedOptimizing(needOptimizing); - newRuntime.setLatestRefreshInterval(interval); - - return newRuntime; + /** Create OptimizingConfig with specified parameters */ + private OptimizingConfig createOptimizingConfig(long maxInterval, long step) { + OptimizingConfig config = new OptimizingConfig(); + config.setRefreshTableAdaptiveMaxIntervalMs(maxInterval); + config.setMinorLeastInterval(MINOR_LEAST_INTERVAL); + config.setRefreshTableAdaptiveIncreaseStepMs(step); + return config; } @Test @@ -125,42 +122,48 @@ public void 
testAdaptiveRefreshIntervalScenarios() { createDatabase(); createTable(); - DefaultTableRuntime tableRuntime = + // Get the original table runtime + DefaultTableRuntime baseRuntime = (DefaultTableRuntime) tableService().getRuntime(serverTableIdentifier().getId()); TableRuntimeRefreshExecutor executor = new TableRuntimeRefreshExecutor(tableService(), 1, INTERVAL, MAX_PENDING_PARTITIONS); - // Test healthy table (not need optimizing) - interval should increase - DefaultTableRuntime newTableRuntime = - buildTableRuntimeWithAdaptiveRefresh(tableRuntime, false, 0, STEP); - long adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(newTableRuntime); + // Create a tableRuntime instance with initial config + OptimizingConfig initialConfig = createOptimizingConfig(MAX_INTERVAL, STEP); + TestTableRuntime tableRuntime = new TestTableRuntime(baseRuntime.store(), initialConfig); + + // Test 1: Healthy table (not need optimizing) - interval should increase by STEP + // Initial state: needOptimizing=false, latestRefreshInterval=0 (will use default INTERVAL) + tableRuntime.setLatestEvaluatedNeedOptimizing(false); + tableRuntime.setLatestRefreshInterval(0); + long adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(tableRuntime); long expectedInterval = INTERVAL + STEP; Assert.assertEquals(expectedInterval, adaptiveExecutingInterval); - // Test minimum boundary - interval should not below INTERVAL - // The unhealthy table (need optimizing) - interval should decrease half - // The currentInterval is INTERVAL + STEP, the latest interval is INTERVAL not (INTERVAL + STEP) - // / 2 - newTableRuntime = - buildTableRuntimeWithAdaptiveRefresh( - newTableRuntime, true, adaptiveExecutingInterval, STEP); - adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(newTableRuntime); + // Test 2: Test minimum boundary - interval should not below INTERVAL + // Unhealthy table (need optimizing) - interval should decrease to half + // current interval is INTERVAL + 
STEP, the latest interval is INTERVAL + // not (INTERVAL + STEP) / 2 + tableRuntime.setLatestEvaluatedNeedOptimizing(true); + tableRuntime.setLatestRefreshInterval(adaptiveExecutingInterval); + adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(tableRuntime); expectedInterval = INTERVAL; Assert.assertEquals(expectedInterval, adaptiveExecutingInterval); - // Test healthy table (not need optimizing) with different step values - newTableRuntime = - buildTableRuntimeWithAdaptiveRefresh( - newTableRuntime, false, adaptiveExecutingInterval, 8 * STEP); - adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(newTableRuntime); + // Test 3: Healthy table with larger step value - interval should increase by 8 * STEP + tableRuntime.setLatestEvaluatedNeedOptimizing(false); + tableRuntime.setLatestRefreshInterval(adaptiveExecutingInterval); + tableRuntime.updateOptimizingConfig(createOptimizingConfig(MAX_INTERVAL, 8 * STEP)); + adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(tableRuntime); expectedInterval = expectedInterval + 8 * STEP; Assert.assertEquals(expectedInterval, adaptiveExecutingInterval); - // Test maximum boundary - interval should not exceed MAX_INTERVAL - newTableRuntime = - buildTableRuntimeWithAdaptiveRefresh( - newTableRuntime, false, adaptiveExecutingInterval, STEP); - adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(newTableRuntime); + // Test 4: Maximum boundary - interval should not exceed MAX_INTERVAL + tableRuntime.setLatestRefreshInterval(adaptiveExecutingInterval); + tableRuntime.updateOptimizingConfig(createOptimizingConfig(MAX_INTERVAL, STEP)); + // current interval is INTERVAL + 8 * STEP, the latest interval is MAX_INTERVAL + // rather than INTERVAL + 9 * STEP + adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(tableRuntime); Assert.assertEquals(MAX_INTERVAL, adaptiveExecutingInterval); dropTable(); @@ -172,31 +175,35 @@ public void testGetNextExecutingTime() { 
createDatabase(); createTable(); - DefaultTableRuntime tableRuntime = + // Get the original table runtime + DefaultTableRuntime baseRuntime = (DefaultTableRuntime) tableService().getRuntime(serverTableIdentifier().getId()); TableRuntimeRefreshExecutor executor = new TableRuntimeRefreshExecutor(tableService(), 1, INTERVAL, MAX_PENDING_PARTITIONS); - // Test getNextExecutingTime with adaptive interval enabled - DefaultTableRuntime newTableRuntime = - buildTableRuntimeWithAdaptiveMaxInterval(tableRuntime, MAX_INTERVAL, MAX_INTERVAL); - // Should use adaptive interval rather than default interval - long nextExecutingTime = executor.getNextExecutingTime(newTableRuntime); + // Create a tableRuntime instance with adaptive interval enabled + OptimizingConfig configWithAdaptiveEnabled = createOptimizingConfig(MAX_INTERVAL, STEP); + TestTableRuntime tableRuntime = + new TestTableRuntime(baseRuntime.store(), configWithAdaptiveEnabled); + + // Test 1: getNextExecutingTime with adaptive interval enabled and positive + // latestRefreshInterval + // Set a positive latestRefreshInterval to enable adaptive behavior + tableRuntime.setLatestRefreshInterval(MAX_INTERVAL); + long nextExecutingTime = executor.getNextExecutingTime(tableRuntime); Assert.assertEquals(MAX_INTERVAL, nextExecutingTime); - // Test getNextExecutingTime with adaptive interval disabled - newTableRuntime = - buildTableRuntimeWithAdaptiveMaxInterval( - newTableRuntime, MAX_INTERVAL, DEFAULT_MAX_INTERVAL); - // Should use default interval rather than adaptive interval - nextExecutingTime = executor.getNextExecutingTime(newTableRuntime); - Assert.assertEquals(INTERVAL, nextExecutingTime); - - // Test getNextExecutingTime with zero adaptive interval - newTableRuntime = buildTableRuntimeWithAdaptiveMaxInterval(newTableRuntime, 0, MAX_INTERVAL); - // Should fallback to default interval when adaptive interval is 0 - nextExecutingTime = executor.getNextExecutingTime(newTableRuntime); - Assert.assertEquals(INTERVAL, 
nextExecutingTime); + // Test 2: getNextExecutingTime with adaptive interval enabled but latestRefreshInterval is 0 + // Should fall back to min(minorLeastInterval * 4/5, INTERVAL) + tableRuntime.setLatestRefreshInterval(0); + nextExecutingTime = executor.getNextExecutingTime(tableRuntime); + long expectedFallbackInterval = Math.min(MINOR_LEAST_INTERVAL * 4L / 5, INTERVAL); + Assert.assertEquals(expectedFallbackInterval, nextExecutingTime); + + // Test 3: getNextExecutingTime with adaptive interval disabled (maxInterval <= INTERVAL) + tableRuntime.updateOptimizingConfig(createOptimizingConfig(INTERVAL, STEP)); + nextExecutingTime = executor.getNextExecutingTime(tableRuntime); + Assert.assertEquals(expectedFallbackInterval, nextExecutingTime); dropTable(); dropDatabase(); diff --git a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java index 715f749b3c..eff8a31ce3 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java @@ -324,6 +324,14 @@ public OptimizingConfig setEvaluationMseTolerance(long evaluationMseTolerance) { return this; } + public boolean isRefreshTableAdaptiveEnabled(long minInterval) { + if (refreshTableAdaptiveMaxIntervalMs <= minInterval) { + return false; + } + + return true; + } + public long getRefreshTableAdaptiveMaxIntervalMs() { return refreshTableAdaptiveMaxIntervalMs; } diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java index 97f7e4a13e..015c68d480 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java @@ -163,7 +163,7 @@ private TableProperties() {} public static final String 
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS = "self-optimizing.refresh-table.adaptive.max-interval-ms"; public static final long SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS_DEFAULT = - -1; // disabled + 0; // disabled public static final String SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS = "self-optimizing.refresh-table.adaptive.increase-step-ms"; public static final long SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS_DEFAULT = diff --git a/docs/user-guides/configurations.md b/docs/user-guides/configurations.md index 9e92faec0c..5c8fd3cc34 100644 --- a/docs/user-guides/configurations.md +++ b/docs/user-guides/configurations.md @@ -43,27 +43,27 @@ modified through [Alter Table](../using-tables/#modify-table) operations. Self-optimizing configurations are applicable to both Iceberg Format and Mixed streaming Format. -| Key | Default | Description | -|---------------------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| self-optimizing.enabled | true | Enables Self-optimizing | -| self-optimizing.allow-partial-commit | false | Whether to allow partial commit when self-optimizing fails or process is cancelled | -| self-optimizing.group | default | Optimizer group for Self-optimizing | -| self-optimizing.quota | 0.5 | Quota for Self-optimizing, indicating the optimizer resources the table can take up | -| self-optimizing.execute.num-retries | 5 | Number of retries after failure of Self-optimizing | -| self-optimizing.target-size | 134217728(128MB) | Target size for Self-optimizing | -| self-optimizing.max-file-count | 10000 | Maximum number of files processed by a Self-optimizing process | -| self-optimizing.max-task-size-bytes | 134217728(128MB) | Maximum file size bytes in a single 
task for splitting tasks | -| self-optimizing.fragment-ratio | 8 | The fragment file size threshold. We could divide self-optimizing.target-size by this ratio to get the actual fragment file size | -| self-optimizing.min-target-size-ratio | 0.75 | The undersized segment file size threshold. Segment files under this threshold will be considered for rewriting | -| self-optimizing.minor.trigger.file-count | 12 | The minimum number of files to trigger minor optimizing is determined by the sum of fragment file count and equality delete file count | -| self-optimizing.minor.trigger.interval | 3600000(1 hour) | The time interval in milliseconds to trigger minor optimizing | -| self-optimizing.major.trigger.duplicate-ratio | 0.1 | The ratio of duplicate data of segment files to trigger major optimizing | -| self-optimizing.full.trigger.interval | -1(closed) | The time interval in milliseconds to trigger full optimizing | -| self-optimizing.full.rewrite-all-files | true | Whether full optimizing rewrites all files or skips files that do not need to be optimized | -| self-optimizing.min-plan-interval | 60000 | The minimum time interval between two self-optimizing planning action | +| Key | Default | Description | +|---------------------------------------------------------|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| self-optimizing.enabled | true | Enables Self-optimizing | +| self-optimizing.allow-partial-commit | false | Whether to allow partial commit when self-optimizing fails or process is cancelled | +| self-optimizing.group | default | Optimizer group for Self-optimizing | +| self-optimizing.quota | 0.5 | Quota for Self-optimizing, indicating the optimizer resources the table can take up | +| self-optimizing.execute.num-retries | 5 | Number of retries after 
failure of Self-optimizing | +| self-optimizing.target-size | 134217728(128MB) | Target size for Self-optimizing | +| self-optimizing.max-file-count | 10000 | Maximum number of files processed by a Self-optimizing process | +| self-optimizing.max-task-size-bytes | 134217728(128MB) | Maximum file size bytes in a single task for splitting tasks | +| self-optimizing.fragment-ratio | 8 | The fragment file size threshold. We could divide self-optimizing.target-size by this ratio to get the actual fragment file size | +| self-optimizing.min-target-size-ratio | 0.75 | The undersized segment file size threshold. Segment files under this threshold will be considered for rewriting | +| self-optimizing.minor.trigger.file-count | 12 | The minimum number of files to trigger minor optimizing is determined by the sum of fragment file count and equality delete file count | +| self-optimizing.minor.trigger.interval | 3600000(1 hour) | The time interval in milliseconds to trigger minor optimizing | +| self-optimizing.major.trigger.duplicate-ratio | 0.1 | The ratio of duplicate data of segment files to trigger major optimizing | +| self-optimizing.full.trigger.interval | -1(closed) | The time interval in milliseconds to trigger full optimizing | +| self-optimizing.full.rewrite-all-files | true | Whether full optimizing rewrites all files or skips files that do not need to be optimized | +| self-optimizing.min-plan-interval | 60000 | The minimum time interval between two self-optimizing planning action | | self-optimizing.filter | NULL | Filter conditions for self-optimizing, using SQL conditional expressions, without supporting any functions. For the timestamp column condition, the ISO date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'. | -| self-optimizing.refresh-table.adaptive.max-interval-ms | -1 | The maximum time interval in milliseconds to refresh table metadata. The default value is -1, which disables the adaptive refresh. 
| -| self-optimizing.refresh-table.adaptive.increase-step-ms | 30000(30s) | The time interval increase step in milliseconds to refresh table metadata | +| self-optimizing.refresh-table.adaptive.max-interval-ms | 0 | The maximum time interval in milliseconds to refresh table metadata. 0 means disable adaptive refresh. When enabled, the value should greater than 'refresh-tables.interval', and may exceed 'self-optimizing.minor.trigger.interval' * 4/5 | +| self-optimizing.refresh-table.adaptive.increase-step-ms | 30000(30s) | The time interval increase step in milliseconds to refresh table metadata | ## Data-cleaning configurations From 37d8369eec597b9bd7b24abfc20d3c67897ded6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=87=E9=A2=86?= Date: Thu, 5 Feb 2026 14:37:01 +0800 Subject: [PATCH 4/5] The validation parameter maxInterval must be greater than minInterval. --- .../inline/TableRuntimeRefreshExecutor.java | 17 +++++++++++-- .../TestTableRuntimeRefreshExecutor.java | 25 +++++++++++++++---- .../apache/amoro/config/OptimizingConfig.java | 8 ++---- 3 files changed, 37 insertions(+), 13 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java index b6b7e4ba63..071c9e2f77 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java @@ -18,6 +18,8 @@ package org.apache.amoro.server.scheduler.inline; +import static org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS; + import org.apache.amoro.AmoroTable; import org.apache.amoro.TableRuntime; import org.apache.amoro.config.OptimizingConfig; @@ -25,6 +27,7 @@ import org.apache.amoro.optimizing.evaluation.MetadataBasedEvaluationEvent; import 
org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator; import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.server.AmoroManagementConf; import org.apache.amoro.server.optimizing.OptimizingProcess; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.scheduler.PeriodicTableScheduler; @@ -58,7 +61,7 @@ protected boolean enabled(TableRuntime tableRuntime) { protected long getNextExecutingTime(TableRuntime tableRuntime) { DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime; - if (defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled(interval)) { + if (defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled()) { long newInterval = defaultTableRuntime.getLatestRefreshInterval(); if (newInterval > 0) { return newInterval; @@ -161,7 +164,7 @@ public void execute(TableRuntime tableRuntime) { } // Update adaptive interval according to evaluated result. - if (defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled(interval)) { + if (defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled()) { defaultTableRuntime.setLatestEvaluatedNeedOptimizing(hasOptimizingDemand); long newInterval = getAdaptiveExecutingInterval(defaultTableRuntime); defaultTableRuntime.setLatestRefreshInterval(newInterval); @@ -195,6 +198,16 @@ public long getAdaptiveExecutingInterval(DefaultTableRuntime tableRuntime) { final long minInterval = interval; final long maxInterval = tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxIntervalMs(); + + Preconditions.checkArgument( + minInterval < maxInterval, + String.format( + "The adaptive refresh configuration %s(%d ms) must be greater than %s(%d ms).", + SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS, + maxInterval, + AmoroManagementConf.REFRESH_TABLES_INTERVAL.key(), + minInterval)); + long currentInterval = tableRuntime.getLatestRefreshInterval(); // Initialize interval on first run or 
after restart diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java index 0c0c09e2b3..6bb53e0e7a 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java @@ -18,6 +18,8 @@ package org.apache.amoro.server.scheduler.inline; +import static org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS; + import org.apache.amoro.BasicTableTestHelper; import org.apache.amoro.TableFormat; import org.apache.amoro.TableTestHelper; @@ -26,6 +28,7 @@ import org.apache.amoro.config.OptimizingConfig; import org.apache.amoro.hive.catalog.HiveCatalogTestHelper; import org.apache.amoro.hive.catalog.HiveTableTestHelper; +import org.apache.amoro.server.AmoroManagementConf; import org.apache.amoro.server.table.AMSTableTestBase; import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.table.TableRuntimeStore; @@ -166,6 +169,23 @@ public void testAdaptiveRefreshIntervalScenarios() { adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(tableRuntime); Assert.assertEquals(MAX_INTERVAL, adaptiveExecutingInterval); + // Test5: MaxInterval should be greater than minInterval + long maxInterval = INTERVAL - 1000; + tableRuntime.updateOptimizingConfig(createOptimizingConfig(maxInterval, STEP)); + try { + executor.getAdaptiveExecutingInterval(tableRuntime); + } catch (IllegalArgumentException e) { + + Assert.assertEquals( + e.getMessage(), + String.format( + "The adaptive refresh configuration %s(%d ms) must be greater than %s(%d ms).", + SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS, + maxInterval, + AmoroManagementConf.REFRESH_TABLES_INTERVAL.key(), + INTERVAL)); + } + dropTable(); 
dropDatabase(); } @@ -200,11 +220,6 @@ public void testGetNextExecutingTime() { long expectedFallbackInterval = Math.min(MINOR_LEAST_INTERVAL * 4L / 5, INTERVAL); Assert.assertEquals(expectedFallbackInterval, nextExecutingTime); - // Test 3: getNextExecutingTime with adaptive interval disabled (maxInterval <= INTERVAL) - tableRuntime.updateOptimizingConfig(createOptimizingConfig(INTERVAL, STEP)); - nextExecutingTime = executor.getNextExecutingTime(tableRuntime); - Assert.assertEquals(expectedFallbackInterval, nextExecutingTime); - dropTable(); dropDatabase(); } diff --git a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java index eff8a31ce3..8fd2ab6a56 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java @@ -324,12 +324,8 @@ public OptimizingConfig setEvaluationMseTolerance(long evaluationMseTolerance) { return this; } - public boolean isRefreshTableAdaptiveEnabled(long minInterval) { - if (refreshTableAdaptiveMaxIntervalMs <= minInterval) { - return false; - } - - return true; + public boolean isRefreshTableAdaptiveEnabled() { + return refreshTableAdaptiveMaxIntervalMs > 0; } public long getRefreshTableAdaptiveMaxIntervalMs() { From 5d564df03eb4ce64f60362f5937db2c453324053 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=87=E9=A2=86?= Date: Fri, 6 Feb 2026 15:57:25 +0800 Subject: [PATCH 5/5] fixup the invalid configuration self-optimizing.refresh-table.adaptive.max-interval-ms --- .../inline/TableRuntimeRefreshExecutor.java | 24 +++++++---- .../TestTableRuntimeRefreshExecutor.java | 25 ++++------- docs/user-guides/configurations.md | 42 +++++++++---------- 3 files changed, 46 insertions(+), 45 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java index 071c9e2f77..9f389138e4 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java @@ -28,6 +28,7 @@ import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator; import org.apache.amoro.process.ProcessStatus; import org.apache.amoro.server.AmoroManagementConf; +import org.apache.amoro.server.AmoroServiceConstants; import org.apache.amoro.server.optimizing.OptimizingProcess; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.scheduler.PeriodicTableScheduler; @@ -199,14 +200,21 @@ public long getAdaptiveExecutingInterval(DefaultTableRuntime tableRuntime) { final long maxInterval = tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxIntervalMs(); - Preconditions.checkArgument( - minInterval < maxInterval, - String.format( - "The adaptive refresh configuration %s(%d ms) must be greater than %s(%d ms).", - SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS, - maxInterval, - AmoroManagementConf.REFRESH_TABLES_INTERVAL.key(), - minInterval)); + if (maxInterval <= minInterval) { + tableRuntime + .getOptimizingConfig() + .setRefreshTableAdaptiveMaxIntervalMs(AmoroServiceConstants.INVALID_TIME); + logger.warn( + "Invalid adaptive refresh configuration for table {}: {} = {}ms is not greater than {} = {}ms. 
Setting {} to default value 0 to disable dynamic refresh logic.", + tableRuntime.getTableIdentifier(), + SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS, + maxInterval, + AmoroManagementConf.REFRESH_TABLES_INTERVAL.key(), + minInterval, + SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS); + + return AmoroServiceConstants.INVALID_TIME; + } long currentInterval = tableRuntime.getLatestRefreshInterval(); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java index 6bb53e0e7a..5d5a0fb6d3 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java @@ -18,8 +18,6 @@ package org.apache.amoro.server.scheduler.inline; -import static org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS; - import org.apache.amoro.BasicTableTestHelper; import org.apache.amoro.TableFormat; import org.apache.amoro.TableTestHelper; @@ -28,7 +26,7 @@ import org.apache.amoro.config.OptimizingConfig; import org.apache.amoro.hive.catalog.HiveCatalogTestHelper; import org.apache.amoro.hive.catalog.HiveTableTestHelper; -import org.apache.amoro.server.AmoroManagementConf; +import org.apache.amoro.server.AmoroServiceConstants; import org.apache.amoro.server.table.AMSTableTestBase; import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.table.TableRuntimeStore; @@ -170,21 +168,16 @@ public void testAdaptiveRefreshIntervalScenarios() { Assert.assertEquals(MAX_INTERVAL, adaptiveExecutingInterval); // Test5: MaxInterval should be greater than minInterval + // If maxInterval <= minInterval, the latest interval and the maxInterval will be reset to + // default value 0 long maxInterval = INTERVAL - 1000; 
tableRuntime.updateOptimizingConfig(createOptimizingConfig(maxInterval, STEP)); - try { - executor.getAdaptiveExecutingInterval(tableRuntime); - } catch (IllegalArgumentException e) { - - Assert.assertEquals( - e.getMessage(), - String.format( - "The adaptive refresh configuration %s(%d ms) must be greater than %s(%d ms).", - SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS, - maxInterval, - AmoroManagementConf.REFRESH_TABLES_INTERVAL.key(), - INTERVAL)); - } + tableRuntime.setLatestRefreshInterval(adaptiveExecutingInterval); + adaptiveExecutingInterval = executor.getAdaptiveExecutingInterval(tableRuntime); + Assert.assertEquals(AmoroServiceConstants.INVALID_TIME, adaptiveExecutingInterval); + Assert.assertEquals( + AmoroServiceConstants.INVALID_TIME, + tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxIntervalMs()); dropTable(); dropDatabase(); diff --git a/docs/user-guides/configurations.md b/docs/user-guides/configurations.md index 5c8fd3cc34..253c7a6e0b 100644 --- a/docs/user-guides/configurations.md +++ b/docs/user-guides/configurations.md @@ -43,27 +43,27 @@ modified through [Alter Table](../using-tables/#modify-table) operations. Self-optimizing configurations are applicable to both Iceberg Format and Mixed streaming Format. 
-| Key | Default | Description | -|---------------------------------------------------------|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| self-optimizing.enabled | true | Enables Self-optimizing | -| self-optimizing.allow-partial-commit | false | Whether to allow partial commit when self-optimizing fails or process is cancelled | -| self-optimizing.group | default | Optimizer group for Self-optimizing | -| self-optimizing.quota | 0.5 | Quota for Self-optimizing, indicating the optimizer resources the table can take up | -| self-optimizing.execute.num-retries | 5 | Number of retries after failure of Self-optimizing | -| self-optimizing.target-size | 134217728(128MB) | Target size for Self-optimizing | -| self-optimizing.max-file-count | 10000 | Maximum number of files processed by a Self-optimizing process | -| self-optimizing.max-task-size-bytes | 134217728(128MB) | Maximum file size bytes in a single task for splitting tasks | -| self-optimizing.fragment-ratio | 8 | The fragment file size threshold. We could divide self-optimizing.target-size by this ratio to get the actual fragment file size | -| self-optimizing.min-target-size-ratio | 0.75 | The undersized segment file size threshold. 
Segment files under this threshold will be considered for rewriting | -| self-optimizing.minor.trigger.file-count | 12 | The minimum number of files to trigger minor optimizing is determined by the sum of fragment file count and equality delete file count | -| self-optimizing.minor.trigger.interval | 3600000(1 hour) | The time interval in milliseconds to trigger minor optimizing | -| self-optimizing.major.trigger.duplicate-ratio | 0.1 | The ratio of duplicate data of segment files to trigger major optimizing | -| self-optimizing.full.trigger.interval | -1(closed) | The time interval in milliseconds to trigger full optimizing | -| self-optimizing.full.rewrite-all-files | true | Whether full optimizing rewrites all files or skips files that do not need to be optimized | -| self-optimizing.min-plan-interval | 60000 | The minimum time interval between two self-optimizing planning action | -| self-optimizing.filter | NULL | Filter conditions for self-optimizing, using SQL conditional expressions, without supporting any functions. For the timestamp column condition, the ISO date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'. | -| self-optimizing.refresh-table.adaptive.max-interval-ms | 0 | The maximum time interval in milliseconds to refresh table metadata. 0 means disable adaptive refresh. 
When enabled, the value should greater than 'refresh-tables.interval', and may exceed 'self-optimizing.minor.trigger.interval' * 4/5 | -| self-optimizing.refresh-table.adaptive.increase-step-ms | 30000(30s) | The time interval increase step in milliseconds to refresh table metadata | +| Key | Default | Description | +|---------------------------------------------------------|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| self-optimizing.enabled | true | Enables Self-optimizing | +| self-optimizing.allow-partial-commit | false | Whether to allow partial commit when self-optimizing fails or process is cancelled | +| self-optimizing.group | default | Optimizer group for Self-optimizing | +| self-optimizing.quota | 0.5 | Quota for Self-optimizing, indicating the optimizer resources the table can take up | +| self-optimizing.execute.num-retries | 5 | Number of retries after failure of Self-optimizing | +| self-optimizing.target-size | 134217728(128MB) | Target size for Self-optimizing | +| self-optimizing.max-file-count | 10000 | Maximum number of files processed by a Self-optimizing process | +| self-optimizing.max-task-size-bytes | 134217728(128MB) | Maximum file size bytes in a single task for splitting tasks | +| self-optimizing.fragment-ratio | 8 | The fragment file size threshold. We could divide self-optimizing.target-size by this ratio to get the actual fragment file size | +| self-optimizing.min-target-size-ratio | 0.75 | The undersized segment file size threshold. 
Segment files under this threshold will be considered for rewriting | +| self-optimizing.minor.trigger.file-count | 12 | The minimum number of files to trigger minor optimizing is determined by the sum of fragment file count and equality delete file count | +| self-optimizing.minor.trigger.interval | 3600000(1 hour) | The time interval in milliseconds to trigger minor optimizing | +| self-optimizing.major.trigger.duplicate-ratio | 0.1 | The ratio of duplicate data of segment files to trigger major optimizing | +| self-optimizing.full.trigger.interval | -1(closed) | The time interval in milliseconds to trigger full optimizing | +| self-optimizing.full.rewrite-all-files | true | Whether full optimizing rewrites all files or skips files that do not need to be optimized | +| self-optimizing.min-plan-interval | 60000 | The minimum time interval between two self-optimizing planning actions | +| self-optimizing.filter | NULL | Filter conditions for self-optimizing, using SQL conditional expressions, without supporting any functions. For the timestamp column condition, the ISO date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'. | +| self-optimizing.refresh-table.adaptive.max-interval-ms | 0 | The maximum time interval in milliseconds to refresh table metadata. 0 disables adaptive refresh. When enabled, the value must be greater than 'refresh-tables.interval' and may exceed 'self-optimizing.minor.trigger.interval' * 4/5. Invalid values will be reset to 0. | +| self-optimizing.refresh-table.adaptive.increase-step-ms | 30000(30s) | The time interval increase step in milliseconds to refresh table metadata | ## Data-cleaning configurations