Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,33 @@

package org.apache.amoro.server.optimizing.maintainer;

import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.maintainer.MaintainerMetrics;
import org.apache.amoro.maintainer.OptimizingInfo;
import org.apache.amoro.maintainer.TableMaintainerContext;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
import org.apache.amoro.server.utils.HiveLocationUtil;
import org.apache.amoro.table.MixedTable;

import java.util.Collections;
import java.util.Set;

/**
* Default implementation of TableMaintainerContext for AMS. Adapts DefaultTableRuntime to
* Default implementation of TableMaintainerContext for AMS. Adapts TableRuntime to
* TableMaintainerContext interface.
*/
public class DefaultTableMaintainerContext implements TableMaintainerContext {

private final DefaultTableRuntime tableRuntime;
private final TableRuntime tableRuntime;
private final MixedTable mixedTable;

public DefaultTableMaintainerContext(DefaultTableRuntime tableRuntime) {
public DefaultTableMaintainerContext(TableRuntime tableRuntime) {
this.tableRuntime = tableRuntime;
this.mixedTable = null;
}

public DefaultTableMaintainerContext(DefaultTableRuntime tableRuntime, MixedTable mixedTable) {
public DefaultTableMaintainerContext(TableRuntime tableRuntime, MixedTable mixedTable) {
this.tableRuntime = tableRuntime;
this.mixedTable = mixedTable;
}
Expand All @@ -56,23 +56,21 @@ public TableConfiguration getTableConfiguration() {

@Override
public MaintainerMetrics getMetrics() {
TableOrphanFilesCleaningMetrics metrics = tableRuntime.getOrphanFilesCleaningMetrics();
return new MaintainerMetrics() {
@Override
public void recordOrphanDataFilesCleaned(int expected, int cleaned) {
metrics.completeOrphanDataFiles(expected, cleaned);
}

@Override
public void recordOrphanMetadataFilesCleaned(int expected, int cleaned) {
metrics.completeOrphanMetadataFiles(expected, cleaned);
}
};
// Return the full TableMaintainerMetricsImpl directly
// This provides access to all maintainer metrics including orphan files cleaning,
// dangling delete files cleaning, snapshot expiration, data expiration, tag creation,
// and partition expiration.
return tableRuntime.getMaintainerMetrics();
}

@Override
public OptimizingInfo getOptimizingInfo() {
return new DefaultOptimizingInfo(tableRuntime);
// For AMS DefaultTableRuntime, provide full optimizing info.
// For other TableRuntime implementations, return empty info.
if (tableRuntime instanceof DefaultTableRuntime) {
return new DefaultOptimizingInfo((DefaultTableRuntime) tableRuntime);
}
return OptimizingInfo.EMPTY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer;
import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
import org.apache.amoro.maintainer.TableMaintainer;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.table.MixedTable;
import org.apache.iceberg.Table;

Expand All @@ -36,11 +34,11 @@ public class TableMaintainerFactory {
* Create an Iceberg table maintainer with AMS context.
*
* @param table the Iceberg table
* @param tableRuntime the AMS table runtime
* @param tableRuntime the table runtime
* @return IcebergTableMaintainer instance
*/
public static IcebergTableMaintainer createIcebergMaintainer(
Table table, DefaultTableRuntime tableRuntime) {
Table table, TableRuntime tableRuntime) {
return new IcebergTableMaintainer(
table,
tableRuntime.getTableIdentifier().getIdentifier(),
Expand All @@ -55,19 +53,17 @@ public static IcebergTableMaintainer createIcebergMaintainer(
* @return TableMaintainer instance
*/
public static TableMaintainer create(AmoroTable<?> amoroTable, TableRuntime tableRuntime) {
Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
DefaultTableRuntime runtime = (DefaultTableRuntime) tableRuntime;
TableFormat format = amoroTable.format();

if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
MixedTable mixedTable = (MixedTable) amoroTable.originalTable();
return new MixedTableMaintainer(
mixedTable, new DefaultTableMaintainerContext(runtime, mixedTable));
mixedTable, new DefaultTableMaintainerContext(tableRuntime, mixedTable));
} else if (TableFormat.ICEBERG.equals(format)) {
return new IcebergTableMaintainer(
(Table) amoroTable.originalTable(),
amoroTable.id(),
new DefaultTableMaintainerContext(runtime));
new DefaultTableMaintainerContext(tableRuntime));
} else {
throw new RuntimeException("Unsupported table type" + amoroTable.originalTable().getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,8 @@
package org.apache.amoro.server.optimizing.maintainer;

import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
import org.apache.amoro.maintainer.TableMaintainer;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.table.MixedTable;
import org.apache.iceberg.Table;

/** Factory for creating {@link TableMaintainer}. */
@Deprecated
Expand All @@ -40,24 +35,4 @@ public class TableMaintainers {
public static TableMaintainer create(AmoroTable<?> amoroTable, TableRuntime tableRuntime) {
return TableMaintainerFactory.create(amoroTable, tableRuntime);
}

/**
* Create a {@link TableMaintainer} for the given table with DefaultTableRuntime.
*
* @deprecated since 0.9.0, will be removed in 0.10.0. Use {@link
* TableMaintainerFactory#createIcebergMaintainer(Table, DefaultTableRuntime)} instead.
*/
public static TableMaintainer create(AmoroTable<?> amoroTable, DefaultTableRuntime tableRuntime) {
TableFormat format = amoroTable.format();
if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
MixedTable mixedTable = (MixedTable) amoroTable.originalTable();
return new MixedTableMaintainer(
mixedTable, new DefaultTableMaintainerContext(tableRuntime, mixedTable));
} else if (TableFormat.ICEBERG.equals(format)) {
return TableMaintainerFactory.createIcebergMaintainer(
(Table) amoroTable.originalTable(), tableRuntime);
} else {
throw new RuntimeException("Unsupported table type" + amoroTable.originalTable().getClass());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ protected AbstractTableMetrics(ServerTableIdentifier identifier) {
this.identifier = identifier;
}

/**
* Get the table identifier.
*
* @return ServerTableIdentifier
*/
public ServerTableIdentifier getIdentifier() {
return identifier;
}

protected void registerMetric(MetricRegistry registry, MetricDefine define, Metric metric) {
MetricKey key =
registry.register(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.iceberg.Constants;
import org.apache.amoro.maintainer.MaintainerMetrics;
import org.apache.amoro.metrics.MetricRegistry;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.optimizing.TableRuntimeOptimizingState;
Expand Down Expand Up @@ -97,7 +98,7 @@ public class DefaultTableRuntime extends AbstractTableRuntime

private final Map<Action, TableProcessContainer> processContainerMap = Maps.newConcurrentMap();
private final TableOptimizingMetrics optimizingMetrics;
private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics;
private final TableMaintainerMetrics maintainerMetrics;
private final TableSummaryMetrics tableSummaryMetrics;
private volatile long lastPlanTime;
private volatile OptimizingProcess optimizingProcess;
Expand All @@ -107,8 +108,7 @@ public DefaultTableRuntime(TableRuntimeStore store) {
super(store);
this.optimizingMetrics =
new TableOptimizingMetrics(store.getTableIdentifier(), store.getGroupName());
this.orphanFilesCleaningMetrics =
new TableOrphanFilesCleaningMetrics(store.getTableIdentifier());
this.maintainerMetrics = new TableMaintainerMetrics(store.getTableIdentifier());
this.tableSummaryMetrics = new TableSummaryMetrics(store.getTableIdentifier());
}

Expand All @@ -124,7 +124,7 @@ public void recover(OptimizingProcess optimizingProcess) {
public void registerMetric(MetricRegistry metricRegistry) {
// TODO: extract method to interface.
this.optimizingMetrics.register(metricRegistry);
this.orphanFilesCleaningMetrics.register(metricRegistry);
this.maintainerMetrics.register(metricRegistry);
this.tableSummaryMetrics.register(metricRegistry);
}

Expand Down Expand Up @@ -161,8 +161,14 @@ public List<TableProcessStore> getProcessStates(Action action) {
return processContainerMap.get(action).getProcessStates();
}

public TableOrphanFilesCleaningMetrics getOrphanFilesCleaningMetrics() {
return orphanFilesCleaningMetrics;
/**
* Get the maintainer metrics implementation.
*
* @return MaintainerMetrics instance
*/
@Override
public MaintainerMetrics getMaintainerMetrics() {
return maintainerMetrics;
}

public long getCurrentSnapshotId() {
Expand Down Expand Up @@ -472,7 +478,7 @@ public void beginCommitting() {
@Override
public void unregisterMetric() {
tableSummaryMetrics.unregister();
orphanFilesCleaningMetrics.unregister();
maintainerMetrics.unregister();
optimizingMetrics.unregister();
}

Expand Down
Loading