Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,41 @@ public class AmoroManagementConf {
.defaultValue(30000L)
.withDescription("Max wait time before getting a connection timeout.");

public static final ConfigOption<Boolean> DYNAMIC_CONFIG_ENABLED =
ConfigOptions.key("dynamic-config.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable dynamic configuration backed by database table `dynamic_conf`.");

public static final ConfigOption<Duration> DYNAMIC_CONFIG_REFRESH_INTERVAL =
ConfigOptions.key("dynamic-config.refresh-interval")
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"Refresh interval for reloading dynamic configuration overrides from database.");

public static final ConfigOption<String> DYNAMIC_CONFIG_NAMESPACE =
ConfigOptions.key("dynamic-config.namespace")
.stringType()
.defaultValue("AMS")
.withDescription(
"Logical namespace used when loading dynamic configuration overrides for AMS.");

public static final ConfigOption<String> PLUGIN_CATEGORY_PROPERTY_KEY =
ConfigOptions.key("plugin.property.category-key")
.stringType()
.defaultValue("plugin.category")
.withDescription(
"The property key used to store plugin category identifier in plugin properties.");

public static final ConfigOption<String> PLUGIN_NAME_PROPERTY_KEY =
ConfigOptions.key("plugin.property.name-key")
.stringType()
.defaultValue("plugin.name")
.withDescription(
"The property key used to store plugin name identifier in plugin properties.");

public static final ConfigOption<Duration> OPTIMIZER_HB_TIMEOUT =
ConfigOptions.key("optimizer.heart-beat-timeout")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@
import org.apache.amoro.api.OptimizingService;
import org.apache.amoro.config.ConfigHelpers;
import org.apache.amoro.config.ConfigurationException;
import org.apache.amoro.config.ConfigurationManager;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.DynamicConfigurations;
import org.apache.amoro.config.shade.utils.ConfigShadeUtils;
import org.apache.amoro.exception.AmoroRuntimeException;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.DefaultCatalogManager;
import org.apache.amoro.server.config.DbConfigurationManager;
import org.apache.amoro.server.config.DynamicConfigStore;
import org.apache.amoro.server.dashboard.DashboardServer;
import org.apache.amoro.server.dashboard.JavalinJsonMapper;
import org.apache.amoro.server.dashboard.response.ErrorResponse;
Expand Down Expand Up @@ -114,6 +118,8 @@ public class AmoroServiceContainer {
private ProcessService processService;
private TerminalManager terminalManager;
private Configurations serviceConfig;
private ConfigurationManager configurationManager;
private DynamicConfigurations dynamicConfigurations;
private TServer tableManagementServer;
private TServer optimizingServiceServer;
private Javalin httpServer;
Expand Down Expand Up @@ -199,13 +205,17 @@ public void waitFollowerShip() throws Exception {
}

public void startRestServices() throws Exception {
EventsManager.getInstance();
MetricManager.getInstance();
EventsManager.getInstance(serviceConfig, configurationManager);
MetricManager.getInstance(serviceConfig, configurationManager);

catalogManager = new DefaultCatalogManager(serviceConfig);
tableManager = new DefaultTableManager(serviceConfig, catalogManager);
optimizerManager = new DefaultOptimizerManager(serviceConfig, catalogManager);
terminalManager = new TerminalManager(serviceConfig, catalogManager);
if (dynamicConfigurations != null) {
terminalManager = new TerminalManager(dynamicConfigurations, catalogManager);
} else {
terminalManager = new TerminalManager(serviceConfig, catalogManager);
}

initHttpService();
startHttpService();
Expand All @@ -229,16 +239,26 @@ public void transitionToFollower() {
}

public void startOptimizingService() throws Exception {
TableRuntimeFactoryManager tableRuntimeFactoryManager = new TableRuntimeFactoryManager();
TableRuntimeFactoryManager tableRuntimeFactoryManager =
new TableRuntimeFactoryManager(serviceConfig, configurationManager);
tableRuntimeFactoryManager.initialize();

tableService =
new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager);

optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);

processService = new ProcessService(serviceConfig, tableService);
if (dynamicConfigurations != null) {
tableService =
new DefaultTableService(
dynamicConfigurations, catalogManager, tableRuntimeFactoryManager);
optimizingService =
new DefaultOptimizingService(
dynamicConfigurations, catalogManager, optimizerManager, tableService);
processService = new ProcessService(dynamicConfigurations, tableService);
} else {
tableService =
new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager);
optimizingService =
new DefaultOptimizingService(
serviceConfig, catalogManager, optimizerManager, tableService);
processService = new ProcessService(serviceConfig, tableService);
}

LOG.info("Setting up AMS table executors...");
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
Expand Down Expand Up @@ -319,6 +339,11 @@ public void disposeRestService() {
public void dispose() {
disposeOptimizingService();
disposeRestService();
if (configurationManager != null) {
configurationManager.stop();
configurationManager = null;
dynamicConfigurations = null;
}
}

private void initConfig() throws Exception {
Expand Down Expand Up @@ -528,6 +553,7 @@ private class ConfigurationHelper {
public void init() throws Exception {
Map<String, Object> envConfig = initEnvConfig();
initServiceConfig(envConfig);
initDynamicConfiguration();
setIcebergSystemProperties();
initContainerConfig();
}
Expand Down Expand Up @@ -557,6 +583,24 @@ private void initServiceConfig(Map<String, Object> envConfig) throws Exception {
SqlSessionFactoryProvider.getInstance().init(dataSource);
}

private void initDynamicConfiguration() {
boolean dynamicEnabled = serviceConfig.getBoolean(AmoroManagementConf.DYNAMIC_CONFIG_ENABLED);
if (!dynamicEnabled) {
LOG.info(
"Dynamic configuration is disabled by {}",
AmoroManagementConf.DYNAMIC_CONFIG_ENABLED.key());
return;
}
java.time.Duration refreshInterval =
serviceConfig.get(AmoroManagementConf.DYNAMIC_CONFIG_REFRESH_INTERVAL);
configurationManager = new DbConfigurationManager(new DynamicConfigStore(), refreshInterval);
configurationManager.start();
dynamicConfigurations = new DynamicConfigurations(serviceConfig, configurationManager);
LOG.info(
"Dynamic configuration enabled for AMS service with refresh interval {} ms",
refreshInterval.toMillis());
}

private Map<String, Object> initEnvConfig() {
LOG.info("initializing system env configuration...");
Map<String, String> envs = System.getenv();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.config;

import org.apache.amoro.config.ConfigurationManager;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.DynamicConfigurations;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
* Default {@link ConfigurationManager} implementation backed by {@link DynamicConfigStore}.
*
* <p>This manager is responsible for:
*
* <ul>
* <li>periodically reloading AMS service level overrides,
* <li>periodically reloading plugin level overrides for registered plugins,
* <li>notifying all registered {@link DynamicConfigurations} instances to refresh themselves.
* </ul>
*
* <p>Merge semantics are implemented in {@link DynamicConfigurations}: for service level
* configuration, {@code currentConfig = merge(baseConfig, getServerConfigurations())}; for plugin
* level configuration, {@code currentConfig = merge(baseConfig,
* getPluginConfigurations(pluginCategory, pluginName))}.
*/
public class DbConfigurationManager implements ConfigurationManager {

private static final Logger LOG = LoggerFactory.getLogger(DbConfigurationManager.class);

private final DynamicConfigStore store;
private final ScheduledExecutorService scheduler;
private final long refreshIntervalMillis;

private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);

private final AtomicReference<Configurations> serverOverrides =
new AtomicReference<>(new Configurations());

private final ConcurrentMap<PluginKey, AtomicReference<Configurations>> pluginOverrides =
new ConcurrentHashMap<>();

private final CopyOnWriteArrayList<DynamicConfigurations> dynamicConfigs =
new CopyOnWriteArrayList<>();

public DbConfigurationManager(DynamicConfigStore store, Duration refreshInterval) {
Preconditions.checkNotNull(store, "store must not be null");
Preconditions.checkNotNull(refreshInterval, "refreshInterval must not be null");
this.store = store;
this.refreshIntervalMillis = Math.max(1L, refreshInterval.toMillis());
this.scheduler =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("db-configuration-manager-%d")
.build());
}

@Override
public Configurations getServerConfigurations() {
Configurations overrides = serverOverrides.get();
return overrides != null ? overrides : new Configurations();
}

@Override
public Configurations getPluginConfigurations(String pluginCategory, String pluginName) {
if (pluginCategory == null) {
return new Configurations();
}
PluginKey key = new PluginKey(pluginCategory.trim(), pluginName);
AtomicReference<Configurations> ref = pluginOverrides.get(key);
return ref != null && ref.get() != null ? ref.get() : new Configurations();
}

@Override
public void start() {
if (!started.compareAndSet(false, true)) {
return;
}
LOG.info("Starting DbConfigurationManager with refresh interval {} ms", refreshIntervalMillis);
// initial load before scheduling periodic refresh
safeRefresh();
scheduler.scheduleWithFixedDelay(
this::safeRefresh, refreshIntervalMillis, refreshIntervalMillis, TimeUnit.MILLISECONDS);
}

@Override
public void stop() {
if (!stopped.compareAndSet(false, true)) {
return;
}
LOG.info("Stopping DbConfigurationManager");
scheduler.shutdownNow();
dynamicConfigs.clear();
pluginOverrides.clear();
}

@Override
public void registerDynamicConfig(DynamicConfigurations dyn) {
if (dyn == null) {
return;
}
dynamicConfigs.add(dyn);
}

private void safeRefresh() {
if (stopped.get()) {
return;
}
try {
refreshInternal();
} catch (Throwable t) {
LOG.warn("Failed to refresh dynamic configuration overrides", t);
}
}

private void refreshInternal() {
// 1) reload AMS service level overrides
Map<String, String> serverMap = store.loadServerOverrides();
serverOverrides.set(Configurations.fromMap(serverMap));

// 2) reload plugin level overrides for all registered plugin dynamic configurations
Map<PluginKey, Configurations> latestPluginOverrides = new HashMap<>();
for (DynamicConfigurations dyn : dynamicConfigs) {
if (!dyn.isPluginLevel()) {
continue;
}
PluginKey key = new PluginKey(dyn.getPluginCategory(), dyn.getPluginName());
if (latestPluginOverrides.containsKey(key)) {
continue;
}
Map<String, String> overridesMap =
store.loadPluginOverrides(key.pluginCategory, key.pluginName);
latestPluginOverrides.put(key, Configurations.fromMap(overridesMap));
}

for (Map.Entry<PluginKey, Configurations> entry : latestPluginOverrides.entrySet()) {
pluginOverrides
.computeIfAbsent(entry.getKey(), k -> new AtomicReference<>(new Configurations()))
.set(entry.getValue());
}

// 3) refresh all registered DynamicConfigurations instances
for (DynamicConfigurations dyn : dynamicConfigs) {
try {
dyn.refreshFromManager();
} catch (Throwable t) {
LOG.warn(
"Failed to refresh DynamicConfigurations for pluginCategory={}, pluginName={}",
dyn.getPluginCategory(),
dyn.getPluginName(),
t);
}
}
}

private static final class PluginKey {
private final String pluginCategory;
private final String pluginName;

private PluginKey(String pluginCategory, String pluginName) {
this.pluginCategory = pluginCategory;
this.pluginName = pluginName == null ? null : pluginName.trim();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PluginKey pluginKey = (PluginKey) o;
return Objects.equals(pluginCategory, pluginKey.pluginCategory)
&& Objects.equals(pluginName, pluginKey.pluginName);
}

@Override
public int hashCode() {
return Objects.hash(pluginCategory, pluginName);
}
}
}
Loading
Loading