diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java index ca47e22522..dc855edf9d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java @@ -18,12 +18,12 @@ package org.apache.amoro.server.optimizing.maintainer; +import org.apache.amoro.TableRuntime; import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.maintainer.MaintainerMetrics; import org.apache.amoro.maintainer.OptimizingInfo; import org.apache.amoro.maintainer.TableMaintainerContext; import org.apache.amoro.server.table.DefaultTableRuntime; -import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics; import org.apache.amoro.server.utils.HiveLocationUtil; import org.apache.amoro.table.MixedTable; @@ -31,20 +31,20 @@ import java.util.Set; /** - * Default implementation of TableMaintainerContext for AMS. Adapts DefaultTableRuntime to + * Default implementation of TableMaintainerContext for AMS. Adapts TableRuntime to * TableMaintainerContext interface. */ public class DefaultTableMaintainerContext implements TableMaintainerContext { - private final DefaultTableRuntime tableRuntime; + private final TableRuntime tableRuntime; private final MixedTable mixedTable; - public DefaultTableMaintainerContext(DefaultTableRuntime tableRuntime) { + public DefaultTableMaintainerContext(TableRuntime tableRuntime) { this.tableRuntime = tableRuntime; this.mixedTable = null; } - public DefaultTableMaintainerContext(DefaultTableRuntime tableRuntime, MixedTable mixedTable) { + public DefaultTableMaintainerContext(TableRuntime tableRuntime, MixedTable mixedTable) { this.tableRuntime = tableRuntime; this.mixedTable = mixedTable; } @@ -56,23 +56,21 @@ public TableConfiguration getTableConfiguration() { @Override public MaintainerMetrics getMetrics() { - TableOrphanFilesCleaningMetrics metrics = tableRuntime.getOrphanFilesCleaningMetrics(); - return new MaintainerMetrics() { - @Override - public void recordOrphanDataFilesCleaned(int expected, int cleaned) { - metrics.completeOrphanDataFiles(expected, cleaned); - } - - @Override - public void recordOrphanMetadataFilesCleaned(int expected, int cleaned) { - metrics.completeOrphanMetadataFiles(expected, cleaned); - } - }; + // Return the full TableMaintainerMetricsImpl directly + // This provides access to all maintainer metrics including orphan files cleaning, + // dangling delete files cleaning, snapshot expiration, data expiration, tag creation, + // and partition expiration. + return tableRuntime.getMaintainerMetrics(); } @Override public OptimizingInfo getOptimizingInfo() { - return new DefaultOptimizingInfo(tableRuntime); + // For AMS DefaultTableRuntime, provide full optimizing info. + // For other TableRuntime implementations, return empty info. 
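(A minimal sketch, not part of the patch: how a caller could reach the widened metrics surface through this context instead of the removed orphan-files-only adapter. The tableRuntime variable is assumed to be any TableRuntime in scope.)

    TableMaintainerContext ctx = new DefaultTableMaintainerContext(tableRuntime);
    MaintainerMetrics metrics = ctx.getMetrics();
    // Beyond orphan-file counts, the full MaintainerMetrics interface is now reachable:
    metrics.recordSnapshotsExpired(3, 12, 4500L);
    metrics.recordDanglingDeleteFilesCleaned(2);
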
+ if (tableRuntime instanceof DefaultTableRuntime) { + return new DefaultOptimizingInfo((DefaultTableRuntime) tableRuntime); + } + return OptimizingInfo.EMPTY; } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainerFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainerFactory.java index ec1583fa1d..9c3acd9ef9 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainerFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainerFactory.java @@ -24,8 +24,6 @@ import org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer; import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer; import org.apache.amoro.maintainer.TableMaintainer; -import org.apache.amoro.server.table.DefaultTableRuntime; -import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.table.MixedTable; import org.apache.iceberg.Table; @@ -36,11 +34,11 @@ public class TableMaintainerFactory { * Create an Iceberg table maintainer with AMS context. * * @param table the Iceberg table - * @param tableRuntime the AMS table runtime + * @param tableRuntime the table runtime * @return IcebergTableMaintainer instance */ public static IcebergTableMaintainer createIcebergMaintainer( - Table table, DefaultTableRuntime tableRuntime) { + Table table, TableRuntime tableRuntime) { return new IcebergTableMaintainer( table, tableRuntime.getTableIdentifier().getIdentifier(), @@ -55,19 +53,17 @@ public static IcebergTableMaintainer createIcebergMaintainer( * @return TableMaintainer instance */ public static TableMaintainer create(AmoroTable amoroTable, TableRuntime tableRuntime) { - Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime); - DefaultTableRuntime runtime = (DefaultTableRuntime) tableRuntime; TableFormat format = amoroTable.format(); if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) { MixedTable mixedTable = (MixedTable) amoroTable.originalTable(); return new MixedTableMaintainer( - mixedTable, new DefaultTableMaintainerContext(runtime, mixedTable)); + mixedTable, new DefaultTableMaintainerContext(tableRuntime, mixedTable)); } else if (TableFormat.ICEBERG.equals(format)) { return new IcebergTableMaintainer( (Table) amoroTable.originalTable(), amoroTable.id(), - new DefaultTableMaintainerContext(runtime)); + new DefaultTableMaintainerContext(tableRuntime)); } else { throw new RuntimeException("Unsupported table type" + amoroTable.originalTable().getClass()); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java index eb152bb096..29ef43d457 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java @@ -19,13 +19,8 @@ package org.apache.amoro.server.optimizing.maintainer; import org.apache.amoro.AmoroTable; -import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; -import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer; import org.apache.amoro.maintainer.TableMaintainer; -import org.apache.amoro.server.table.DefaultTableRuntime; -import org.apache.amoro.table.MixedTable; -import org.apache.iceberg.Table; /** Factory for creating {@link 
TableMaintainer}. */ @Deprecated @@ -40,24 +35,4 @@ public class TableMaintainers { public static TableMaintainer create(AmoroTable amoroTable, TableRuntime tableRuntime) { return TableMaintainerFactory.create(amoroTable, tableRuntime); } - - /** - * Create a {@link TableMaintainer} for the given table with DefaultTableRuntime. - * - * @deprecated since 0.9.0, will be removed in 0.10.0. Use {@link - * TableMaintainerFactory#createIcebergMaintainer(Table, DefaultTableRuntime)} instead. - */ - public static TableMaintainer create(AmoroTable amoroTable, DefaultTableRuntime tableRuntime) { - TableFormat format = amoroTable.format(); - if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) { - MixedTable mixedTable = (MixedTable) amoroTable.originalTable(); - return new MixedTableMaintainer( - mixedTable, new DefaultTableMaintainerContext(tableRuntime, mixedTable)); - } else if (TableFormat.ICEBERG.equals(format)) { - return TableMaintainerFactory.createIcebergMaintainer( - (Table) amoroTable.originalTable(), tableRuntime); - } else { - throw new RuntimeException("Unsupported table type" + amoroTable.originalTable().getClass()); - } - } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableMetrics.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableMetrics.java index 74e52f0b7e..c1a14adb08 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableMetrics.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableMetrics.java @@ -37,6 +37,15 @@ protected AbstractTableMetrics(ServerTableIdentifier identifier) { this.identifier = identifier; } + /** + * Get the table identifier. + * + * @return ServerTableIdentifier + */ + public ServerTableIdentifier getIdentifier() { + return identifier; + } + protected void registerMetric(MetricRegistry registry, MetricDefine define, Metric metric) { MetricKey key = registry.register( diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java index c1506218d2..154c86b6d6 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java @@ -26,6 +26,7 @@ import org.apache.amoro.config.OptimizingConfig; import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.iceberg.Constants; +import org.apache.amoro.maintainer.MaintainerMetrics; import org.apache.amoro.metrics.MetricRegistry; import org.apache.amoro.optimizing.OptimizingType; import org.apache.amoro.optimizing.TableRuntimeOptimizingState; @@ -97,7 +98,7 @@ public class DefaultTableRuntime extends AbstractTableRuntime private final Map processContainerMap = Maps.newConcurrentMap(); private final TableOptimizingMetrics optimizingMetrics; - private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics; + private final TableMaintainerMetrics maintainerMetrics; private final TableSummaryMetrics tableSummaryMetrics; private volatile long lastPlanTime; private volatile OptimizingProcess optimizingProcess; @@ -107,8 +108,7 @@ public DefaultTableRuntime(TableRuntimeStore store) { super(store); this.optimizingMetrics = new TableOptimizingMetrics(store.getTableIdentifier(), store.getGroupName()); - this.orphanFilesCleaningMetrics = - new TableOrphanFilesCleaningMetrics(store.getTableIdentifier()); + this.maintainerMetrics = new 
TableMaintainerMetrics(store.getTableIdentifier()); this.tableSummaryMetrics = new TableSummaryMetrics(store.getTableIdentifier()); } @@ -124,7 +124,7 @@ public void recover(OptimizingProcess optimizingProcess) { public void registerMetric(MetricRegistry metricRegistry) { // TODO: extract method to interface. this.optimizingMetrics.register(metricRegistry); - this.orphanFilesCleaningMetrics.register(metricRegistry); + this.maintainerMetrics.register(metricRegistry); this.tableSummaryMetrics.register(metricRegistry); } @@ -161,8 +161,14 @@ public List getProcessStates(Action action) { return processContainerMap.get(action).getProcessStates(); } - public TableOrphanFilesCleaningMetrics getOrphanFilesCleaningMetrics() { - return orphanFilesCleaningMetrics; + /** + * Get the maintainer metrics implementation. + * + * @return MaintainerMetrics instance + */ + @Override + public MaintainerMetrics getMaintainerMetrics() { + return maintainerMetrics; } public long getCurrentSnapshotId() { @@ -472,7 +478,7 @@ public void beginCommitting() { @Override public void unregisterMetric() { tableSummaryMetrics.unregister(); - orphanFilesCleaningMetrics.unregister(); + maintainerMetrics.unregister(); optimizingMetrics.unregister(); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableMaintainerMetrics.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableMaintainerMetrics.java new file mode 100644 index 0000000000..b4c8b8df79 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableMaintainerMetrics.java @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table; + +import static org.apache.amoro.metrics.MetricDefine.defineCounter; +import static org.apache.amoro.metrics.MetricDefine.defineGauge; + +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.maintainer.MaintainerMetrics; +import org.apache.amoro.maintainer.MaintainerOperationType; +import org.apache.amoro.metrics.Counter; +import org.apache.amoro.metrics.Gauge; +import org.apache.amoro.metrics.Metric; +import org.apache.amoro.metrics.MetricDefine; +import org.apache.amoro.metrics.MetricKey; +import org.apache.amoro.metrics.MetricRegistry; +import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Table maintenance operation metrics implementation. + * + *

<p>This class handles metrics recording for table maintenance operations such as orphan file + * cleaning, snapshot expiration, data expiration, and tag creation. + * + *

<p>Design notes: + * + *

+ */ +public class TableMaintainerMetrics implements MaintainerMetrics { + + // ========== Orphan Files Related MetricDefine ========== + + /** + * Count of orphan content files cleaned. + * + *

Note: This metric name is retained for backward compatibility. The "content" terminology + * refers to data files (as opposed to metadata files). + */ + public static final MetricDefine TABLE_ORPHAN_CONTENT_FILE_CLEANING_COUNT = + defineCounter("table_orphan_content_file_cleaning_count") + .withDescription("Count of orphan content files cleaned") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** + * Expected count of orphan content files to clean. + * + *

Note: This metric name is retained for backward compatibility. The "content" terminology + * refers to data files (as opposed to metadata files). + */ + public static final MetricDefine TABLE_EXPECTED_ORPHAN_CONTENT_FILE_CLEANING_COUNT = + defineCounter("table_expected_orphan_content_file_cleaning_count") + .withDescription("Expected count of orphan content files to clean") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Count of orphan metadata files cleaned */ + public static final MetricDefine TABLE_ORPHAN_METADATA_FILES_CLEANED_COUNT = + defineCounter("table_orphan_metadata_files_cleaned_count") + .withDescription("Count of orphan metadata files cleaned") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Expected count of orphan metadata files to clean */ + public static final MetricDefine TABLE_ORPHAN_METADATA_FILES_CLEANED_EXPECTED_COUNT = + defineCounter("table_orphan_metadata_files_cleaned_expected_count") + .withDescription("Expected count of orphan metadata files to clean") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Duration of orphan files cleaning operation (milliseconds) */ + public static final MetricDefine TABLE_ORPHAN_FILES_CLEANING_DURATION = + defineGauge("table_orphan_files_cleaning_duration_millis") + .withDescription("Duration of orphan files cleaning operation in milliseconds") + .withTags("catalog", "database", "table", "table_format") + .build(); + + // ========== Dangling Delete Files Related MetricDefine (Iceberg) ========== + + /** Count of dangling delete files cleaned */ + public static final MetricDefine TABLE_DANGLING_DELETE_FILES_CLEANED_COUNT = + defineCounter("table_dangling_delete_files_cleaned_count") + .withDescription("Count of dangling delete files cleaned") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Duration of dangling delete files cleaning operation (milliseconds) */ + public static final MetricDefine TABLE_DANGLING_DELETE_FILES_CLEANING_DURATION = + defineGauge("table_dangling_delete_files_cleaning_duration_millis") + .withDescription("Duration of dangling delete files cleaning operation in milliseconds") + .withTags("catalog", "database", "table", "table_format") + .build(); + + // ========== Snapshot Expiration Related MetricDefine ========== + + /** Count of snapshots expired */ + public static final MetricDefine TABLE_SNAPSHOTS_EXPIRED_COUNT = + defineCounter("table_snapshots_expired_count") + .withDescription("Count of snapshots expired") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Count of data files deleted during snapshot expiration */ + public static final MetricDefine TABLE_SNAPSHOTS_EXPIRED_DATA_FILES_DELETED = + defineCounter("table_snapshots_expired_data_files_deleted") + .withDescription("Count of data files deleted during snapshot expiration") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Duration of snapshot expiration operation (milliseconds) */ + public static final MetricDefine TABLE_SNAPSHOTS_EXPIRATION_DURATION = + defineGauge("table_snapshots_expiration_duration_millis") + .withDescription("Duration of snapshot expiration operation in milliseconds") + .withTags("catalog", "database", "table", "table_format") + .build(); + + // ========== Data Expiration Related MetricDefine (Iceberg) ========== + + /** Count of data files expired */ + public static final MetricDefine TABLE_DATA_EXPIRED_DATA_FILES_COUNT = + 
defineCounter("table_data_expired_data_files_count") + .withDescription("Count of data files expired") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Count of delete files expired */ + public static final MetricDefine TABLE_DATA_EXPIRED_DELETE_FILES_COUNT = + defineCounter("table_data_expired_delete_files_count") + .withDescription("Count of delete files expired") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Duration of data expiration operation (milliseconds) */ + public static final MetricDefine TABLE_DATA_EXPIRATION_DURATION = + defineGauge("table_data_expiration_duration_millis") + .withDescription("Duration of data expiration operation in milliseconds") + .withTags("catalog", "database", "table", "table_format") + .build(); + + // ========== Tag Creation Related MetricDefine (Iceberg) ========== + + /** Count of tags created */ + public static final MetricDefine TABLE_TAGS_CREATED_COUNT = + defineCounter("table_tags_created_count") + .withDescription("Count of tags created") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Duration of tag creation operation (milliseconds) */ + public static final MetricDefine TABLE_TAG_CREATION_DURATION = + defineGauge("table_tag_creation_duration_millis") + .withDescription("Duration of tag creation operation in milliseconds") + .withTags("catalog", "database", "table", "table_format") + .build(); + + // ========== Partition Expiration Related MetricDefine (Paimon) ========== + + /** Count of partitions expired */ + public static final MetricDefine TABLE_PARTITIONS_EXPIRED_COUNT = + defineCounter("table_partitions_expired_count") + .withDescription("Count of partitions expired") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Count of files expired during partition expiration */ + public static final MetricDefine TABLE_PARTITIONS_EXPIRED_FILES_COUNT = + defineCounter("table_partitions_expired_files_count") + .withDescription("Count of files expired during partition expiration") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Duration of partition expiration operation (milliseconds) */ + public static final MetricDefine TABLE_PARTITION_EXPIRATION_DURATION = + defineGauge("table_partition_expiration_duration_millis") + .withDescription("Duration of partition expiration operation in milliseconds") + .withTags("catalog", "database", "table", "table_format") + .build(); + + // ========== General Operation Status Related MetricDefine ========== + + /** Count of successful maintainer operations */ + public static final MetricDefine TABLE_MAINTAINER_OPERATION_SUCCESS_COUNT = + defineCounter("table_maintainer_operation_success_count") + .withDescription("Count of successful maintainer operations") + .withTags("catalog", "database", "table", "table_format", "operation_type") + .build(); + + /** Count of failed maintainer operations */ + public static final MetricDefine TABLE_MAINTAINER_OPERATION_FAILURE_COUNT = + defineCounter("table_maintainer_operation_failure_count") + .withDescription("Count of failed maintainer operations") + .withTags("catalog", "database", "table", "table_format", "operation_type") + .build(); + + /** Duration of maintainer operation (milliseconds) */ + public static final MetricDefine TABLE_MAINTAINER_OPERATION_DURATION = + defineGauge("table_maintainer_operation_duration_millis") + .withDescription("Duration of maintainer operation in milliseconds") + .withTags("catalog", 
"database", "table", "table_format", "operation_type") + .build(); + + // ========== Instance Fields ========== + + private final ServerTableIdentifier identifier; + private final String tableFormat; + private final List registeredMetricKeys = Lists.newArrayList(); + private MetricRegistry globalRegistry; + + // ========== Orphan Files Metrics ========== + private final Counter orphanContentFileCleaningCount = new Counter(); + private final Counter expectedOrphanContentFileCleaningCount = new Counter(); + private final Counter orphanMetadataFilesCount = new Counter(); + private final Counter orphanMetadataFilesExpectedCount = new Counter(); + private final LastOperationDurationGauge orphanFilesCleaningDuration = + new LastOperationDurationGauge(); + + // ========== Dangling Delete Files Metrics ========== + private final Counter danglingDeleteFilesCount = new Counter(); + private final LastOperationDurationGauge danglingDeleteFilesCleaningDuration = + new LastOperationDurationGauge(); + + // ========== Snapshot Expiration Metrics ========== + private final Counter snapshotsExpiredCount = new Counter(); + private final Counter snapshotsExpiredDataFilesDeleted = new Counter(); + private final LastOperationDurationGauge snapshotsExpirationDuration = + new LastOperationDurationGauge(); + + // ========== Data Expiration Metrics ========== + private final Counter dataExpiredDataFilesCount = new Counter(); + private final Counter dataExpiredDeleteFilesCount = new Counter(); + private final LastOperationDurationGauge dataExpirationDuration = + new LastOperationDurationGauge(); + + // ========== Tag Creation Metrics ========== + private final Counter tagsCreatedCount = new Counter(); + private final LastOperationDurationGauge tagCreationDuration = new LastOperationDurationGauge(); + + // ========== Partition Expiration Metrics ========== + private final Counter partitionsExpiredCount = new Counter(); + private final Counter partitionsExpiredFilesCount = new Counter(); + private final LastOperationDurationGauge partitionExpirationDuration = + new LastOperationDurationGauge(); + + // ========== Operation Status Metrics ========== + private final ConcurrentHashMap successCounters = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap failureCounters = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap durationGauges = + new ConcurrentHashMap<>(); + + /** + * Constructor + * + * @param identifier Table identifier (contains format information via getFormat()) + */ + public TableMaintainerMetrics(ServerTableIdentifier identifier) { + this.identifier = identifier; + this.tableFormat = identifier.getFormat().name().toLowerCase(); + // Initialize operation type counters and gauges + for (MaintainerOperationType type : MaintainerOperationType.values()) { + successCounters.put(type, new Counter()); + failureCounters.put(type, new Counter()); + durationGauges.put(type, new OperationDurationGauge()); + } + } + + /** + * Get the table identifier. 
+ * + * @return ServerTableIdentifier + */ + public ServerTableIdentifier getIdentifier() { + return identifier; + } + + public void register(MetricRegistry registry) { + if (globalRegistry != null) { + return; + } + registerMetrics(registry); + globalRegistry = registry; + } + + public void unregister() { + if (globalRegistry != null) { + registeredMetricKeys.forEach(globalRegistry::unregister); + registeredMetricKeys.clear(); + globalRegistry = null; + } + } + + private void registerMetrics(MetricRegistry registry) { + // Build base tags (including table_format) + Map baseTags = + ImmutableMap.of( + "catalog", + identifier.getCatalog(), + "database", + identifier.getDatabase(), + "table", + identifier.getTableName(), + "table_format", + tableFormat); + + // Orphan files + registerMetricWithTags( + registry, + TABLE_ORPHAN_CONTENT_FILE_CLEANING_COUNT, + orphanContentFileCleaningCount, + baseTags); + registerMetricWithTags( + registry, + TABLE_EXPECTED_ORPHAN_CONTENT_FILE_CLEANING_COUNT, + expectedOrphanContentFileCleaningCount, + baseTags); + registerMetricWithTags( + registry, TABLE_ORPHAN_METADATA_FILES_CLEANED_COUNT, orphanMetadataFilesCount, baseTags); + registerMetricWithTags( + registry, + TABLE_ORPHAN_METADATA_FILES_CLEANED_EXPECTED_COUNT, + orphanMetadataFilesExpectedCount, + baseTags); + registerMetricWithTags( + registry, TABLE_ORPHAN_FILES_CLEANING_DURATION, orphanFilesCleaningDuration, baseTags); + + // Dangling delete files + registerMetricWithTags( + registry, TABLE_DANGLING_DELETE_FILES_CLEANED_COUNT, danglingDeleteFilesCount, baseTags); + registerMetricWithTags( + registry, + TABLE_DANGLING_DELETE_FILES_CLEANING_DURATION, + danglingDeleteFilesCleaningDuration, + baseTags); + + // Snapshot expiration + registerMetricWithTags( + registry, TABLE_SNAPSHOTS_EXPIRED_COUNT, snapshotsExpiredCount, baseTags); + registerMetricWithTags( + registry, + TABLE_SNAPSHOTS_EXPIRED_DATA_FILES_DELETED, + snapshotsExpiredDataFilesDeleted, + baseTags); + registerMetricWithTags( + registry, TABLE_SNAPSHOTS_EXPIRATION_DURATION, snapshotsExpirationDuration, baseTags); + + // Data expiration + registerMetricWithTags( + registry, TABLE_DATA_EXPIRED_DATA_FILES_COUNT, dataExpiredDataFilesCount, baseTags); + registerMetricWithTags( + registry, TABLE_DATA_EXPIRED_DELETE_FILES_COUNT, dataExpiredDeleteFilesCount, baseTags); + registerMetricWithTags( + registry, TABLE_DATA_EXPIRATION_DURATION, dataExpirationDuration, baseTags); + + // Tag creation + registerMetricWithTags(registry, TABLE_TAGS_CREATED_COUNT, tagsCreatedCount, baseTags); + registerMetricWithTags(registry, TABLE_TAG_CREATION_DURATION, tagCreationDuration, baseTags); + + // Partition expiration + registerMetricWithTags( + registry, TABLE_PARTITIONS_EXPIRED_COUNT, partitionsExpiredCount, baseTags); + registerMetricWithTags( + registry, TABLE_PARTITIONS_EXPIRED_FILES_COUNT, partitionsExpiredFilesCount, baseTags); + registerMetricWithTags( + registry, TABLE_PARTITION_EXPIRATION_DURATION, partitionExpirationDuration, baseTags); + + // Operation status (needs to include operation_type tag) + for (MaintainerOperationType type : MaintainerOperationType.values()) { + Map operationTags = + ImmutableMap.builder() + .putAll(baseTags) + .put("operation_type", type.getMetricName()) + .build(); + registerMetricWithTags( + registry, + TABLE_MAINTAINER_OPERATION_SUCCESS_COUNT, + successCounters.get(type), + operationTags); + registerMetricWithTags( + registry, + TABLE_MAINTAINER_OPERATION_FAILURE_COUNT, + failureCounters.get(type), + operationTags); + 
registerMetricWithTags( + registry, TABLE_MAINTAINER_OPERATION_DURATION, durationGauges.get(type), operationTags); + } + } + + /** + * Register metric with specified tags + * + * @param registry MetricRegistry + * @param define MetricDefine + * @param metric Metric instance + * @param tags Tags + */ + private void registerMetricWithTags( + MetricRegistry registry, MetricDefine define, Metric metric, Map tags) { + MetricKey key = registry.register(define, tags, metric); + registeredMetricKeys.add(key); + } + + // ========== MaintainerMetrics Interface Implementation ========== + + @Override + public void recordOrphanDataFilesCleaned(int expected, int cleaned) { + expectedOrphanContentFileCleaningCount.inc(expected); + orphanContentFileCleaningCount.inc(cleaned); + } + + @Override + public void recordOrphanMetadataFilesCleaned(int expected, int cleaned) { + orphanMetadataFilesExpectedCount.inc(expected); + orphanMetadataFilesCount.inc(cleaned); + } + + @Override + public void recordDanglingDeleteFilesCleaned(int cleaned) { + danglingDeleteFilesCount.inc(cleaned); + } + + @Override + public void recordSnapshotsExpired(int snapshotCount, int dataFilesDeleted, long durationMillis) { + snapshotsExpiredCount.inc(snapshotCount); + snapshotsExpiredDataFilesDeleted.inc(dataFilesDeleted); + snapshotsExpirationDuration.setValue(durationMillis); + } + + @Override + public void recordDataExpired(int dataFilesExpired, int deleteFilesExpired, long durationMillis) { + dataExpiredDataFilesCount.inc(dataFilesExpired); + dataExpiredDeleteFilesCount.inc(deleteFilesExpired); + dataExpirationDuration.setValue(durationMillis); + } + + @Override + public void recordTagsCreated(int tagsCreated, long durationMillis) { + tagsCreatedCount.inc(tagsCreated); + tagCreationDuration.setValue(durationMillis); + } + + @Override + public void recordPartitionsExpired( + int partitionsExpired, int filesExpired, long durationMillis) { + partitionsExpiredCount.inc(partitionsExpired); + partitionsExpiredFilesCount.inc(filesExpired); + partitionExpirationDuration.setValue(durationMillis); + } + + @Override + public void recordOperationStart(MaintainerOperationType operationType) { + durationGauges.get(operationType).recordStart(); + } + + @Override + public void recordOperationSuccess(MaintainerOperationType operationType, long durationMillis) { + successCounters.get(operationType).inc(); + durationGauges.get(operationType).setValue(durationMillis); + } + + @Override + public void recordOperationFailure( + MaintainerOperationType operationType, long durationMillis, Throwable throwable) { + failureCounters.get(operationType).inc(); + durationGauges.get(operationType).setValue(durationMillis); + } + + // ========== Internal Helper Classes ========== + + /** Gauge implementation for recording last operation duration */ + private static class LastOperationDurationGauge implements Gauge { + private volatile long value = 0L; + + public void setValue(long value) { + this.value = value; + } + + @Override + public Long getValue() { + return value; + } + } + + /** Gauge implementation for recording operation duration (supports recording start time) */ + private static class OperationDurationGauge implements Gauge { + private volatile long value = 0L; + private volatile long startTime = 0L; + + public void recordStart() { + this.startTime = System.currentTimeMillis(); + } + + public void setValue(long value) { + this.value = value; + } + + @Override + public Long getValue() { + return value; + } + } +} diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOrphanFilesCleaningMetrics.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOrphanFilesCleaningMetrics.java deleted file mode 100644 index 481eb175b3..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOrphanFilesCleaningMetrics.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.table; - -import static org.apache.amoro.metrics.MetricDefine.defineCounter; - -import org.apache.amoro.ServerTableIdentifier; -import org.apache.amoro.maintainer.MaintainerMetrics; -import org.apache.amoro.metrics.Counter; -import org.apache.amoro.metrics.MetricDefine; -import org.apache.amoro.metrics.MetricRegistry; - -/** Table Orphan Files Cleaning metrics. */ -public class TableOrphanFilesCleaningMetrics extends AbstractTableMetrics - implements MaintainerMetrics { - private final Counter orphanDataFilesCount = new Counter(); - private final Counter expectedOrphanDataFilesCount = new Counter(); - - private final Counter orphanMetadataFilesCount = new Counter(); - private final Counter expectedOrphanMetadataFilesCount = new Counter(); - - public TableOrphanFilesCleaningMetrics(ServerTableIdentifier identifier) { - super(identifier); - } - - public static final MetricDefine TABLE_ORPHAN_CONTENT_FILE_CLEANING_COUNT = - defineCounter("table_orphan_content_file_cleaning_count") - .withDescription("Count of orphan content files cleaned in the table since ams started") - .withTags("catalog", "database", "table") - .build(); - - public static final MetricDefine TABLE_ORPHAN_METADATA_FILE_CLEANING_COUNT = - defineCounter("table_orphan_metadata_file_cleaning_count") - .withDescription("Count of orphan metadata files cleaned in the table since ams started") - .withTags("catalog", "database", "table") - .build(); - - public static final MetricDefine TABLE_EXPECTED_ORPHAN_CONTENT_FILE_CLEANING_COUNT = - defineCounter("table_expected_orphan_content_file_cleaning_count") - .withDescription( - "Expected count of orphan content files cleaned in the table since ams started") - .withTags("catalog", "database", "table") - .build(); - - public static final MetricDefine TABLE_EXPECTED_ORPHAN_METADATA_FILE_CLEANING_COUNT = - defineCounter("table_expected_orphan_metadata_file_cleaning_count") - .withDescription( - "Expected count of orphan metadata files cleaned in the table since ams started") - .withTags("catalog", "database", "table") - .build(); - - @Override - public void registerMetrics(MetricRegistry registry) { - if (globalRegistry == null) { - registerMetric(registry, TABLE_ORPHAN_CONTENT_FILE_CLEANING_COUNT, orphanDataFilesCount); - registerMetric(registry, TABLE_ORPHAN_METADATA_FILE_CLEANING_COUNT, 
orphanMetadataFilesCount); - registerMetric( - registry, - TABLE_EXPECTED_ORPHAN_CONTENT_FILE_CLEANING_COUNT, - expectedOrphanDataFilesCount); - registerMetric( - registry, - TABLE_EXPECTED_ORPHAN_METADATA_FILE_CLEANING_COUNT, - expectedOrphanMetadataFilesCount); - globalRegistry = registry; - } - } - - public void completeOrphanDataFiles(int expected, int cleaned) { - expectedOrphanDataFilesCount.inc(expected); - orphanDataFilesCount.inc(cleaned); - } - - public void completeOrphanMetadataFiles(int expected, int cleaned) { - expectedOrphanMetadataFilesCount.inc(expected); - orphanMetadataFilesCount.inc(cleaned); - } - - @Override - public void recordOrphanDataFilesCleaned(int expected, int cleaned) { - completeOrphanDataFiles(expected, cleaned); - } - - @Override - public void recordOrphanMetadataFilesCleaned(int expected, int cleaned) { - completeOrphanMetadataFiles(expected, cleaned); - } -} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java index 1326bacdd7..f25d955f67 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java @@ -22,14 +22,13 @@ import static org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer.FLINK_JOB_ID; import org.apache.amoro.BasicTableTestHelper; -import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.TableTestHelper; import org.apache.amoro.catalog.BasicCatalogTestHelper; import org.apache.amoro.catalog.CatalogTestHelper; import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer; +import org.apache.amoro.maintainer.MaintainerMetrics; import org.apache.amoro.server.scheduler.inline.ExecutorTestBase; -import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics; import org.apache.amoro.table.TableIdentifier; import org.apache.amoro.table.TableProperties; import org.apache.amoro.table.UnkeyedTable; @@ -110,23 +109,17 @@ public void orphanDataFileClean() throws IOException { Assert.assertTrue(getMixedTable().io().exists(changeOrphanFilePath)); } TableIdentifier tableIdentifier = getMixedTable().id(); - TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics = - new TableOrphanFilesCleaningMetrics( - ServerTableIdentifier.of( - tableIdentifier.getCatalog(), - tableIdentifier.getDatabase(), - tableIdentifier.getTableName(), - getTestFormat())); + MaintainerMetrics metrics = MaintainerMetrics.NOOP; MixedTableMaintainer maintainer = new MixedTableMaintainer(getMixedTable(), TestTableMaintainerContext.of(getMixedTable())); maintainer.cleanContentFiles( System.currentTimeMillis() - TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME_DEFAULT * 60 * 1000, - orphanFilesCleaningMetrics); + metrics); maintainer.cleanMetadata( System.currentTimeMillis() - TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME_DEFAULT * 60 * 1000, - orphanFilesCleaningMetrics); + metrics); Assert.assertTrue(getMixedTable().io().exists(baseOrphanFileDir)); Assert.assertTrue(getMixedTable().io().exists(baseOrphanFilePath)); @@ -135,8 +128,8 @@ public void orphanDataFileClean() throws IOException { Assert.assertTrue(getMixedTable().io().exists(changeOrphanFilePath)); } - maintainer.cleanContentFiles(System.currentTimeMillis(), orphanFilesCleaningMetrics); - maintainer.cleanMetadata(System.currentTimeMillis(), 
orphanFilesCleaningMetrics); + maintainer.cleanContentFiles(System.currentTimeMillis(), metrics); + maintainer.cleanMetadata(System.currentTimeMillis(), metrics); Assert.assertFalse(getMixedTable().io().exists(baseOrphanFileDir)); Assert.assertFalse(getMixedTable().io().exists(baseOrphanFilePath)); @@ -204,15 +197,8 @@ public void orphanMetadataFileClean() throws IOException { MixedTableMaintainer maintainer = new MixedTableMaintainer(getMixedTable(), TestTableMaintainerContext.of(getMixedTable())); - TableIdentifier tableIdentifier = getMixedTable().id(); - TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics = - new TableOrphanFilesCleaningMetrics( - ServerTableIdentifier.of( - tableIdentifier.getCatalog(), - tableIdentifier.getDatabase(), - tableIdentifier.getTableName(), - getTestFormat())); - maintainer.cleanMetadata(System.currentTimeMillis(), orphanFilesCleaningMetrics); + MaintainerMetrics metrics = MaintainerMetrics.NOOP; + maintainer.cleanMetadata(System.currentTimeMillis(), metrics); Assert.assertFalse(getMixedTable().io().exists(baseOrphanFilePath)); if (isKeyedTable()) { @@ -298,16 +284,8 @@ public void notDeleteFlinkTemporaryFile() throws IOException { MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(getMixedTable(), TestTableMaintainerContext.of(getMixedTable())); - TableIdentifier tableIdentifier = getMixedTable().id(); - TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics = - new TableOrphanFilesCleaningMetrics( - ServerTableIdentifier.of( - tableIdentifier.getCatalog(), - tableIdentifier.getDatabase(), - tableIdentifier.getTableName(), - getTestFormat())); - - tableMaintainer.cleanMetadata(System.currentTimeMillis(), orphanFilesCleaningMetrics); + MaintainerMetrics metrics = MaintainerMetrics.NOOP; + tableMaintainer.cleanMetadata(System.currentTimeMillis(), metrics); Assert.assertFalse(getMixedTable().io().exists(baseOrphanFilePath)); if (isKeyedTable()) { // files whose file name starts with flink.job-id should not be deleted @@ -334,22 +312,15 @@ public void notDeleteStatisticsFile() { StatisticsFile file3 = commitStatisticsFile(unkeyedTable, unkeyedTable.location() + "/data/puffin/test3.puffin"); - TableIdentifier tableIdentifier = getMixedTable().id(); - TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics = - new TableOrphanFilesCleaningMetrics( - ServerTableIdentifier.of( - tableIdentifier.getCatalog(), - tableIdentifier.getDatabase(), - tableIdentifier.getTableName(), - getTestFormat())); + MaintainerMetrics metrics = MaintainerMetrics.NOOP; Assert.assertTrue(unkeyedTable.io().exists(file1.path())); Assert.assertTrue(unkeyedTable.io().exists(file2.path())); Assert.assertTrue(unkeyedTable.io().exists(file3.path())); new MixedTableMaintainer(getMixedTable(), TestTableMaintainerContext.of(getMixedTable())) - .cleanContentFiles(System.currentTimeMillis() + 1, orphanFilesCleaningMetrics); + .cleanContentFiles(System.currentTimeMillis() + 1, metrics); new MixedTableMaintainer(getMixedTable(), TestTableMaintainerContext.of(getMixedTable())) - .cleanMetadata(System.currentTimeMillis() + 1, orphanFilesCleaningMetrics); + .cleanMetadata(System.currentTimeMillis() + 1, metrics); Assert.assertTrue(unkeyedTable.io().exists(file1.path())); Assert.assertTrue(unkeyedTable.io().exists(file2.path())); Assert.assertTrue(unkeyedTable.io().exists(file3.path())); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java index 729999525c..50c238318f 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java @@ -20,7 +20,6 @@ import static org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer.DATA_FOLDER_NAME; -import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.TableTestHelper; import org.apache.amoro.catalog.CatalogTestHelper; @@ -29,8 +28,7 @@ import org.apache.amoro.hive.catalog.HiveCatalogTestHelper; import org.apache.amoro.hive.catalog.HiveTableTestHelper; import org.apache.amoro.hive.table.SupportHive; -import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics; -import org.apache.amoro.table.TableIdentifier; +import org.apache.amoro.maintainer.MaintainerMetrics; import org.apache.iceberg.io.OutputFile; import org.junit.Assert; import org.junit.ClassRule; @@ -87,15 +85,8 @@ public void hiveLocationOrphanDataFileClean() throws IOException { MixedTableMaintainer maintainer = new MixedTableMaintainer(getMixedTable(), TestTableMaintainerContext.of(getMixedTable())); - TableIdentifier tableIdentifier = getMixedTable().id(); - TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics = - new TableOrphanFilesCleaningMetrics( - ServerTableIdentifier.of( - tableIdentifier.getCatalog(), - tableIdentifier.getDatabase(), - tableIdentifier.getTableName(), - getTestFormat())); - maintainer.cleanContentFiles(System.currentTimeMillis(), orphanFilesCleaningMetrics); + MaintainerMetrics metrics = MaintainerMetrics.NOOP; + maintainer.cleanContentFiles(System.currentTimeMillis(), metrics); Assert.assertTrue(getMixedTable().io().exists(hiveOrphanFilePath)); } } diff --git a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java index 31ea74f1fe..913548da78 100644 --- a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java +++ b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java @@ -19,6 +19,7 @@ package org.apache.amoro; import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.maintainer.MaintainerMetrics; import org.apache.amoro.metrics.MetricRegistry; import org.apache.amoro.process.ProcessFactory; import org.apache.amoro.process.TableProcessStore; @@ -79,6 +80,15 @@ default TableFormat getFormat() { return getTableIdentifier().getFormat(); } + /** + * Get the maintainer metrics collector. + * + * @return maintainer metrics collector + */ + default MaintainerMetrics getMaintainerMetrics() { + return MaintainerMetrics.NOOP; + } + /** Dispose the table runtime. */ default void dispose() {} } diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerMetrics.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerMetrics.java index 420c61e3b0..0ad0fab728 100644 --- a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerMetrics.java +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerMetrics.java @@ -40,6 +40,73 @@ public interface MaintainerMetrics { */ void recordOrphanMetadataFilesCleaned(int expected, int cleaned); + /** + * Record dangling delete files cleaning result. 
+ * + * @param cleaned number of files cleaned + */ + void recordDanglingDeleteFilesCleaned(int cleaned); + + /** + * Record snapshot expiration operation result. + * + * @param snapshotCount number of snapshots expired + * @param dataFilesDeleted number of data files deleted + * @param durationMillis operation duration in milliseconds + */ + void recordSnapshotsExpired(int snapshotCount, int dataFilesDeleted, long durationMillis); + + /** + * Record data expiration operation result. + * + * @param dataFilesExpired number of data files expired + * @param deleteFilesExpired number of delete files expired + * @param durationMillis operation duration in milliseconds + */ + void recordDataExpired(int dataFilesExpired, int deleteFilesExpired, long durationMillis); + + /** + * Record tag creation operation result. + * + * @param tagsCreated number of tags created + * @param durationMillis operation duration in milliseconds + */ + void recordTagsCreated(int tagsCreated, long durationMillis); + + /** + * Record partition expiration operation result. + * + * @param partitionsExpired number of partitions expired + * @param filesExpired number of files expired + * @param durationMillis operation duration in milliseconds + */ + void recordPartitionsExpired(int partitionsExpired, int filesExpired, long durationMillis); + + /** + * Record operation start. + * + * @param operationType operation type + */ + void recordOperationStart(MaintainerOperationType operationType); + + /** + * Record operation success completion. + * + * @param operationType operation type + * @param durationMillis operation duration in milliseconds + */ + void recordOperationSuccess(MaintainerOperationType operationType, long durationMillis); + + /** + * Record operation failure. + * + * @param operationType operation type + * @param durationMillis operation duration in milliseconds + * @param throwable exception information + */ + void recordOperationFailure( + MaintainerOperationType operationType, long durationMillis, Throwable throwable); + /** No-op implementation that does nothing. 
*/ MaintainerMetrics NOOP = new MaintainerMetrics() { @@ -48,5 +115,34 @@ public void recordOrphanDataFilesCleaned(int expected, int cleaned) {} @Override public void recordOrphanMetadataFilesCleaned(int expected, int cleaned) {} + + @Override + public void recordDanglingDeleteFilesCleaned(int cleaned) {} + + @Override + public void recordSnapshotsExpired( + int snapshotCount, int dataFilesDeleted, long durationMillis) {} + + @Override + public void recordDataExpired( + int dataFilesExpired, int deleteFilesExpired, long durationMillis) {} + + @Override + public void recordTagsCreated(int tagsCreated, long durationMillis) {} + + @Override + public void recordPartitionsExpired( + int partitionsExpired, int filesExpired, long durationMillis) {} + + @Override + public void recordOperationStart(MaintainerOperationType operationType) {} + + @Override + public void recordOperationSuccess( + MaintainerOperationType operationType, long durationMillis) {} + + @Override + public void recordOperationFailure( + MaintainerOperationType operationType, long durationMillis, Throwable throwable) {} }; } diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOperationExecutor.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOperationExecutor.java new file mode 100644 index 0000000000..88d3a0459d --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOperationExecutor.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.maintainer; + +import java.util.function.Supplier; + +/** + * Executor for running maintainer operations with consistent metrics recording. + * + *

<p>This executor ensures that all maintainer operations record metrics in a consistent way: + * + *

+ * + *

<p>Usage example: + * + *

<pre>{@code
+ * MaintainerOperationExecutor executor = new MaintainerOperationExecutor(metrics);
+ * executor.execute(
+ *     MaintainerOperationType.ORPHAN_FILES_CLEANING,
+ *     () -> {
+ *       // Operation logic here
+ *       cleanOrphanFiles();
+ *     }
+ * );
+ * }</pre>
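 * (Editorial sketch, not part of the patch: the value-returning variant follows the same
 * pattern; {@code clearDanglingDeleteFiles()} below is a hypothetical helper, while the
 * executor, enum, and metrics calls are the ones introduced in this file and in
 * MaintainerMetrics.)
 *
 * <pre>{@code
 * int deleted =
 *     executor.executeAndReturn(
 *         MaintainerOperationType.DANGLING_DELETE_FILES_CLEANING,
 *         () -> clearDanglingDeleteFiles());
 * metrics.recordDanglingDeleteFilesCleaned(deleted);
 * }</pre>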
+ */ +public class MaintainerOperationExecutor { + + private final MaintainerMetrics metrics; + + /** + * Creates a new operation executor with the given metrics collector. + * + * @param metrics the metrics collector (can be null, will use NOOP in that case) + */ + public MaintainerOperationExecutor(MaintainerMetrics metrics) { + this.metrics = metrics != null ? metrics : MaintainerMetrics.NOOP; + } + + /** + * Executes a maintainer operation with metrics recording. + * + *

<p>This method will: + *

    + *
  1. Record the operation start via {@link MaintainerMetrics#recordOperationStart} + *
  2. Execute the provided operation + *
  3. On success: record operation success via {@link MaintainerMetrics#recordOperationSuccess} + *
  4. On failure: record operation failure via {@link MaintainerMetrics#recordOperationFailure} + * and rethrow the exception + *
+ * + * @param operationType the type of operation being executed + * @param operation the operation to execute + */ + public void execute(MaintainerOperationType operationType, Runnable operation) { + long startTime = System.currentTimeMillis(); + metrics.recordOperationStart(operationType); + + try { + operation.run(); + long duration = System.currentTimeMillis() - startTime; + metrics.recordOperationSuccess(operationType, duration); + } catch (Throwable t) { + long duration = System.currentTimeMillis() - startTime; + metrics.recordOperationFailure(operationType, duration, t); + throw t; + } + } + + /** + * Executes a maintainer operation with metrics recording and return result. + * + *

<p>This method will: + *

    + *
  1. Record the operation start via {@link MaintainerMetrics#recordOperationStart} + *
  2. Execute the provided operation + *
  3. On success: record operation success via {@link MaintainerMetrics#recordOperationSuccess} + *
  4. On failure: record operation failure via {@link MaintainerMetrics#recordOperationFailure} + * and rethrow the exception + *
+ * + * @param operationType the type of operation being executed + * @param operation the operation to execute + * @param the result type + * @return the operation result + */ + public T executeAndReturn(MaintainerOperationType operationType, Supplier operation) { + long startTime = System.currentTimeMillis(); + metrics.recordOperationStart(operationType); + + try { + T result = operation.get(); + long duration = System.currentTimeMillis() - startTime; + metrics.recordOperationSuccess(operationType, duration); + return result; + } catch (Throwable t) { + long duration = System.currentTimeMillis() - startTime; + metrics.recordOperationFailure(operationType, duration, t); + throw t; + } + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOperationType.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOperationType.java new file mode 100644 index 0000000000..41baafe3a0 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOperationType.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.maintainer; + +/** + * Enumeration of maintenance operation types + * + *

Corresponds to various maintenance operations in TableMaintainer, used for metrics recording + * and classification + */ +public enum MaintainerOperationType { + /** Orphan files cleaning (including data files and metadata files) */ + ORPHAN_FILES_CLEANING("orphan_files_cleaning"), + + /** Dangling delete files cleaning (Iceberg specific) */ + DANGLING_DELETE_FILES_CLEANING("dangling_delete_files_cleaning"), + + /** Snapshot expiration */ + SNAPSHOT_EXPIRATION("snapshot_expiration"), + + /** Data expiration */ + DATA_EXPIRATION("data_expiration"), + + /** Tag creation */ + TAG_CREATION("tag_creation"), + + /** Partition expiration (Paimon specific) */ + PARTITION_EXPIRATION("partition_expiration"); + + private final String metricName; + + MaintainerOperationType(String metricName) { + this.metricName = metricName; + } + + /** + * Get metric name + * + * @return Metric name + */ + public String getMetricName() { + return metricName; + } +} diff --git a/amoro-common/src/test/java/org/apache/amoro/maintainer/TestMaintainerOperationExecutor.java b/amoro-common/src/test/java/org/apache/amoro/maintainer/TestMaintainerOperationExecutor.java new file mode 100644 index 0000000000..84307ab020 --- /dev/null +++ b/amoro-common/src/test/java/org/apache/amoro/maintainer/TestMaintainerOperationExecutor.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.maintainer; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class TestMaintainerOperationExecutor { + + @Mock private MaintainerMetrics mockMetrics; + + private MaintainerOperationExecutor executor; + + @BeforeEach + public void setUp() { + executor = new MaintainerOperationExecutor(mockMetrics); + } + + @Test + public void testSuccessfulOperation() { + // Execute operation + executor.execute( + MaintainerOperationType.ORPHAN_FILES_CLEANING, + () -> { + // Operation logic here + }); + + // Verify metrics recorded + InOrder inOrder = inOrder(mockMetrics); + inOrder.verify(mockMetrics).recordOperationStart(MaintainerOperationType.ORPHAN_FILES_CLEANING); + inOrder + .verify(mockMetrics) + .recordOperationSuccess(eq(MaintainerOperationType.ORPHAN_FILES_CLEANING), anyLong()); + verify(mockMetrics, never()) + .recordOperationFailure( + eq(MaintainerOperationType.ORPHAN_FILES_CLEANING), anyLong(), any()); + } + + @Test + public void testFailedOperation() { + RuntimeException exception = new RuntimeException("Test error"); + + // Execute operation and expect exception + assertThrows( + RuntimeException.class, + () -> + executor.execute( + MaintainerOperationType.SNAPSHOT_EXPIRATION, + () -> { + throw exception; + })); + + // Verify metrics recorded + InOrder inOrder = inOrder(mockMetrics); + inOrder.verify(mockMetrics).recordOperationStart(MaintainerOperationType.SNAPSHOT_EXPIRATION); + inOrder + .verify(mockMetrics) + .recordOperationFailure( + eq(MaintainerOperationType.SNAPSHOT_EXPIRATION), anyLong(), eq(exception)); + verify(mockMetrics, never()) + .recordOperationSuccess(eq(MaintainerOperationType.SNAPSHOT_EXPIRATION), anyLong()); + } + + @Test + public void testOperationWithResult() { + Integer expected = 42; + + // Execute operation with result + Integer result = + executor.executeAndReturn( + MaintainerOperationType.DATA_EXPIRATION, + () -> { + return expected; + }); + + // Verify result + assertEquals(expected, result); + + // Verify metrics recorded + InOrder inOrder = inOrder(mockMetrics); + inOrder.verify(mockMetrics).recordOperationStart(MaintainerOperationType.DATA_EXPIRATION); + inOrder + .verify(mockMetrics) + .recordOperationSuccess(eq(MaintainerOperationType.DATA_EXPIRATION), anyLong()); + } + + @Test + public void testOperationWithResultFailure() { + IllegalStateException exception = new IllegalStateException("Test state error"); + + // Execute operation and expect exception + assertThrows( + IllegalStateException.class, + () -> + executor.executeAndReturn( + MaintainerOperationType.TAG_CREATION, + () -> { + throw exception; + })); + + // Verify metrics recorded + verify(mockMetrics).recordOperationStart(MaintainerOperationType.TAG_CREATION); + verify(mockMetrics) + 
.recordOperationFailure(eq(MaintainerOperationType.TAG_CREATION), anyLong(), eq(exception)); + } + + @Test + public void testNullMetricsUsesNoop() { + // Create executor with null metrics + MaintainerOperationExecutor noopExecutor = new MaintainerOperationExecutor(null); + + // Should not throw exception + assertDoesNotThrow( + () -> + noopExecutor.execute( + MaintainerOperationType.DANGLING_DELETE_FILES_CLEANING, + () -> { + // Operation logic here + })); + + // Execute with result + Integer result = + assertDoesNotThrow( + () -> + noopExecutor.executeAndReturn( + MaintainerOperationType.PARTITION_EXPIRATION, () -> 123)); + assertEquals(123, result); + } + + @Test + public void testMultipleOperations() { + // Execute multiple operations + executor.execute( + MaintainerOperationType.ORPHAN_FILES_CLEANING, + () -> { + // First operation + }); + + executor.execute( + MaintainerOperationType.SNAPSHOT_EXPIRATION, + () -> { + // Second operation + }); + + // Verify both operations were recorded + verify(mockMetrics, times(1)) + .recordOperationStart(MaintainerOperationType.ORPHAN_FILES_CLEANING); + verify(mockMetrics, times(1)) + .recordOperationSuccess(eq(MaintainerOperationType.ORPHAN_FILES_CLEANING), anyLong()); + verify(mockMetrics, times(1)).recordOperationStart(MaintainerOperationType.SNAPSHOT_EXPIRATION); + verify(mockMetrics, times(1)) + .recordOperationSuccess(eq(MaintainerOperationType.SNAPSHOT_EXPIRATION), anyLong()); + } + + @Test + public void testDurationIsRecorded() { + // Add a small delay to ensure duration > 0 + long startTime = System.currentTimeMillis(); + executor.execute( + MaintainerOperationType.DATA_EXPIRATION, + () -> { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + long endTime = System.currentTimeMillis(); + + // Verify duration is recorded and is reasonable + InOrder inOrder = inOrder(mockMetrics); + inOrder.verify(mockMetrics).recordOperationStart(MaintainerOperationType.DATA_EXPIRATION); + inOrder + .verify(mockMetrics) + .recordOperationSuccess(eq(MaintainerOperationType.DATA_EXPIRATION), anyLong()); + } + + @Test + public void testAllOperationTypes() { + // Test all operation types + MaintainerOperationType[] operationTypes = MaintainerOperationType.values(); + + for (MaintainerOperationType operationType : operationTypes) { + executor.execute( + operationType, + () -> { + // Operation logic + }); + } + + // Verify all operation types were recorded + for (MaintainerOperationType operationType : operationTypes) { + verify(mockMetrics, times(1)).recordOperationStart(operationType); + verify(mockMetrics, times(1)).recordOperationSuccess(eq(operationType), anyLong()); + } + } +} diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java index 3d1d4bfb1c..3e6eeb12bd 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java @@ -31,6 +31,8 @@ import org.apache.amoro.io.PathInfo; import org.apache.amoro.io.SupportsFileSystemOperations; import org.apache.amoro.maintainer.MaintainerMetrics; +import org.apache.amoro.maintainer.MaintainerOperationExecutor; +import org.apache.amoro.maintainer.MaintainerOperationType; import org.apache.amoro.maintainer.OptimizingInfo; 
import org.apache.amoro.maintainer.TableMaintainer; import org.apache.amoro.maintainer.TableMaintainerContext; @@ -139,38 +141,63 @@ public void cleanOrphanFiles() { return; } - long keepTime = tableConfiguration.getOrphanExistingMinutes() * 60 * 1000; + MaintainerOperationExecutor executor = new MaintainerOperationExecutor(metrics); - cleanContentFiles(System.currentTimeMillis() - keepTime, metrics); + executor.execute( + MaintainerOperationType.ORPHAN_FILES_CLEANING, + () -> { + long keepTime = tableConfiguration.getOrphanExistingMinutes() * 60 * 1000; + + cleanContentFiles(System.currentTimeMillis() - keepTime, metrics); - // refresh - table.refresh(); + // refresh + table.refresh(); - // clear metadata files - cleanMetadata(System.currentTimeMillis() - keepTime, metrics); + // clear metadata files + cleanMetadata(System.currentTimeMillis() - keepTime, metrics); + }); } @Override public void cleanDanglingDeleteFiles() { TableConfiguration tableConfiguration = context.getTableConfiguration(); + MaintainerMetrics metrics = context.getMetrics(); + if (!tableConfiguration.isDeleteDanglingDeleteFilesEnabled()) { return; } - Snapshot currentSnapshot = table.currentSnapshot(); - if (currentSnapshot == null) { - return; - } - Optional totalDeleteFiles = - Optional.ofNullable(currentSnapshot.summary().get(SnapshotSummary.TOTAL_DELETE_FILES_PROP)); - if (totalDeleteFiles.isPresent() && Long.parseLong(totalDeleteFiles.get()) > 0) { - // clear dangling delete files - doCleanDanglingDeleteFiles(); - } else { - LOG.debug( - "There are no delete files here, so there is no need to clean dangling delete file for table {}", - table.name()); - } + MaintainerOperationExecutor executor = new MaintainerOperationExecutor(metrics); + + executor.execute( + MaintainerOperationType.DANGLING_DELETE_FILES_CLEANING, + () -> { + Snapshot currentSnapshot = table.currentSnapshot(); + if (currentSnapshot == null) { + return; + } + Optional totalDeleteFiles = + Optional.ofNullable( + currentSnapshot.summary().get(SnapshotSummary.TOTAL_DELETE_FILES_PROP)); + if (totalDeleteFiles.isPresent() && Long.parseLong(totalDeleteFiles.get()) > 0) { + // clear dangling delete files + LOG.info("Starting cleaning dangling delete files for table {}", table.name()); + int danglingDeleteFilesCnt = clearInternalTableDanglingDeleteFiles(); + runWithCondition( + danglingDeleteFilesCnt > 0, + () -> { + LOG.info( + "Deleted {} dangling delete files for table {}", + danglingDeleteFilesCnt, + table.name()); + metrics.recordDanglingDeleteFilesCleaned(danglingDeleteFilesCnt); + }); + } else { + LOG.debug( + "There are no delete files here, so there is no need to clean dangling delete file for table {}", + table.name()); + } + }); } @Override @@ -178,68 +205,95 @@ public void expireSnapshots() { if (!expireSnapshotEnabled()) { return; } - expireSnapshots( - mustOlderThan(System.currentTimeMillis()), - context.getTableConfiguration().getSnapshotMinCount()); - } - - public boolean expireSnapshotEnabled() { - TableConfiguration tableConfiguration = context.getTableConfiguration(); - return tableConfiguration.isExpireSnapshotEnabled(); - } + MaintainerMetrics metrics = context.getMetrics(); + long mustOlderThan = mustOlderThan(System.currentTimeMillis()); + int minCount = context.getTableConfiguration().getSnapshotMinCount(); - @VisibleForTesting - public void expireSnapshots(long mustOlderThan, int minCount) { - expireSnapshots(mustOlderThan, minCount, expireSnapshotNeedToExcludeFiles()); + MaintainerOperationExecutor executor = new 
MaintainerOperationExecutor(metrics); + executor.execute( + MaintainerOperationType.SNAPSHOT_EXPIRATION, + () -> expireSnapshotsInternal(mustOlderThan, minCount, metrics)); } - private void expireSnapshots(long olderThan, int minCount, Set exclude) { + private void expireSnapshotsInternal( + long mustOlderThan, int minCount, MaintainerMetrics metrics) { + long startTime = System.currentTimeMillis(); + Set exclude = expireSnapshotNeedToExcludeFiles(); LOG.debug( "Starting snapshots expiration for table {}, expiring snapshots older than {} and retain last {} snapshots, excluding {}", table.name(), - olderThan, + mustOlderThan, minCount, exclude); RollingFileCleaner expiredFileCleaner = new RollingFileCleaner(fileIO(), exclude); table .expireSnapshots() .retainLast(Math.max(minCount, 1)) - .expireOlderThan(olderThan) + .expireOlderThan(mustOlderThan) .deleteWith(expiredFileCleaner::addFile) .cleanExpiredFiles( true) /* enable clean only for collecting the expired files, will delete them later */ .commit(); - int collectedFiles = expiredFileCleaner.fileCount(); + int snapshotCount = expiredFileCleaner.fileCount(); + int dataFilesDeleted = expiredFileCleaner.cleanedFileCount(); expiredFileCleaner.clear(); - if (collectedFiles > 0) { + + long duration = System.currentTimeMillis() - startTime; + + if (snapshotCount > 0) { LOG.info( - "Expired {}/{} files for table {} order than {}", - collectedFiles, - expiredFileCleaner.cleanedFileCount(), + "Expired {}/{} files for table {} older than {}", + snapshotCount, + dataFilesDeleted, table.name(), - DateTimeUtil.formatTimestampMillis(olderThan)); + DateTimeUtil.formatTimestampMillis(mustOlderThan)); + metrics.recordSnapshotsExpired(snapshotCount, dataFilesDeleted, duration); } else { LOG.debug( - "No expired files found for table {} order than {}", + "No expired files found for table {} older than {}", table.name(), - DateTimeUtil.formatTimestampMillis(olderThan)); + DateTimeUtil.formatTimestampMillis(mustOlderThan)); } } + public boolean expireSnapshotEnabled() { + TableConfiguration tableConfiguration = context.getTableConfiguration(); + return tableConfiguration.isExpireSnapshotEnabled(); + } + + @VisibleForTesting + public void expireSnapshots(long mustOlderThan, int minCount) { + MaintainerMetrics metrics = context.getMetrics(); + expireSnapshotsInternal(mustOlderThan, minCount, metrics); + } + + private void expireSnapshots(long olderThan, int minCount, Set exclude) { + // This method is kept for backward compatibility with potential subclasses + // Redirect to the new method with metrics + MaintainerMetrics metrics = context.getMetrics(); + expireSnapshotsInternal(olderThan, minCount, metrics); + } + @Override public void expireData() { DataExpirationConfig expirationConfig = context.getTableConfiguration().getExpiringDataConfig(); - try { - Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField()); - if (!isValidDataExpirationField(expirationConfig, field, table.name())) { - return; - } + MaintainerMetrics metrics = context.getMetrics(); - expireDataFrom(expirationConfig, expireBaseOnRule(expirationConfig, field)); - } catch (Throwable t) { - LOG.error("Unexpected purge error for table {} ", tableIdentifier, t); - } + MaintainerOperationExecutor executor = new MaintainerOperationExecutor(metrics); + executor.execute( + MaintainerOperationType.DATA_EXPIRATION, + () -> { + Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField()); + if (!isValidDataExpirationField(expirationConfig, 
field, table.name())) { + return; + } + + Instant expireInstant = expireBaseOnRule(expirationConfig, field); + if (!expireInstant.equals(Instant.MIN)) { + doExpireData(expirationConfig, expireInstant, metrics); + } + }); } public Instant expireBaseOnRule(DataExpirationConfig expirationConfig, Types.NestedField field) { @@ -273,26 +327,191 @@ public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instan if (instant.equals(Instant.MIN)) { return; } + MaintainerMetrics metrics = context.getMetrics(); + MaintainerOperationExecutor executor = new MaintainerOperationExecutor(metrics); + executor.execute( + MaintainerOperationType.DATA_EXPIRATION, + () -> doExpireData(expirationConfig, instant, metrics)); + } + private void doExpireData( + DataExpirationConfig expirationConfig, Instant instant, MaintainerMetrics metrics) { + // Add defensive check + if (instant.equals(Instant.MIN)) { + return; + } + long startTime = System.currentTimeMillis(); long expireTimestamp = instant.minusMillis(expirationConfig.getRetentionTime()).toEpochMilli(); + Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField()); LOG.info( "Expiring data older than {} in table {} ", - Instant.ofEpochMilli(expireTimestamp) - .atZone( - getDefaultZoneId(table.schema().findField(expirationConfig.getExpirationField()))) - .toLocalDateTime(), + Instant.ofEpochMilli(expireTimestamp).atZone(getDefaultZoneId(field)).toLocalDateTime(), table.name()); Expression dataFilter = getDataExpression(table.schema(), expirationConfig, expireTimestamp); ExpireFiles expiredFiles = expiredFileScan(expirationConfig, dataFilter, expireTimestamp); - expireFiles(expiredFiles, expireTimestamp); + + int dataFilesCount = expiredFiles.dataFiles.size(); + int deleteFilesCount = expiredFiles.deleteFiles.size(); + + if (dataFilesCount > 0 || deleteFilesCount > 0) { + expireFiles(expiredFiles, expireTimestamp); + } + + long duration = System.currentTimeMillis() - startTime; + + if (dataFilesCount > 0 || deleteFilesCount > 0) { + LOG.info( + "Data expiration completed for table {}, {} data files and {} delete files expired, duration: {}ms", + table.name(), + dataFilesCount, + deleteFilesCount, + duration); + metrics.recordDataExpired(dataFilesCount, deleteFilesCount, duration); + } } @Override public void autoCreateTags() { TagConfiguration tagConfiguration = context.getTableConfiguration().getTagConfiguration(); - new AutoCreateIcebergTagAction(table, tagConfiguration, LocalDateTime.now()).execute(); + MaintainerMetrics metrics = context.getMetrics(); + + MaintainerOperationExecutor executor = new MaintainerOperationExecutor(metrics); + executor.execute( + MaintainerOperationType.TAG_CREATION, + () -> { + long startTime = System.currentTimeMillis(); + int tagsCreated = autoCreateTagsInternal(tagConfiguration); + long duration = System.currentTimeMillis() - startTime; + if (tagsCreated > 0) { + metrics.recordTagsCreated(tagsCreated, duration); + } + }); + } + + private int autoCreateTagsInternal(TagConfiguration tagConfiguration) { + LocalDateTime checkTime = LocalDateTime.now(); + + if (!tagConfiguration.isAutoCreateTag()) { + return 0; + } + LOG.debug("Start checking the automatic creation of tags for {}", table.name()); + if (tagExists(tagConfiguration, checkTime)) { + LOG.debug("Found the expected tag on {}, skip", table.name()); + return 0; + } + boolean success = createTagWithConfig(tagConfiguration, checkTime); + if (success) { + LOG.info("Created a tag successfully on {}", table.name()); + return 1; + } 
else { + LOG.info("Skipped tag creation on {}", table.name()); + return 0; + } + } + + /** + * Check if tag exists for the given tag configuration. + * + * @param tagConfiguration tag configuration + * @param checkTime check time + * @return true if tag exists, false otherwise + */ + private boolean tagExists(TagConfiguration tagConfiguration, LocalDateTime checkTime) { + LocalDateTime tagTime = + tagConfiguration + .getTriggerPeriod() + .getTagTime(checkTime, tagConfiguration.getTriggerOffsetMinutes()); + LocalDateTime triggerTime = tagTime.plusMinutes(tagConfiguration.getTriggerOffsetMinutes()); + String tagName = + tagConfiguration + .getTriggerPeriod() + .generateTagName(tagTime, tagConfiguration.getTagFormat()); + + String tag = + table.refs().entrySet().stream() + .filter(entry -> entry.getValue().isTag()) + .map(Map.Entry::getKey) + .filter(tagName::equals) + .findFirst() + .orElse(null); + return tag != null; + } + + /** + * Create tag using the provided configuration. + * + * @param tagConfiguration the tag configuration + * @param checkTime the check time + * @return true if tag was created, false otherwise + */ + private boolean createTagWithConfig(TagConfiguration tagConfiguration, LocalDateTime checkTime) { + LocalDateTime tagTime = + tagConfiguration + .getTriggerPeriod() + .getTagTime(checkTime, tagConfiguration.getTriggerOffsetMinutes()); + String tagName = + tagConfiguration + .getTriggerPeriod() + .generateTagName(tagTime, tagConfiguration.getTagFormat()); + + // Create the tag + long tagTriggerTimestampMillis = + tagTime.atZone(java.time.ZoneId.systemDefault()).toInstant().toEpochMilli(); + org.apache.iceberg.Snapshot snapshot = findSnapshotForTag(table, tagTriggerTimestampMillis); + if (snapshot == null) { + LOG.info("Found no snapshot at {} for {}", tagTriggerTimestampMillis, table.name()); + return false; + } + + if (exceedMaxDelayForTag(snapshot, tagConfiguration, tagTriggerTimestampMillis)) { + LOG.info( + "{}'s snapshot {} at {} exceeds max delay {}, and the expected trigger time is {}", + table.name(), + snapshot.snapshotId(), + snapshot.timestampMillis(), + tagConfiguration.getMaxDelayMinutes(), + tagTriggerTimestampMillis); + return false; + } + + org.apache.iceberg.ManageSnapshots tag = + table.manageSnapshots().createTag(tagName, snapshot.snapshotId()); + if (tagConfiguration.getTagMaxAgeMs() > 0) { + tag.setMaxRefAgeMs(tagName, tagConfiguration.getTagMaxAgeMs()); + } + tag.commit(); + LOG.info( + "Created a tag {} for {} on snapshot {} at {}", + tagName, + table.name(), + snapshot.snapshotId(), + snapshot.timestampMillis()); + return true; + } + + private static org.apache.iceberg.Snapshot findSnapshotForTag( + org.apache.iceberg.Table table, long tagTriggerTime) { + Iterable snapshots = table.snapshots(); + for (org.apache.iceberg.Snapshot snapshot : snapshots) { + long waterMark = snapshot.timestampMillis(); + if (waterMark >= tagTriggerTime) { + return snapshot; + } + } + return null; + } + + private boolean exceedMaxDelayForTag( + org.apache.iceberg.Snapshot snapshot, + TagConfiguration tagConfig, + long tagTriggerTimestampMillis) { + if (tagConfig.getMaxDelayMinutes() <= 0) { + return false; + } + long delay = snapshot.timestampMillis() - tagTriggerTimestampMillis; + return delay > tagConfig.getMaxDelayMinutes() * 60_000L; } public void cleanContentFiles(long lastTime, MaintainerMetrics metrics) { diff --git a/amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/maintainer/TestIcebergTableMaintainerMetrics.java 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/maintainer/TestIcebergTableMaintainerMetrics.java new file mode 100644 index 0000000000..1a965f1d20 --- /dev/null +++ b/amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/maintainer/TestIcebergTableMaintainerMetrics.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.formats.iceberg.maintainer; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.maintainer.MaintainerMetrics; +import org.apache.amoro.maintainer.MaintainerOperationType; +import org.apache.amoro.maintainer.OptimizingInfo; +import org.apache.amoro.maintainer.TableMaintainerContext; +import org.apache.amoro.table.TableIdentifier; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +/** + * Unit tests for IcebergTableMaintainer metrics recording. + * + *
<p>
These tests verify that IcebergTableMaintainer correctly records metrics through the + * MaintainerMetrics interface for all maintainer operations. + */ +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class TestIcebergTableMaintainerMetrics { + + @Mock private TableMaintainerContext mockContext; + + @Mock private MaintainerMetrics mockMetrics; + + @Mock private TableConfiguration mockTableConfiguration; + + @Mock private OptimizingInfo mockOptimizingInfo; + + @Mock private org.apache.amoro.config.TagConfiguration mockTagConfiguration; + + @Mock private org.apache.amoro.config.DataExpirationConfig mockDataExpirationConfig; + + private IcebergTableMaintainer maintainer; + + @BeforeEach + public void setUp() { + TableIdentifier tableIdentifier = TableIdentifier.of("test_catalog", "test_db", "test_table"); + + when(mockContext.getMetrics()).thenReturn(mockMetrics); + when(mockContext.getTableConfiguration()).thenReturn(mockTableConfiguration); + when(mockContext.getOptimizingInfo()).thenReturn(mockOptimizingInfo); + when(mockOptimizingInfo.isProcessing()).thenReturn(false); + + // Feature disabled by default to avoid complex setup + when(mockTableConfiguration.isCleanOrphanEnabled()).thenReturn(false); + when(mockTableConfiguration.isDeleteDanglingDeleteFilesEnabled()).thenReturn(false); + when(mockTableConfiguration.isExpireSnapshotEnabled()).thenReturn(false); + + // Setup Tag and DataExpiration configs to avoid NPE + when(mockTableConfiguration.getTagConfiguration()).thenReturn(mockTagConfiguration); + when(mockTagConfiguration.isAutoCreateTag()).thenReturn(false); + when(mockTableConfiguration.getExpiringDataConfig()).thenReturn(mockDataExpirationConfig); + // Data expiration disabled by default + when(mockDataExpirationConfig.isEnabled()).thenReturn(false); + // Use a valid field name ("ts" matches the mock table schema) to avoid issues + when(mockDataExpirationConfig.getExpirationField()).thenReturn("ts"); + + // Create a minimal Table mock to avoid complex setup + org.apache.iceberg.Table table = createMinimalTableMock(); + + maintainer = new IcebergTableMaintainer(table, tableIdentifier, mockContext); + } + + @Test + public void testExpireSnapshotsDisabledDoesNotRecordMetrics() { + // Feature disabled by default in setUp() + when(mockTableConfiguration.isExpireSnapshotEnabled()).thenReturn(false); + + // Execute + maintainer.expireSnapshots(); + + // Verify no metrics are recorded when feature is disabled + verify(mockMetrics, never()).recordOperationStart(any(MaintainerOperationType.class)); + } + + @Test + public void testAutoCreateTagsDisabledDoesNotRecordDetailedMetrics() { + // Setup - feature disabled + when(mockTagConfiguration.isAutoCreateTag()).thenReturn(false); + + // Execute + maintainer.autoCreateTags(); + + // Verify operation-level metrics are still recorded even when no tags are created + InOrder inOrder = inOrder(mockMetrics); + inOrder.verify(mockMetrics).recordOperationStart(MaintainerOperationType.TAG_CREATION); + inOrder + .verify(mockMetrics) + .recordOperationSuccess(eq(MaintainerOperationType.TAG_CREATION), anyLong()); + } + + @Test + public void testExpireDataInvalidFieldDoesNotRecordDetailedMetrics() { + // Setup - empty expiration field causes IllegalArgumentException + when(mockDataExpirationConfig.isEnabled()).thenReturn(true); + when(mockDataExpirationConfig.getRetentionTime()).thenReturn(1L); + when(mockDataExpirationConfig.getExpirationField()).thenReturn(""); + 
when(mockDataExpirationConfig.getBaseOnRule()) + .thenReturn(org.apache.amoro.config.DataExpirationConfig.BaseOnRule.CURRENT_TIME); + + // Execute - this will throw exception for empty field name + assertThrows(IllegalArgumentException.class, () -> maintainer.expireData()); + + // Verify operation-level metrics are still recorded even when field is invalid + // Exception is caught and logged, metrics still record failure + verify(mockMetrics).recordOperationStart(MaintainerOperationType.DATA_EXPIRATION); + // When exception occurs, recordOperationFailure is called + verify(mockMetrics) + .recordOperationFailure(eq(MaintainerOperationType.DATA_EXPIRATION), anyLong(), any()); + } + + @Test + public void testCleanOrphanFilesDisabledDoesNotRecordMetrics() { + // Feature disabled by default in setUp() + when(mockTableConfiguration.isCleanOrphanEnabled()).thenReturn(false); + + // Execute + maintainer.cleanOrphanFiles(); + + // Verify no metrics are recorded when feature is disabled + verify(mockMetrics, never()).recordOperationStart(any(MaintainerOperationType.class)); + } + + @Test + public void testCleanDanglingDeleteFilesDisabledDoesNotRecordMetrics() { + // Feature disabled by default in setUp() + when(mockTableConfiguration.isDeleteDanglingDeleteFilesEnabled()).thenReturn(false); + + // Execute + maintainer.cleanDanglingDeleteFiles(); + + // Verify no dangling delete metrics are recorded when feature is disabled + verify(mockMetrics, never()).recordDanglingDeleteFilesCleaned(anyInt()); + } + + @Test + public void testAllMaintainerOperationsExist() { + // Verify that IcebergTableMaintainer implements methods for all relevant operation types + + // IcebergTableMaintainer should cover 5 out of 6 operation types: + // ORPHAN_FILES_CLEANING - cleanOrphanFiles() + // DANGLING_DELETE_FILES_CLEANING - cleanDanglingDeleteFiles() + // SNAPSHOT_EXPIRATION - expireSnapshots() + // DATA_EXPIRATION - expireData() + // TAG_CREATION - autoCreateTags() + // PARTITION_EXPIRATION - not in IcebergTableMaintainer (Paimon-specific) + + // Verify the methods exist and can be called + assertDoesNotThrow(() -> maintainer.cleanOrphanFiles()); + assertDoesNotThrow(() -> maintainer.cleanDanglingDeleteFiles()); + assertDoesNotThrow(() -> maintainer.expireSnapshots()); + assertDoesNotThrow(() -> maintainer.expireData()); + assertDoesNotThrow(() -> maintainer.autoCreateTags()); + } + + @Test + public void testMaintainerMetricsInterfaceHasAllOperationTypes() { + // Verify MaintainerOperationType enum has all expected operation types + MaintainerOperationType[] operationTypes = MaintainerOperationType.values(); + + // Should have 6 operation types + assertEquals(6, operationTypes.length); + + // Verify specific operation types exist + assertDoesNotThrow(() -> MaintainerOperationType.valueOf("ORPHAN_FILES_CLEANING")); + assertDoesNotThrow(() -> MaintainerOperationType.valueOf("DANGLING_DELETE_FILES_CLEANING")); + assertDoesNotThrow(() -> MaintainerOperationType.valueOf("SNAPSHOT_EXPIRATION")); + assertDoesNotThrow(() -> MaintainerOperationType.valueOf("DATA_EXPIRATION")); + assertDoesNotThrow(() -> MaintainerOperationType.valueOf("TAG_CREATION")); + assertDoesNotThrow(() -> MaintainerOperationType.valueOf("PARTITION_EXPIRATION")); + } + + /** + * Creates a minimal Table mock that avoids complex setup. The mock is configured to handle basic + * method calls without throwing exceptions. 
+ */ + private org.apache.iceberg.Table createMinimalTableMock() { + org.apache.iceberg.Table table = org.mockito.Mockito.mock(org.apache.iceberg.Table.class); + + // Configure minimal behavior to avoid NullPointerExceptions + when(table.name()).thenReturn("test_catalog.test_db.test_table"); + when(table.schema()) + .thenReturn( + new org.apache.iceberg.Schema( + org.apache.iceberg.types.Types.NestedField.optional( + 1, "ts", org.apache.iceberg.types.Types.TimestampType.withoutZone()))); + when(table.currentSnapshot()).thenReturn(null); + + return table; + } +}
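For reference, a minimal usage sketch of the executor pattern this patch introduces and its tests exercise, assuming only the API visible above (MaintainerOperationExecutor, MaintainerOperationType, MaintainerMetrics); the class and method names below other than the imported types are illustrative, not part of the change:

// Illustrative sketch only; not part of the patch. Names other than the imported
// org.apache.amoro.maintainer types are hypothetical.
import org.apache.amoro.maintainer.MaintainerMetrics;
import org.apache.amoro.maintainer.MaintainerOperationExecutor;
import org.apache.amoro.maintainer.MaintainerOperationType;

class MaintainerExecutorSketch {

  /** Wraps two maintenance operations with metrics recording via the executor. */
  void run(MaintainerMetrics metrics) {
    // Passing null instead of a metrics instance falls back to no-op recording,
    // as TestMaintainerOperationExecutor#testNullMetricsUsesNoop verifies.
    MaintainerOperationExecutor executor = new MaintainerOperationExecutor(metrics);

    // Void operation: recordOperationStart is invoked before the body runs, and
    // recordOperationSuccess (or recordOperationFailure when the body throws)
    // afterwards, together with the measured duration.
    executor.execute(
        MaintainerOperationType.SNAPSHOT_EXPIRATION,
        () -> {
          // ... expire snapshots here ...
        });

    // Value-returning operation: the body's result is handed back to the caller
    // after the same start/success (or failure) metrics have been recorded.
    Integer expiredCount =
        executor.executeAndReturn(MaintainerOperationType.DATA_EXPIRATION, () -> 0);
    System.out.println("expired files: " + expiredCount);
  }
}

The design choice reflected in the IcebergTableMaintainer changes above is that the executor centralizes start/success/failure accounting, so individual maintainers only record operation-specific counters such as recordSnapshotsExpired, recordDataExpired, or recordDanglingDeleteFilesCleaned.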