diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
index ef74193258..46e56a800a 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
@@ -24,17 +24,24 @@
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
+import org.apache.amoro.server.persistence.PersistentBase;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
+import org.apache.amoro.server.process.TableProcessMeta;
 import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.RuntimeHandlerChain;
 import org.apache.amoro.server.table.TableService;
 import org.apache.amoro.server.table.cleanup.CleanupOperation;
+import org.apache.amoro.server.utils.SnowflakeIdGenerator;
+import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
 import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
@@ -48,6 +55,12 @@ public abstract class PeriodicTableScheduler extends RuntimeHandlerChain {
   protected final Logger logger = LoggerFactory.getLogger(getClass());
 
   private static final long START_DELAY = 10 * 1000L;
+  private static final String CLEANUP_EXECUTION_ENGINE = "AMORO";
+  private static final String CLEANUP_PROCESS_STAGE = "CLEANUP";
+  private static final String EXTERNAL_PROCESS_IDENTIFIER = "";
+
+  private final SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator();
+  private final PersistencyHelper persistencyHelper = new PersistencyHelper();
 
   protected final Set<ServerTableIdentifier> scheduledTables =
       Collections.synchronizedSet(new HashSet<>());
@@ -123,14 +136,36 @@ private void scheduleTableExecution(TableRuntime tableRuntime, long delay) {
   }
 
   private void executeTask(TableRuntime tableRuntime) {
+    TableProcessMeta cleanProcessMeta = null;
+    CleanupOperation cleanupOperation = null;
+    Throwable executionError = null;
+
     try {
       if (isExecutable(tableRuntime)) {
+        cleanupOperation = getCleanupOperation();
+        cleanProcessMeta = createCleanupProcessInfo(tableRuntime, cleanupOperation);
+
         execute(tableRuntime);
-        // Different tables take different amounts of time to execute the end of execute(),
-        // so you need to perform the update operation separately for each table.
-        persistUpdatingCleanupTime(tableRuntime);
       }
+    } catch (Throwable t) {
+      executionError = t;
+      logger.error(
+          "Failed to execute cleanup operation {} for table {}",
+          cleanupOperation,
+          tableRuntime.getTableIdentifier(),
+          t);
     } finally {
+      try {
+        persistCleanupResult(tableRuntime, cleanupOperation, cleanProcessMeta, executionError);
+      } catch (Throwable t) {
+        // Rescheduling below must still happen when persisting the result fails.
+        logger.error(
+            "Failed to persist result of cleanup operation {} for table {}",
+            cleanupOperation,
+            tableRuntime.getTableIdentifier(),
+            t);
+      }
+
       scheduledTables.remove(tableRuntime.getTableIdentifier());
       scheduleIfNecessary(tableRuntime, getNextExecutingTime(tableRuntime));
     }
@@ -154,25 +189,149 @@ protected boolean shouldExecute(Long lastCleanupEndTime) {
     return true;
   }
 
-  private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
-    CleanupOperation cleanupOperation = getCleanupOperation();
+  /**
+   * Creates a {@link ProcessStatus#RUNNING} process record for a cleanup run and persists it.
+   *
+   * @param tableRuntime runtime of the table being cleaned up
+   * @param cleanupOperation cleanup operation about to be executed
+   * @return the persisted process meta, or {@code null} when the operation is skipped
+   */
+  @VisibleForTesting
+  public TableProcessMeta createCleanupProcessInfo(
+      TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+
     if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
+      return null;
+    }
+
+    TableProcessMeta cleanProcessMeta = buildProcessMeta(tableRuntime, cleanupOperation);
+    persistencyHelper.beginAndPersistCleanupProcess(cleanProcessMeta);
+
+    logger.debug(
+        "Successfully persist cleanup process [processId={}, tableId={}, processType={}]",
+        cleanProcessMeta.getProcessId(),
+        cleanProcessMeta.getTableId(),
+        cleanProcessMeta.getProcessType());
+
+    return cleanProcessMeta;
+  }
+
+  /**
+   * Marks the cleanup process as SUCCESS or FAILED and persists it together with the table's
+   * last clean time. The last clean time is updated for failed runs as well.
+   *
+   * @param tableRuntime runtime of the table that was cleaned up
+   * @param cleanupOperation cleanup operation that was executed
+   * @param cleanProcessMeta process record returned by {@link #createCleanupProcessInfo}; nothing
+   *     is persisted when it is {@code null}
+   * @param executionError error thrown by the cleanup, or {@code null} when it succeeded
+   */
+  @VisibleForTesting
+  public void persistCleanupResult(
+      TableRuntime tableRuntime,
+      CleanupOperation cleanupOperation,
+      TableProcessMeta cleanProcessMeta,
+      Throwable executionError) {
+
+    if (cleanProcessMeta == null) {
       return;
     }
 
-    try {
-      long currentTime = System.currentTimeMillis();
-      ((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, currentTime);
+    if (executionError != null) {
+      cleanProcessMeta.setStatus(ProcessStatus.FAILED);
+      String message = executionError.getMessage();
+      if (message == null) {
+        message = executionError.getClass().getName();
+      }
 
-      logger.debug(
-          "Update lastCleanTime for table {} with cleanup operation {}",
-          tableRuntime.getTableIdentifier().getTableName(),
-          cleanupOperation);
-    } catch (Exception e) {
-      logger.error(
-          "Failed to update lastCleanTime for table {}",
-          tableRuntime.getTableIdentifier().getTableName(),
-          e);
+      cleanProcessMeta.setFailMessage(message);
+    } else {
+      cleanProcessMeta.setStatus(ProcessStatus.SUCCESS);
+    }
+
+    long endTime = System.currentTimeMillis();
+    persistencyHelper.persistAndSetCompleted(
+        tableRuntime, cleanupOperation, cleanProcessMeta, endTime);
+
+    logger.debug(
+        "Successfully updated lastCleanTime and cleanupProcess for table {} with cleanup operation {}",
+        tableRuntime.getTableIdentifier().getTableName(),
+        cleanupOperation);
+  }
+
+  /** Builds an in-memory RUNNING process meta with a freshly generated process id. */
+  private TableProcessMeta buildProcessMeta(
+      TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+    TableProcessMeta cleanProcessMeta = new TableProcessMeta();
+    cleanProcessMeta.setTableId(tableRuntime.getTableIdentifier().getId());
+    cleanProcessMeta.setProcessId(idGenerator.generateId());
+    cleanProcessMeta.setExternalProcessIdentifier(EXTERNAL_PROCESS_IDENTIFIER);
+    cleanProcessMeta.setStatus(ProcessStatus.RUNNING);
+    cleanProcessMeta.setProcessType(cleanupOperation.name());
+    cleanProcessMeta.setProcessStage(CLEANUP_PROCESS_STAGE);
+    cleanProcessMeta.setExecutionEngine(CLEANUP_EXECUTION_ENGINE);
+    cleanProcessMeta.setRetryNumber(0);
+    cleanProcessMeta.setCreateTime(System.currentTimeMillis());
+    cleanProcessMeta.setProcessParameters(new HashMap<>());
+    cleanProcessMeta.setSummary(new HashMap<>());
+
+    return cleanProcessMeta;
+  }
+
+  /** Writes cleanup process records through {@link TableProcessMapper}. */
+  private static class PersistencyHelper extends PersistentBase {
+
+    public PersistencyHelper() {}
+
+    /** Inserts the given process record in a single transaction. */
+    private void beginAndPersistCleanupProcess(TableProcessMeta meta) {
+      doAsTransaction(
+          () ->
+              doAs(
+                  TableProcessMapper.class,
+                  mapper ->
+                      mapper.insertProcess(
+                          meta.getTableId(),
+                          meta.getProcessId(),
+                          meta.getExternalProcessIdentifier(),
+                          meta.getStatus(),
+                          meta.getProcessType(),
+                          meta.getProcessStage(),
+                          meta.getExecutionEngine(),
+                          meta.getRetryNumber(),
+                          meta.getCreateTime(),
+                          meta.getProcessParameters(),
+                          meta.getSummary())));
+    }
+
+    /**
+     * Updates the process record and then the table's last clean time, both passed to a single
+     * {@code doAsTransaction} call.
+     */
+    private void persistAndSetCompleted(
+        TableRuntime tableRuntime,
+        CleanupOperation cleanupOperation,
+        TableProcessMeta meta,
+        long endTime) {
+
+      doAsTransaction(
+          () ->
+              doAs(
+                  TableProcessMapper.class,
+                  mapper ->
+                      mapper.updateProcess(
+                          meta.getTableId(),
+                          meta.getProcessId(),
+                          meta.getExternalProcessIdentifier(),
+                          meta.getStatus(),
+                          meta.getProcessStage(),
+                          meta.getRetryNumber(),
+                          endTime,
+                          meta.getFailMessage(),
+                          meta.getProcessParameters(),
+                          meta.getSummary())),
+          () ->
+              ((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, endTime));
     }
   }
 
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
index 50c0e9d19d..93d994236c 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.amoro.server.scheduler.inline;
 
 import org.apache.amoro.TableRuntime;
+import org.apache.amoro.server.process.TableProcessMeta;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
 import org.apache.amoro.server.table.TableService;
 import org.apache.amoro.server.table.cleanup.CleanupOperation;
@@ -88,4 +89,19 @@ public boolean shouldExecuteTaskForTest(
       TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
     return shouldExecuteTask(tableRuntime, cleanupOperation);
   }
+
+  /** Delegates to {@link #createCleanupProcessInfo}. */
+  public TableProcessMeta createCleanupProcessInfoForTest(
+      TableRuntime tableRuntime, CleanupOperation operation) {
+    return createCleanupProcessInfo(tableRuntime, operation);
+  }
+
+  /** Delegates to {@link #persistCleanupResult}. */
+  public void persistCleanupResultForTest(
+      TableRuntime tableRuntime,
+      CleanupOperation operation,
+      TableProcessMeta meta,
+      Throwable error) {
+    persistCleanupResult(tableRuntime, operation, meta, error);
+  }
 }
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
index c401c88d8c..7ce22ddeda 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
@@ -22,10 +22,13 @@
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.server.persistence.TableRuntimeMeta;
 import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
 import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper;
+import org.apache.amoro.server.process.TableProcessMeta;
 import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.DefaultTableRuntimeStore;
 import org.apache.amoro.server.table.TableRuntimeHandler;
@@ -262,4 +265,68 @@ public void testShouldExecuteTaskWithNoneOperation() {
     boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime, CleanupOperation.NONE);
     Assert.assertTrue("Should always execute with NONE operation", shouldExecute);
   }
+
+  /**
+   * Validates that process info is correctly saved to table_process with proper status transitions.
+   */
+  @Test
+  public void testCleanupProcessPersistence() {
+    long baseTableId = 200L;
+    List<CleanupOperation> operations =
+        Arrays.asList(
+            CleanupOperation.ORPHAN_FILES_CLEANING,
+            CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
+            CleanupOperation.DATA_EXPIRING,
+            CleanupOperation.SNAPSHOTS_EXPIRING);
+
+    for (int i = 0; i < operations.size(); i++) {
+      long tableId = baseTableId + i;
+      CleanupOperation operation = operations.get(i);
+      prepareTestEnvironment(Collections.singletonList(tableId));
+
+      PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
+      ServerTableIdentifier identifier = createTableIdentifier(tableId);
+      DefaultTableRuntime tableRuntime = createDefaultTableRuntime(identifier);
+
+      // Scenario 1: Create process - verify initial persisted state
+      TableProcessMeta processMeta =
+          executor.createCleanupProcessInfoForTest(tableRuntime, operation);
+      TableProcessMeta persistedRunning = getProcessMeta(processMeta.getProcessId());
+
+      Assert.assertEquals(ProcessStatus.RUNNING, persistedRunning.getStatus());
+
+      // Scenario 2: Success completion - verify status update and timestamps
+      executor.persistCleanupResultForTest(tableRuntime, operation, processMeta.copy(), null);
+      TableProcessMeta persistedSuccess = getProcessMeta(processMeta.getProcessId());
+      Assert.assertEquals(ProcessStatus.SUCCESS, persistedSuccess.getStatus());
+      // Both timestamps have millisecond resolution, so they may be equal.
+      Assert.assertTrue(persistedSuccess.getCreateTime() <= persistedSuccess.getFinishTime());
+
+      // Scenario 3: Failure handling - verify error persistence
+      processMeta = executor.createCleanupProcessInfoForTest(tableRuntime, operation);
+      RuntimeException testError = new RuntimeException("Cleanup failed for " + operation);
+      executor.persistCleanupResultForTest(tableRuntime, operation, processMeta.copy(), testError);
+
+      TableProcessMeta persistedFailure = getProcessMeta(processMeta.getProcessId());
+      Assert.assertEquals(ProcessStatus.FAILED, persistedFailure.getStatus());
+      Assert.assertEquals(testError.getMessage(), persistedFailure.getFailMessage());
+
+      cleanUpTableProcess(tableId);
+    }
+  }
+
+  private TableProcessMeta getProcessMeta(long processId) {
+    return getAs(TableProcessMapper.class, m -> m.getProcessMeta(processId));
+  }
+
+  /** Cleans up table_process records for a specific table. */
+  private void cleanUpTableProcess(long tableId) {
+    doAs(
+        TableProcessMapper.class,
+        mapper -> {
+          mapper
+              .listProcessMeta(tableId, null, null)
+              .forEach(p -> mapper.deleteBefore(tableId, p.getProcessId()));
+        });
+  }
 }