diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index 7877ee76af..3b60fde48f 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -472,6 +472,41 @@ public class AmoroManagementConf { .defaultValue(30000L) .withDescription("Max wait time before getting a connection timeout."); + public static final ConfigOption DYNAMIC_CONFIG_ENABLED = + ConfigOptions.key("dynamic-config.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to enable dynamic configuration backed by database table `dynamic_conf`."); + + public static final ConfigOption DYNAMIC_CONFIG_REFRESH_INTERVAL = + ConfigOptions.key("dynamic-config.refresh-interval") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription( + "Refresh interval for reloading dynamic configuration overrides from database."); + + public static final ConfigOption DYNAMIC_CONFIG_NAMESPACE = + ConfigOptions.key("dynamic-config.namespace") + .stringType() + .defaultValue("AMS") + .withDescription( + "Logical namespace used when loading dynamic configuration overrides for AMS."); + + public static final ConfigOption PLUGIN_CATEGORY_PROPERTY_KEY = + ConfigOptions.key("plugin.property.category-key") + .stringType() + .defaultValue("plugin.category") + .withDescription( + "The property key used to store plugin category identifier in plugin properties."); + + public static final ConfigOption PLUGIN_NAME_PROPERTY_KEY = + ConfigOptions.key("plugin.property.name-key") + .stringType() + .defaultValue("plugin.name") + .withDescription( + "The property key used to store plugin name identifier in plugin properties."); + public static final ConfigOption OPTIMIZER_HB_TIMEOUT = ConfigOptions.key("optimizer.heart-beat-timeout") .durationType() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 4fd3e5e9af..fbf70f49d2 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -29,11 +29,15 @@ import org.apache.amoro.api.OptimizingService; import org.apache.amoro.config.ConfigHelpers; import org.apache.amoro.config.ConfigurationException; +import org.apache.amoro.config.ConfigurationManager; import org.apache.amoro.config.Configurations; +import org.apache.amoro.config.DynamicConfigurations; import org.apache.amoro.config.shade.utils.ConfigShadeUtils; import org.apache.amoro.exception.AmoroRuntimeException; import org.apache.amoro.server.catalog.CatalogManager; import org.apache.amoro.server.catalog.DefaultCatalogManager; +import org.apache.amoro.server.config.DbConfigurationManager; +import org.apache.amoro.server.config.DynamicConfigStore; import org.apache.amoro.server.dashboard.DashboardServer; import org.apache.amoro.server.dashboard.JavalinJsonMapper; import org.apache.amoro.server.dashboard.response.ErrorResponse; @@ -114,6 +118,8 @@ public class AmoroServiceContainer { private ProcessService processService; private TerminalManager terminalManager; private Configurations serviceConfig; + private ConfigurationManager configurationManager; + private DynamicConfigurations dynamicConfigurations; private TServer tableManagementServer; private TServer optimizingServiceServer; private Javalin httpServer; @@ -199,13 +205,17 @@ public void waitFollowerShip() throws Exception { } public void startRestServices() throws Exception { - EventsManager.getInstance(); - MetricManager.getInstance(); + EventsManager.getInstance(serviceConfig, configurationManager); + MetricManager.getInstance(serviceConfig, configurationManager); catalogManager = new DefaultCatalogManager(serviceConfig); tableManager = new DefaultTableManager(serviceConfig, catalogManager); optimizerManager = new DefaultOptimizerManager(serviceConfig, catalogManager); - terminalManager = new TerminalManager(serviceConfig, catalogManager); + if (dynamicConfigurations != null) { + terminalManager = new TerminalManager(dynamicConfigurations, catalogManager); + } else { + terminalManager = new TerminalManager(serviceConfig, catalogManager); + } initHttpService(); startHttpService(); @@ -229,16 +239,26 @@ public void transitionToFollower() { } public void startOptimizingService() throws Exception { - TableRuntimeFactoryManager tableRuntimeFactoryManager = new TableRuntimeFactoryManager(); + TableRuntimeFactoryManager tableRuntimeFactoryManager = + new TableRuntimeFactoryManager(serviceConfig, configurationManager); tableRuntimeFactoryManager.initialize(); - tableService = - new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager); - - optimizingService = - new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService); - - processService = new ProcessService(serviceConfig, tableService); + if (dynamicConfigurations != null) { + tableService = + new DefaultTableService( + dynamicConfigurations, catalogManager, tableRuntimeFactoryManager); + optimizingService = + new DefaultOptimizingService( + dynamicConfigurations, catalogManager, optimizerManager, tableService); + processService = new ProcessService(dynamicConfigurations, tableService); + } else { + tableService = + new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager); + optimizingService = + new DefaultOptimizingService( + serviceConfig, catalogManager, optimizerManager, tableService); + processService = new ProcessService(serviceConfig, tableService); + } LOG.info("Setting up AMS table executors..."); InlineTableExecutors.getInstance().setup(tableService, serviceConfig); @@ -319,6 +339,11 @@ public void disposeRestService() { public void dispose() { disposeOptimizingService(); disposeRestService(); + if (configurationManager != null) { + configurationManager.stop(); + configurationManager = null; + dynamicConfigurations = null; + } } private void initConfig() throws Exception { @@ -528,6 +553,7 @@ private class ConfigurationHelper { public void init() throws Exception { Map envConfig = initEnvConfig(); initServiceConfig(envConfig); + initDynamicConfiguration(); setIcebergSystemProperties(); initContainerConfig(); } @@ -557,6 +583,24 @@ private void initServiceConfig(Map envConfig) throws Exception { SqlSessionFactoryProvider.getInstance().init(dataSource); } + private void initDynamicConfiguration() { + boolean dynamicEnabled = serviceConfig.getBoolean(AmoroManagementConf.DYNAMIC_CONFIG_ENABLED); + if (!dynamicEnabled) { + LOG.info( + "Dynamic configuration is disabled by {}", + AmoroManagementConf.DYNAMIC_CONFIG_ENABLED.key()); + return; + } + java.time.Duration refreshInterval = + serviceConfig.get(AmoroManagementConf.DYNAMIC_CONFIG_REFRESH_INTERVAL); + configurationManager = new DbConfigurationManager(new DynamicConfigStore(), refreshInterval); + configurationManager.start(); + dynamicConfigurations = new DynamicConfigurations(serviceConfig, configurationManager); + LOG.info( + "Dynamic configuration enabled for AMS service with refresh interval {} ms", + refreshInterval.toMillis()); + } + private Map initEnvConfig() { LOG.info("initializing system env configuration..."); Map envs = System.getenv(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/config/DbConfigurationManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/config/DbConfigurationManager.java new file mode 100755 index 0000000000..a06b262e5d --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/config/DbConfigurationManager.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.config; + +import org.apache.amoro.config.ConfigurationManager; +import org.apache.amoro.config.Configurations; +import org.apache.amoro.config.DynamicConfigurations; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; +import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Default {@link ConfigurationManager} implementation backed by {@link DynamicConfigStore}. + * + *

This manager is responsible for: + * + *

    + *
  • periodically reloading AMS service level overrides, + *
  • periodically reloading plugin level overrides for registered plugins, + *
  • notifying all registered {@link DynamicConfigurations} instances to refresh themselves. + *
+ * + *

Merge semantics are implemented in {@link DynamicConfigurations}: for service level + * configuration, {@code currentConfig = merge(baseConfig, getServerConfigurations())}; for plugin + * level configuration, {@code currentConfig = merge(baseConfig, + * getPluginConfigurations(pluginCategory, pluginName))}. + */ +public class DbConfigurationManager implements ConfigurationManager { + + private static final Logger LOG = LoggerFactory.getLogger(DbConfigurationManager.class); + + private final DynamicConfigStore store; + private final ScheduledExecutorService scheduler; + private final long refreshIntervalMillis; + + private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean stopped = new AtomicBoolean(false); + + private final AtomicReference serverOverrides = + new AtomicReference<>(new Configurations()); + + private final ConcurrentMap> pluginOverrides = + new ConcurrentHashMap<>(); + + private final CopyOnWriteArrayList dynamicConfigs = + new CopyOnWriteArrayList<>(); + + public DbConfigurationManager(DynamicConfigStore store, Duration refreshInterval) { + Preconditions.checkNotNull(store, "store must not be null"); + Preconditions.checkNotNull(refreshInterval, "refreshInterval must not be null"); + this.store = store; + this.refreshIntervalMillis = Math.max(1L, refreshInterval.toMillis()); + this.scheduler = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("db-configuration-manager-%d") + .build()); + } + + @Override + public Configurations getServerConfigurations() { + Configurations overrides = serverOverrides.get(); + return overrides != null ? overrides : new Configurations(); + } + + @Override + public Configurations getPluginConfigurations(String pluginCategory, String pluginName) { + if (pluginCategory == null) { + return new Configurations(); + } + PluginKey key = new PluginKey(pluginCategory.trim(), pluginName); + AtomicReference ref = pluginOverrides.get(key); + return ref != null && ref.get() != null ? ref.get() : new Configurations(); + } + + @Override + public void start() { + if (!started.compareAndSet(false, true)) { + return; + } + LOG.info("Starting DbConfigurationManager with refresh interval {} ms", refreshIntervalMillis); + // initial load before scheduling periodic refresh + safeRefresh(); + scheduler.scheduleWithFixedDelay( + this::safeRefresh, refreshIntervalMillis, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + @Override + public void stop() { + if (!stopped.compareAndSet(false, true)) { + return; + } + LOG.info("Stopping DbConfigurationManager"); + scheduler.shutdownNow(); + dynamicConfigs.clear(); + pluginOverrides.clear(); + } + + @Override + public void registerDynamicConfig(DynamicConfigurations dyn) { + if (dyn == null) { + return; + } + dynamicConfigs.add(dyn); + } + + private void safeRefresh() { + if (stopped.get()) { + return; + } + try { + refreshInternal(); + } catch (Throwable t) { + LOG.warn("Failed to refresh dynamic configuration overrides", t); + } + } + + private void refreshInternal() { + // 1) reload AMS service level overrides + Map serverMap = store.loadServerOverrides(); + serverOverrides.set(Configurations.fromMap(serverMap)); + + // 2) reload plugin level overrides for all registered plugin dynamic configurations + Map latestPluginOverrides = new HashMap<>(); + for (DynamicConfigurations dyn : dynamicConfigs) { + if (!dyn.isPluginLevel()) { + continue; + } + PluginKey key = new PluginKey(dyn.getPluginCategory(), dyn.getPluginName()); + if (latestPluginOverrides.containsKey(key)) { + continue; + } + Map overridesMap = + store.loadPluginOverrides(key.pluginCategory, key.pluginName); + latestPluginOverrides.put(key, Configurations.fromMap(overridesMap)); + } + + for (Map.Entry entry : latestPluginOverrides.entrySet()) { + pluginOverrides + .computeIfAbsent(entry.getKey(), k -> new AtomicReference<>(new Configurations())) + .set(entry.getValue()); + } + + // 3) refresh all registered DynamicConfigurations instances + for (DynamicConfigurations dyn : dynamicConfigs) { + try { + dyn.refreshFromManager(); + } catch (Throwable t) { + LOG.warn( + "Failed to refresh DynamicConfigurations for pluginCategory={}, pluginName={}", + dyn.getPluginCategory(), + dyn.getPluginName(), + t); + } + } + } + + private static final class PluginKey { + private final String pluginCategory; + private final String pluginName; + + private PluginKey(String pluginCategory, String pluginName) { + this.pluginCategory = pluginCategory; + this.pluginName = pluginName == null ? null : pluginName.trim(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PluginKey pluginKey = (PluginKey) o; + return Objects.equals(pluginCategory, pluginKey.pluginCategory) + && Objects.equals(pluginName, pluginKey.pluginName); + } + + @Override + public int hashCode() { + return Objects.hash(pluginCategory, pluginName); + } + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/config/DynamicConfigStore.java b/amoro-ams/src/main/java/org/apache/amoro/server/config/DynamicConfigStore.java new file mode 100755 index 0000000000..a980639fcc --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/config/DynamicConfigStore.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.config; + +import org.apache.amoro.server.persistence.DynamicConfigEntry; +import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.server.persistence.mapper.DynamicConfigMapper; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Persistence helper for dynamic configuration. + * + *

This class is a thin wrapper around {@link DynamicConfigMapper} providing convenient methods + * to load configuration overrides for the AMS service and individual plugins. + */ +public class DynamicConfigStore extends PersistentBase { + + private static final String AMS_CONF_GROUP = "AMS"; + + /** Load all key/value overrides for AMS service level (conf_group = 'AMS'). */ + public Map loadServerOverrides() { + List entries = + getAs(DynamicConfigMapper.class, DynamicConfigMapper::selectServerConfigs); + Map overrides = new HashMap<>(); + for (DynamicConfigEntry entry : entries) { + if (entry.getConfKey() != null && entry.getConfValue() != null) { + overrides.put(entry.getConfKey(), entry.getConfValue()); + } + } + return overrides; + } + + /** + * Load all key/value overrides for a specific plugin. + * + * @param category plugin category, e.g. {@code "metric-reporters"} or {@code "event-listeners"} + * @param pluginName plugin configuration name + */ + public Map loadPluginOverrides(String category, String pluginName) { + String confGroup = buildPluginConfGroup(category); + List entries = + getAs(DynamicConfigMapper.class, m -> m.selectPluginConfigs(confGroup, pluginName)); + Map overrides = new HashMap<>(); + for (DynamicConfigEntry entry : entries) { + if (entry.getConfKey() != null && entry.getConfValue() != null) { + overrides.put(entry.getConfKey(), entry.getConfValue()); + } + } + return overrides; + } + + private String buildPluginConfGroup(String category) { + if (category == null || category.isEmpty()) { + return "PLUGIN"; + } + return "PLUGIN_" + category; + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractPluginManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractPluginManager.java index e3c70c41f3..a12973396a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractPluginManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractPluginManager.java @@ -19,8 +19,11 @@ package org.apache.amoro.server.manager; import org.apache.amoro.ActivePlugin; +import org.apache.amoro.config.ConfigurationManager; +import org.apache.amoro.config.Configurations; import org.apache.amoro.exception.AlreadyExistsException; import org.apache.amoro.exception.LoadingPluginException; +import org.apache.amoro.server.AmoroManagementConf; import org.apache.amoro.server.Environments; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; @@ -69,11 +72,28 @@ public abstract class AbstractPluginManager implements P private final Map foundedPlugins = new ConcurrentHashMap<>(); private final Map pluginConfigs = Maps.newLinkedHashMap(); private final String pluginCategory; + protected final Configurations serviceConfig; private final Class pluginType; + private final ConfigurationManager configurationManager; @SuppressWarnings("unchecked") public AbstractPluginManager(String pluginCategory) { + this(pluginCategory, null, null); + } + + @SuppressWarnings("unchecked") + public AbstractPluginManager(String pluginCategory, ConfigurationManager configurationManager) { + this(pluginCategory, null, configurationManager); + } + + @SuppressWarnings("unchecked") + public AbstractPluginManager( + String pluginCategory, + Configurations serviceConfig, + ConfigurationManager configurationManager) { this.pluginCategory = pluginCategory; + this.serviceConfig = serviceConfig; + this.configurationManager = configurationManager; Type superclass = this.getClass().getGenericSuperclass(); Preconditions.checkArgument( superclass instanceof ParameterizedType, "%s isn't parameterized", superclass); @@ -116,7 +136,32 @@ public void install(String pluginName) { throw new LoadingPluginException( "Cannot find an implement class for the plugin:" + name); } - plugin.open(pluginConfig.getProperties()); + + Map properties = pluginConfig.getProperties(); + Map augmentedProperties = Maps.newHashMap(properties); + + // determine property keys for plugin category and name, configurable via + // AmoroManagementConf.PLUGIN_CATEGORY_PROPERTY_KEY and + // AmoroManagementConf.PLUGIN_NAME_PROPERTY_KEY + String categoryKey = + serviceConfig.getString(AmoroManagementConf.PLUGIN_CATEGORY_PROPERTY_KEY); + String nameKey = serviceConfig.getString(AmoroManagementConf.PLUGIN_NAME_PROPERTY_KEY); + + // expose key names to plugins so that they can resolve category/name in their open() + // methods + augmentedProperties.put( + AmoroManagementConf.PLUGIN_CATEGORY_PROPERTY_KEY.key(), categoryKey); + augmentedProperties.put(AmoroManagementConf.PLUGIN_NAME_PROPERTY_KEY.key(), nameKey); + + // inject identifiers so that plugins can locate their own dynamic configs + augmentedProperties.put(categoryKey, pluginCategory()); + augmentedProperties.put(nameKey, pluginConfig.getName()); + + if (configurationManager != null) { + plugin.open(augmentedProperties, configurationManager); + } else { + plugin.open(augmentedProperties); + } exists.set(false); return plugin; }); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/manager/EventsManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/manager/EventsManager.java index a70eb6a4e1..f43e7ea53c 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/manager/EventsManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/manager/EventsManager.java @@ -18,6 +18,8 @@ package org.apache.amoro.server.manager; +import org.apache.amoro.config.ConfigurationManager; +import org.apache.amoro.config.Configurations; import org.apache.amoro.events.Event; import org.apache.amoro.events.EventListener; @@ -34,10 +36,19 @@ public class EventsManager extends AbstractPluginManager { /** @return Get the singleton object. */ public static EventsManager getInstance() { + return getInstance(null, null); + } + + public static EventsManager getInstance(ConfigurationManager configurationManager) { + return getInstance(null, configurationManager); + } + + public static EventsManager getInstance( + Configurations serviceConfig, ConfigurationManager configurationManager) { if (INSTANCE == null) { synchronized (EventsManager.class) { if (INSTANCE == null) { - INSTANCE = new EventsManager(); + INSTANCE = new EventsManager(serviceConfig, configurationManager); INSTANCE.initialize(); } } @@ -58,7 +69,15 @@ public static void dispose() { private Executor pluginVisitorPool; public EventsManager() { - super(PLUGIN_TYPE); + this(null, null); + } + + public EventsManager(ConfigurationManager configurationManager) { + this(null, configurationManager); + } + + public EventsManager(Configurations serviceConfig, ConfigurationManager configurationManager) { + super(PLUGIN_TYPE, serviceConfig, configurationManager); } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/manager/MetricManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/manager/MetricManager.java index d89b13e593..71822b3e87 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/manager/MetricManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/manager/MetricManager.java @@ -18,6 +18,8 @@ package org.apache.amoro.server.manager; +import org.apache.amoro.config.ConfigurationManager; +import org.apache.amoro.config.Configurations; import org.apache.amoro.metrics.MetricRegisterListener; import org.apache.amoro.metrics.MetricRegistry; import org.apache.amoro.metrics.MetricReporter; @@ -30,10 +32,19 @@ public class MetricManager extends AbstractPluginManager { /** @return Get the singleton object. */ public static MetricManager getInstance() { + return getInstance(null, null); + } + + public static MetricManager getInstance(ConfigurationManager configurationManager) { + return getInstance(null, configurationManager); + } + + public static MetricManager getInstance( + Configurations serviceConfig, ConfigurationManager configurationManager) { if (INSTANCE == null) { synchronized (MetricManager.class) { if (INSTANCE == null) { - INSTANCE = new MetricManager(); + INSTANCE = new MetricManager(serviceConfig, configurationManager); INSTANCE.initialize(); } } @@ -52,7 +63,15 @@ public static void dispose() { } protected MetricManager() { - super(PLUGIN_CATEGORY); + this(null, null); + } + + protected MetricManager(ConfigurationManager configurationManager) { + this(null, configurationManager); + } + + protected MetricManager(Configurations serviceConfig, ConfigurationManager configurationManager) { + super(PLUGIN_CATEGORY, serviceConfig, configurationManager); } private final MetricRegistry globalRegistry = new MetricRegistry(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/DynamicConfigEntry.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/DynamicConfigEntry.java new file mode 100755 index 0000000000..c2ace26b2c --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/DynamicConfigEntry.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.persistence; + +/** + * POJO that maps records in {@code dynamic_conf} table. + * + *

The dynamic configuration table is shared by AMS service level configuration and all plugin + * categories. Records are partitioned by {@code confGroup} and {@code pluginName}: + * + *

    + *
  • Service level overrides: {@code conf_group = 'AMS'}, {@code plugin_name IS NULL}. + *
  • Plugin level overrides: {@code conf_group = 'PLUGIN_' + pluginCategory}, {@code plugin_name + * = pluginName}. + *
+ */ +public class DynamicConfigEntry { + + /** Configuration key. */ + private String confKey; + + /** Configuration value stored as plain string. */ + private String confValue; + + /** Logical configuration group, such as {@code AMS} or {@code PLUGIN_metric-reporters}. */ + private String confGroup; + + /** + * Plugin identifier. Only meaningful when {@code confGroup} starts with {@code PLUGIN_}. For AMS + * service level configuration this field is usually {@code null}. + */ + private String pluginName; + + public String getConfKey() { + return confKey; + } + + public void setConfKey(String confKey) { + this.confKey = confKey; + } + + public String getConfValue() { + return confValue; + } + + public void setConfValue(String confValue) { + this.confValue = confValue; + } + + public String getConfGroup() { + return confGroup; + } + + public void setConfGroup(String confGroup) { + this.confGroup = confGroup; + } + + public String getPluginName() { + return pluginName; + } + + public void setPluginName(String pluginName) { + this.pluginName = pluginName; + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java index 3086bac5e9..a885d282b7 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java @@ -26,6 +26,7 @@ import com.github.pagehelper.dialect.helper.SqlServer2012Dialect; import org.apache.amoro.server.persistence.mapper.ApiTokensMapper; import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper; +import org.apache.amoro.server.persistence.mapper.DynamicConfigMapper; import org.apache.amoro.server.persistence.mapper.HaLeaseMapper; import org.apache.amoro.server.persistence.mapper.OptimizerMapper; import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper; @@ -76,6 +77,7 @@ public void init(DataSource dataSource) throws SQLException { configuration.addMapper(TableProcessMapper.class); configuration.addMapper(TableRuntimeMapper.class); configuration.addMapper(HaLeaseMapper.class); + configuration.addMapper(DynamicConfigMapper.class); PageInterceptor interceptor = new PageInterceptor(); Properties interceptorProperties = new Properties(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/DynamicConfigMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/DynamicConfigMapper.java new file mode 100755 index 0000000000..0b2112ec57 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/DynamicConfigMapper.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.persistence.mapper; + +import org.apache.amoro.server.persistence.DynamicConfigEntry; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Result; +import org.apache.ibatis.annotations.Results; +import org.apache.ibatis.annotations.Select; + +import java.util.List; + +/** Mapper for {@code dynamic_conf} table. */ +public interface DynamicConfigMapper { + + String TABLE_NAME = "dynamic_conf"; + + /** + * Load all AMS service level overrides. + * + *

These are the rows with {@code conf_group = 'AMS'}. {@code plugin_name} is ignored. + */ + @Select( + "SELECT conf_key, conf_value, conf_group, plugin_name FROM " + + TABLE_NAME + + " WHERE conf_group = 'AMS'") + @Results({ + @Result(property = "confKey", column = "conf_key"), + @Result(property = "confValue", column = "conf_value"), + @Result(property = "confGroup", column = "conf_group"), + @Result(property = "pluginName", column = "plugin_name") + }) + List selectServerConfigs(); + + /** + * Load overrides for a specific plugin. + * + * @param confGroup configuration group, usually {@code "PLUGIN_" + pluginCategory} + * @param pluginName plugin identifier + */ + @Select( + "SELECT conf_key, conf_value, conf_group, plugin_name FROM " + + TABLE_NAME + + " WHERE conf_group = #{confGroup} AND plugin_name = #{pluginName}") + @Results({ + @Result(property = "confKey", column = "conf_key"), + @Result(property = "confValue", column = "conf_value"), + @Result(property = "confGroup", column = "conf_group"), + @Result(property = "pluginName", column = "plugin_name") + }) + List selectPluginConfigs( + @Param("confGroup") String confGroup, @Param("pluginName") String pluginName); +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java index ee9f136dc6..5257cac0f5 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java @@ -76,6 +76,16 @@ public ProcessService(Configurations serviceConfig, TableService tableService) { this(serviceConfig, tableService, new ActionCoordinatorManager(), new ExecuteEngineManager()); } + public ProcessService( + org.apache.amoro.config.DynamicConfigurations dynamicConfigurations, + TableService tableService) { + this( + dynamicConfigurations, + tableService, + new ActionCoordinatorManager(), + new ExecuteEngineManager()); + } + public ProcessService( Configurations serviceConfig, TableService tableService, diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java index 65a9061081..b20e59ef16 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java @@ -18,6 +18,8 @@ package org.apache.amoro.server.table; +import org.apache.amoro.config.ConfigurationManager; +import org.apache.amoro.config.Configurations; import org.apache.amoro.server.manager.AbstractPluginManager; import org.apache.amoro.table.TableRuntimeFactory; @@ -25,6 +27,15 @@ public class TableRuntimeFactoryManager extends AbstractPluginManagerThis test uses an in-memory Derby database together with {@link DataSourceFactory} and {@link + * SqlSessionFactoryProvider} so that the real {@code dynamic_conf} table is created and accessed + * via MyBatis. + */ +public class DynamicConfigIntegrationIT { + + private Configurations baseConfig; + private DataSource dataSource; + private DynamicConfigStore store; + + @Before + public void setUp() throws Exception { + // use in-memory Derby and let DataSourceFactory create all AMS tables + baseConfig = new Configurations(); + baseConfig.setString(AmoroManagementConf.DB_TYPE, AmoroManagementConf.DB_TYPE_DERBY); + baseConfig.setString( + AmoroManagementConf.DB_CONNECTION_URL, "jdbc:derby:memory:dynamic_config_it;create=true"); + baseConfig.setBoolean(AmoroManagementConf.DB_AUTO_CREATE_TABLES, true); + + dataSource = DataSourceFactory.createDataSource(baseConfig); + SqlSessionFactoryProvider.getInstance().init(dataSource); + + store = new DynamicConfigStore(); + } + + @After + public void tearDown() throws Exception { + if (dataSource != null) { + try (Connection conn = dataSource.getConnection()) { + // closing the last connection is enough for in-memory Derby + conn.close(); + } + } + } + + /** + * Verify that {@link DynamicConfigStore} can load AMS and plugin level overrides from the {@code + * dynamic_conf} table. + */ + @Test + public void testDynamicConfigStoreLoadOverrides() throws Exception { + try (Connection conn = dataSource.getConnection()) { + // AMS level override + try (PreparedStatement ps = + conn.prepareStatement( + "INSERT INTO dynamic_conf(conf_group, plugin_name, conf_key, conf_value) " + + "VALUES(?, ?, ?, ?)")) { + ps.setString(1, "AMS"); + ps.setString(2, null); + ps.setString(3, "test.ams.key"); + ps.setString(4, "v1"); + ps.executeUpdate(); + } + + // plugin level override + try (PreparedStatement ps = + conn.prepareStatement( + "INSERT INTO dynamic_conf(conf_group, plugin_name, conf_key, conf_value) " + + "VALUES(?, ?, ?, ?)")) { + ps.setString(1, "PLUGIN_metric-reporters"); + ps.setString(2, "test-metric"); + ps.setString(3, "metric.level"); + ps.setString(4, "INFO"); + ps.executeUpdate(); + } + + conn.commit(); + } + + Map serverOverrides = store.loadServerOverrides(); + assertNotNull(serverOverrides); + assertEquals("v1", serverOverrides.get("test.ams.key")); + + Map pluginOverrides = + store.loadPluginOverrides("metric-reporters", "test-metric"); + assertNotNull(pluginOverrides); + assertEquals("INFO", pluginOverrides.get("metric.level")); + } + + /** + * Verify that {@link DbConfigurationManager} refreshes its internal cache when {@code + * dynamic_conf} is updated. + */ + @Test + public void testDbConfigurationManagerRefreshServerOverrides() throws Exception { + DbConfigurationManager manager = new DbConfigurationManager(store, Duration.ofMillis(50L)); + + try { + manager.start(); + + // at the very beginning there should be no overrides + Configurations initial = manager.getServerConfigurations(); + assertNotNull(initial); + assertFalse(initial.containsKey("dynamic.ams.key")); + + // insert an AMS level override directly via JDBC + try (Connection conn = dataSource.getConnection(); + PreparedStatement ps = + conn.prepareStatement( + "INSERT INTO dynamic_conf(conf_group, plugin_name, conf_key, conf_value) " + + "VALUES(?, ?, ?, ?)")) { + ps.setString(1, "AMS"); + ps.setString(2, null); + ps.setString(3, "dynamic.ams.key"); + ps.setString(4, "v1"); + ps.executeUpdate(); + conn.commit(); + } + + // wait for the scheduled refresh to pick up the new value + Thread.sleep(200L); + + Configurations afterInsert = manager.getServerConfigurations(); + assertEquals("v1", afterInsert.toMap().get("dynamic.ams.key")); + + // update the value to v2 + try (Connection conn = dataSource.getConnection(); + PreparedStatement ps = + conn.prepareStatement( + "UPDATE dynamic_conf SET conf_value = ? " + + "WHERE conf_group = ? AND plugin_name IS NULL AND conf_key = ?")) { + ps.setString(1, "v2"); + ps.setString(2, "AMS"); + ps.setString(3, "dynamic.ams.key"); + assertEquals(1, ps.executeUpdate()); + conn.commit(); + } + + // wait for another refresh round + Thread.sleep(200L); + + Configurations afterUpdate = manager.getServerConfigurations(); + assertEquals("v2", afterUpdate.toMap().get("dynamic.ams.key")); + } finally { + manager.stop(); + } + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/ActivePlugin.java b/amoro-common/src/main/java/org/apache/amoro/ActivePlugin.java index a32e65ba5b..bec2f92ca5 100644 --- a/amoro-common/src/main/java/org/apache/amoro/ActivePlugin.java +++ b/amoro-common/src/main/java/org/apache/amoro/ActivePlugin.java @@ -18,6 +18,8 @@ package org.apache.amoro; +import org.apache.amoro.config.ConfigurationManager; + import java.util.Map; /** Plugins that need to initialize and close */ @@ -30,6 +32,19 @@ public interface ActivePlugin extends AmoroPlugin { */ void open(Map properties); + /** + * Initialize and open the plugin with a {@link ConfigurationManager}. + * + *

The default implementation delegates to {@link #open(Map)} to keep existing plugins + * compatible. New plugins can override this method to make use of dynamic configuration. + * + * @param properties plugin properties + * @param configurationManager configuration manager used to fetch dynamic overrides + */ + default void open(Map properties, ConfigurationManager configurationManager) { + open(properties); + } + /** Close the plugin */ void close(); } diff --git a/amoro-common/src/main/java/org/apache/amoro/config/ConfigurationManager.java b/amoro-common/src/main/java/org/apache/amoro/config/ConfigurationManager.java new file mode 100755 index 0000000000..eff2cfbdee --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/config/ConfigurationManager.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.config; + +/** + * Central entry for dynamic configuration. + * + *

The configuration manager is responsible for periodically loading dynamic overrides from the + * backing store (database) and providing them to {@link DynamicConfigurations} instances. All + * {@link DynamicConfigurations} objects must be registered via {@link + * #registerDynamicConfig(DynamicConfigurations)} so that they can be refreshed when the manager + * performs a periodic reload. + */ +public interface ConfigurationManager { + + /** + * Get the latest AMS service level configuration overrides loaded from database. + * + *

The returned {@link Configurations} typically contains only the override key/value pairs + * (not the static base configuration). Callers should treat it as immutable. + */ + Configurations getServerConfigurations(); + + /** + * Get the latest configuration overrides for a specific plugin. + * + * @param pluginCategory plugin category, such as {@code "metric-reporters"} + * @param pluginName plugin configuration name + * @return overrides for the given plugin, never {@code null} + */ + Configurations getPluginConfigurations(String pluginCategory, String pluginName); + + /** Start periodic refresh from the backing store. */ + void start(); + + /** Stop periodic refresh and release internal resources. */ + void stop(); + + /** + * Register a {@link DynamicConfigurations} instance so that it can be refreshed when the manager + * reloads overrides. + */ + void registerDynamicConfig(DynamicConfigurations dyn); +} diff --git a/amoro-common/src/main/java/org/apache/amoro/config/DynamicConfigurations.java b/amoro-common/src/main/java/org/apache/amoro/config/DynamicConfigurations.java new file mode 100755 index 0000000000..c8094965d1 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/config/DynamicConfigurations.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.config; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * DynamicConfigurations represents a configuration view that can be updated at runtime. + * + *

Each instance keeps a reference to the initial static {@link Configurations} (baseConfig) and + * asks a {@link ConfigurationManager} for service level or plugin level overrides during refresh. + * The effective configuration is always + * + *

+ *   currentConfig = merge(baseConfig, overridesFromManager)
+ * 
+ * + * where overrides come from: + * + *
    + *
  • AMS service level: {@link ConfigurationManager#getServerConfigurations()}. + *
  • Plugin level: {@link ConfigurationManager#getPluginConfigurations(String, String)}. + *
+ * + *

This class extends {@link Configurations} so that it can be injected wherever a {@code + * Configurations} is expected. All getter methods defined in {@link Configurations} operate on the + * current merged configuration. + */ +public class DynamicConfigurations extends Configurations { + + public enum Type { + SERVICE, + PLUGIN + } + + private final Configurations baseConfig; + private final ConfigurationManager configurationManager; + private final Type type; + private final String pluginCategory; + private final String pluginName; + + /** + * Create a service level dynamic configuration. + * + * @param baseConfig static AMS service configuration + * @param configurationManager configuration manager used to fetch overrides + */ + public DynamicConfigurations( + Configurations baseConfig, ConfigurationManager configurationManager) { + this(baseConfig, configurationManager, Type.SERVICE, null, null); + } + + /** + * Create a plugin level dynamic configuration. + * + * @param baseConfig static plugin configuration + * @param configurationManager configuration manager used to fetch overrides + * @param pluginCategory plugin category, such as {@code "metric-reporters"} + * @param pluginName plugin configuration name + */ + public DynamicConfigurations( + Configurations baseConfig, + ConfigurationManager configurationManager, + String pluginCategory, + String pluginName) { + this(baseConfig, configurationManager, Type.PLUGIN, pluginCategory, pluginName); + } + + private DynamicConfigurations( + Configurations baseConfig, + ConfigurationManager configurationManager, + Type type, + String pluginCategory, + String pluginName) { + super(baseConfig); + this.baseConfig = baseConfig.clone(); + this.configurationManager = configurationManager; + this.type = type; + this.pluginCategory = pluginCategory; + this.pluginName = pluginName; + if (this.configurationManager != null) { + this.configurationManager.registerDynamicConfig(this); + // build initial effective configuration from current overrides + refreshFromManager(); + } + } + + /** Whether this instance represents plugin level configuration. */ + public boolean isPluginLevel() { + return type == Type.PLUGIN; + } + + /** Plugin category, meaningful only for plugin level configurations. */ + public String getPluginCategory() { + return pluginCategory; + } + + /** Plugin name, meaningful only for plugin level configurations. */ + public String getPluginName() { + return pluginName; + } + + /** + * Refresh the current configuration from the underlying {@link ConfigurationManager}. + * + *

This method is intended to be called by {@link ConfigurationManager} on a periodic basis. + */ + public void refreshFromManager() { + if (configurationManager == null) { + return; + } + Configurations overrides; + if (type == Type.SERVICE) { + overrides = configurationManager.getServerConfigurations(); + } else { + overrides = configurationManager.getPluginConfigurations(pluginCategory, pluginName); + } + if (overrides == null) { + overrides = new Configurations(); + } + + Map merged = new HashMap<>(); + synchronized (baseConfig.confData) { + merged.putAll(baseConfig.confData); + } + synchronized (overrides.confData) { + merged.putAll(overrides.confData); + } + + synchronized (this.confData) { + this.confData.clear(); + this.confData.putAll(merged); + } + } + + /** Returns a snapshot of current configuration as a standalone {@link Configurations}. */ + public Configurations snapshot() { + return this.clone(); + } + + // Convenience delegate methods mirroring Configurations API ------------------------------- + + public T get(ConfigOption option) { + return super.get(option); + } + + public String getString(ConfigOption option) { + return super.getString(option); + } + + public int getInteger(ConfigOption option) { + return super.getInteger(option); + } + + public long getLong(ConfigOption option) { + return super.getLong(option); + } + + public boolean getBoolean(ConfigOption option) { + return super.getBoolean(option); + } + + public long getDurationInMillis(ConfigOption option) { + Duration duration = super.get(option); + return duration.toMillis(); + } + + public String getValue(ConfigOption option) { + return super.getValue(option); + } + + public > T getEnum(Class enumClass, ConfigOption option) { + return super.getEnum(enumClass, option); + } + + public Optional getOptional(ConfigOption option) { + return super.getOptional(option); + } +}