diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index e0a26fed55..7877ee76af 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -487,9 +487,9 @@ public class AmoroManagementConf { public static final ConfigOption OPTIMIZER_TASK_EXECUTE_TIMEOUT = ConfigOptions.key("optimizer.task-execute-timeout") .durationType() - .defaultValue(Duration.ofHours(1)) - .withDescription("Timeout duration for task execution, default to 1 hour."); - + .defaultValue(Duration.ofSeconds(Integer.MAX_VALUE)) + .withDescription( + "Timeout duration for task execution, default to Integer.MAX_VALUE seconds(about 24,855 days)."); public static final ConfigOption OPTIMIZER_MAX_PLANNING_PARALLELISM = ConfigOptions.key("optimizer.max-planning-parallelism") .intType() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index ebb4bf1309..4fd3e5e9af 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -28,6 +28,7 @@ import org.apache.amoro.api.AmoroTableMetastore; import org.apache.amoro.api.OptimizingService; import org.apache.amoro.config.ConfigHelpers; +import org.apache.amoro.config.ConfigurationException; import org.apache.amoro.config.Configurations; import org.apache.amoro.config.shade.utils.ConfigShadeUtils; import org.apache.amoro.exception.AmoroRuntimeException; @@ -149,6 +150,9 @@ public static void main(String[] args) { service.transitionToLeader(); // Used to block AMS instances that have acquired leadership service.waitFollowerShip(); + } catch (ConfigurationException e) { + LOG.error("AMS will exit...", e); + System.exit(1); } catch (Exception e) { LOG.error("AMS start error", e); } finally { @@ -157,7 +161,7 @@ public static void main(String[] args) { } } } catch (Throwable t) { - LOG.error("AMS encountered an unknown exception, will exist", t); + LOG.error("AMS encountered an unknown exception, will exit...", t); System.exit(1); } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index 6cc05bc56b..f3b13e69a6 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -113,17 +113,17 @@ public DefaultOptimizingService( OptimizerManager optimizerManager, TableService tableService) { this.optimizerTouchTimeout = - serviceConfig.get(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT).toMillis(); + serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT); this.taskAckTimeout = - serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT).toMillis(); + serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT); this.taskExecuteTimeout = - serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT).toMillis(); + serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT); this.refreshGroupInterval = - serviceConfig.get(AmoroManagementConf.OPTIMIZING_REFRESH_GROUP_INTERVAL).toMillis(); + serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZING_REFRESH_GROUP_INTERVAL); this.maxPlanningParallelism = serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM); this.pollingTimeout = - serviceConfig.get(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT).toMillis(); + serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT); this.breakQuotaLimit = serviceConfig.getBoolean(AmoroManagementConf.OPTIMIZING_BREAK_QUOTA_LIMIT_ENABLED); this.tableService = tableService; @@ -511,8 +511,7 @@ public void run() { } private void retryTask(TaskRuntime task, OptimizingQueue queue) { - if (task.getStatus() == TaskRuntime.Status.ACKED - && task.getStartTime() + taskExecuteTimeout < System.currentTimeMillis()) { + if (isTaskExecTimeout(task)) { LOG.warn( "Task {} has been suspended in ACK state for {} (start time: {}), put it to retry queue, optimizer {}. (Note: The task may have finished executing, but ams did not receive the COMPLETE message from the optimizer.)", task.getTaskId(), @@ -543,11 +542,16 @@ private Predicate> buildSuspendingPredication(Set activeT && task.getStatus() != TaskRuntime.Status.SUCCESS || task.getStatus() == TaskRuntime.Status.SCHEDULED && task.getStartTime() + taskAckTimeout < System.currentTimeMillis() - || task.getStatus() == TaskRuntime.Status.ACKED - && task.getStartTime() + taskExecuteTimeout < System.currentTimeMillis(); + || isTaskExecTimeout(task); } } + private boolean isTaskExecTimeout(TaskRuntime task) { + return task.getStatus() == TaskRuntime.Status.ACKED + && taskExecuteTimeout > 0 + && task.getStartTime() + taskExecuteTimeout < System.currentTimeMillis(); + } + private class OptimizingConfigWatcher implements Runnable { private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor( diff --git a/amoro-ams/src/test/java/org/apache/amoro/config/ConfigurationsTest.java b/amoro-ams/src/test/java/org/apache/amoro/config/ConfigurationsTest.java index 4019aa4909..1e9a9df41a 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/config/ConfigurationsTest.java +++ b/amoro-ams/src/test/java/org/apache/amoro/config/ConfigurationsTest.java @@ -35,6 +35,7 @@ import java.util.Comparator; import java.util.List; import java.util.Optional; +import java.util.Properties; /** * End-to-end test cases for configuration documentation. @@ -74,6 +75,35 @@ public void testAmoroManagementConfDocumentation() throws Exception { generateConfigurationMarkdown("ams-config.md", "AMS Configuration", 100, confInfoList); } + @Test + public void testGetDurationInMillis() throws Exception { + Properties properties = new Properties(); + properties.put(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT.key(), "1h"); + Configurations configuration = ConfigHelpers.createConfiguration(properties); + long durationInMillis = + configuration.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT); + Assertions.assertEquals(3600000, durationInMillis); + + // default value test + properties = new Properties(); + configuration = ConfigHelpers.createConfiguration(properties); + durationInMillis = + configuration.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT); + Assertions.assertEquals(Integer.MAX_VALUE * 1000L, durationInMillis); + + properties.put(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT.key(), Long.MAX_VALUE + "m"); + final Configurations conf1 = ConfigHelpers.createConfiguration(properties); + Assertions.assertThrows( + ConfigurationException.class, + () -> conf1.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT)); + + properties.put(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT.key(), "-1m"); + final Configurations conf2 = ConfigHelpers.createConfiguration(properties); + Assertions.assertThrows( + ConfigurationException.class, + () -> conf2.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT)); + } + /** * Generate configuration documentation for multiple configuration classes. * diff --git a/amoro-common/src/main/java/org/apache/amoro/config/ConfigurationException.java b/amoro-common/src/main/java/org/apache/amoro/config/ConfigurationException.java new file mode 100644 index 0000000000..9294b8e955 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/config/ConfigurationException.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.config; + +/** + * Exception thrown when a configuration value causes an exception, typically when converting + * Duration to milliseconds. This exception can be caught to trigger process exit. + */ +public class ConfigurationException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + private final String configKey; + + public ConfigurationException(String configKey, String message) { + super(message); + this.configKey = configKey; + } + + public ConfigurationException(String configKey, String message, Throwable cause) { + super(message, cause); + this.configKey = configKey; + } + + /** + * Returns the configuration key that caused the overflow. + * + * @return the configuration key + */ + public String getConfigKey() { + return configKey; + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java b/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java index 1b1ace835b..74ecf61a12 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java @@ -20,6 +20,7 @@ import static org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkNotNull; +import java.time.Duration; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -532,6 +533,21 @@ public T get(ConfigOption option) { return getOptional(option).orElseGet(option::defaultValue); } + public long getDurationInMillis(ConfigOption option) { + long result; + try { + result = getOptional(option).orElseGet(option::defaultValue).toMillis(); + } catch (Exception e) { // may be throw java.lang.ArithmeticException: long overflow + throw new ConfigurationException( + option.key(), + String.format( + "Exception when converting duration to millis for config option '%s': %s", + option.key(), e.getMessage()), + e); + } + return result; + } + public Optional getOptional(ConfigOption option) { Optional rawValue = getRawValueFromOption(option); Class clazz = option.getClazz(); diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md index 147122e140..d976978c2e 100644 --- a/docs/configuration/ams-config.md +++ b/docs/configuration/ams-config.md @@ -92,7 +92,7 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; } | optimizer.max-planning-parallelism | 1 | Max planning parallelism in one optimizer group. | | optimizer.polling-timeout | 3 s | Optimizer polling task timeout. | | optimizer.task-ack-timeout | 30 s | Timeout duration for task acknowledgment. | -| optimizer.task-execute-timeout | 1 h | Timeout duration for task execution, default to 1 hour. | +| optimizer.task-execute-timeout | 2147483647 s | Timeout duration for task execution, default to Integer.MAX_VALUE seconds(about 24,855 days). | | overview-cache.max-size | 3360 | Max size of overview cache. | | overview-cache.refresh-interval | 3 min | Interval for refreshing overview cache. | | refresh-external-catalogs.interval | 3 min | Interval to refresh the external catalog. |