Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,24 @@
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.cleanup.CleanupOperation;
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
Expand All @@ -48,6 +55,12 @@ public abstract class PeriodicTableScheduler extends RuntimeHandlerChain {
// Logger bound to the concrete runtime class (getClass()), so subclasses log under their own name.
protected final Logger logger = LoggerFactory.getLogger(getClass());

// Start delay constant; value is 10 * 1000 — presumably milliseconds (TODO confirm at the use site).
private static final long START_DELAY = 10 * 1000L;
// Value written as the execution engine of cleanup process records (see buildProcessMeta).
private static final String CLEANUP_EXECUTION_ENGINE = "AMORO";
// Value written as the process stage of cleanup process records (see buildProcessMeta).
private static final String CLEANUP_PROCESS_STAGE = "CLEANUP";
// Cleanup process records are written with an empty external process identifier.
private static final String EXTERNAL_PROCESS_IDENTIFIER = "";

// Source of process ids for newly built cleanup process records.
private final SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator();
// Performs the mapper calls that insert and update cleanup process records.
private final PersistencyHelper persistencyHelper = new PersistencyHelper();

// Identifiers of tables tracked by this scheduler; an entry is removed once a task run finishes.
// Wrapped with Collections.synchronizedSet.
protected final Set<ServerTableIdentifier> scheduledTables =
Collections.synchronizedSet(new HashSet<>());
Expand Down Expand Up @@ -123,14 +136,22 @@ private void scheduleTableExecution(TableRuntime tableRuntime, long delay) {
}

private void executeTask(TableRuntime tableRuntime) {
TableProcessMeta cleanProcessMeta = null;
CleanupOperation cleanupOperation = null;
Throwable executionError = null;

try {
if (isExecutable(tableRuntime)) {
cleanupOperation = getCleanupOperation();
cleanProcessMeta = createCleanupProcessInfo(tableRuntime, cleanupOperation);

execute(tableRuntime);
// Different tables take different amounts of time to execute the end of execute(),
// so you need to perform the update operation separately for each table.
persistUpdatingCleanupTime(tableRuntime);
}
} catch (Throwable t) {
executionError = t;
} finally {
persistCleanupResult(tableRuntime, cleanupOperation, cleanProcessMeta, executionError);

scheduledTables.remove(tableRuntime.getTableIdentifier());
scheduleIfNecessary(tableRuntime, getNextExecutingTime(tableRuntime));
}
Expand All @@ -154,25 +175,125 @@ protected boolean shouldExecute(Long lastCleanupEndTime) {
return true;
}

private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
CleanupOperation cleanupOperation = getCleanupOperation();
/**
 * Builds a process record for an upcoming cleanup run and persists it through the persistency
 * helper.
 *
 * @param tableRuntime runtime of the table the cleanup is about to run on
 * @param cleanupOperation the cleanup operation about to run
 * @return the persisted process record, or {@code null} when {@code shouldSkipOperation} reports
 *     that this operation is skipped for the table
 */
@VisibleForTesting
public TableProcessMeta createCleanupProcessInfo(
    TableRuntime tableRuntime, CleanupOperation cleanupOperation) {

  boolean skipped = shouldSkipOperation(tableRuntime, cleanupOperation);
  if (skipped) {
    // Nothing is recorded for a skipped operation; callers treat null as "no record".
    return null;
  }

  TableProcessMeta meta = buildProcessMeta(tableRuntime, cleanupOperation);
  persistencyHelper.beginAndPersistCleanupProcess(meta);

  logger.debug(
      "Successfully persist cleanup process [processId={}, tableId={}, processType={}]",
      meta.getProcessId(),
      meta.getTableId(),
      meta.getProcessType());

  return meta;
}

/**
 * Records the outcome of a cleanup run on the given process record.
 *
 * <p>Returns immediately when {@code cleanProcessMeta} is {@code null}. Otherwise the record is
 * marked {@code ProcessStatus.FAILED} when {@code executionError} is non-null, using the error
 * message (or the exception class name when the message is {@code null}) as fail message, and
 * {@code ProcessStatus.SUCCESS} when it is null. The record is then passed to
 * {@code persistencyHelper.persistAndSetCompleted} together with the current time as end time.
 *
 * <p>NOTE(review): this span appears to interleave removed and added lines of a diff view (the
 * {@code try}/{@code catch} fragments around {@code updateLastCleanTime} sit inside the
 * {@code if}/{@code else}); confirm the exact statement order against the real file.
 *
 * @param tableRuntime runtime of the table that was cleaned
 * @param cleanupOperation the cleanup operation that ran
 * @param cleanProcessMeta the process record to complete; may be {@code null}
 * @param executionError the error raised by the run, or {@code null} when it succeeded
 */
@VisibleForTesting
public void persistCleanupResult(
TableRuntime tableRuntime,
CleanupOperation cleanupOperation,
TableProcessMeta cleanProcessMeta,
Throwable executionError) {

// No record was created for this run, so there is nothing to complete.
if (cleanProcessMeta == null) {
return;
}

try {
long currentTime = System.currentTimeMillis();
((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, currentTime);
if (executionError != null) {
cleanProcessMeta.setStatus(ProcessStatus.FAILED);
String message = executionError.getMessage();
// Fall back to the exception class name so the fail message is never null.
if (message == null) {
message = executionError.getClass().getName();
}

logger.debug(
"Update lastCleanTime for table {} with cleanup operation {}",
tableRuntime.getTableIdentifier().getTableName(),
cleanupOperation);
} catch (Exception e) {
logger.error(
"Failed to update lastCleanTime for table {}",
tableRuntime.getTableIdentifier().getTableName(),
e);
cleanProcessMeta.setFailMessage(message);
} else {
cleanProcessMeta.setStatus(ProcessStatus.SUCCESS);
}

// The same end time is handed to the helper for both the record and the last clean time.
long endTime = System.currentTimeMillis();
persistencyHelper.persistAndSetCompleted(
tableRuntime, cleanupOperation, cleanProcessMeta, endTime);

logger.debug(
"Successfully updated lastCleanTime and cleanupProcess for table {} with cleanup operation {}",
tableRuntime.getTableIdentifier().getTableName(),
cleanupOperation);
}

/**
 * Builds a new, not yet persisted process record describing a cleanup run.
 *
 * <p>The record starts in {@code ProcessStatus.RUNNING} with retry number 0, a freshly generated
 * process id, the current time as create time, and empty parameter and summary maps.
 *
 * @param tableRuntime runtime of the table; supplies the table id
 * @param cleanupOperation the cleanup operation; its {@code name()} becomes the process type
 * @return the populated process record
 */
private TableProcessMeta buildProcessMeta(
    TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
  TableProcessMeta meta = new TableProcessMeta();

  // Identity of the record.
  meta.setTableId(tableRuntime.getTableIdentifier().getId());
  meta.setProcessId(idGenerator.generateId());
  meta.setExternalProcessIdentifier(EXTERNAL_PROCESS_IDENTIFIER);

  // Initial state and classification.
  meta.setStatus(ProcessStatus.RUNNING);
  meta.setProcessType(cleanupOperation.name());
  meta.setProcessStage(CLEANUP_PROCESS_STAGE);
  meta.setExecutionEngine(CLEANUP_EXECUTION_ENGINE);
  meta.setRetryNumber(0);
  meta.setCreateTime(System.currentTimeMillis());

  // Empty, mutable payload maps.
  meta.setProcessParameters(new HashMap<>());
  meta.setSummary(new HashMap<>());

  return meta;
}

/**
 * Helper built on {@link PersistentBase} that writes cleanup process records through
 * {@link TableProcessMapper}.
 */
private static class PersistencyHelper extends PersistentBase {

  public PersistencyHelper() {}

  /**
   * Inserts the given process record; the insert is submitted via {@code doAsTransaction}.
   *
   * @param meta the process record to insert
   */
  private void beginAndPersistCleanupProcess(TableProcessMeta meta) {
    doAsTransaction(() -> insertProcessRecord(meta));
  }

  /**
   * Updates the process record and then the table's last clean time, submitted as two
   * operations of a single {@code doAsTransaction} call.
   *
   * @param tableRuntime runtime whose last clean time is updated; cast to
   *     {@link DefaultTableRuntime}
   * @param cleanupOperation the cleanup operation that finished
   * @param meta the process record carrying the final status and fail message
   * @param endTime end time written to the record and used as last clean time
   */
  private void persistAndSetCompleted(
      TableRuntime tableRuntime,
      CleanupOperation cleanupOperation,
      TableProcessMeta meta,
      long endTime) {

    doAsTransaction(
        () -> updateProcessRecord(meta, endTime),
        () ->
            ((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, endTime));
  }

  /** Issues the mapper insert for the given process record. */
  private void insertProcessRecord(TableProcessMeta meta) {
    doAs(
        TableProcessMapper.class,
        processMapper ->
            processMapper.insertProcess(
                meta.getTableId(),
                meta.getProcessId(),
                meta.getExternalProcessIdentifier(),
                meta.getStatus(),
                meta.getProcessType(),
                meta.getProcessStage(),
                meta.getExecutionEngine(),
                meta.getRetryNumber(),
                meta.getCreateTime(),
                meta.getProcessParameters(),
                meta.getSummary()));
  }

  /** Issues the mapper update for the given process record with the supplied end time. */
  private void updateProcessRecord(TableProcessMeta meta, long endTime) {
    doAs(
        TableProcessMapper.class,
        processMapper ->
            processMapper.updateProcess(
                meta.getTableId(),
                meta.getProcessId(),
                meta.getExternalProcessIdentifier(),
                meta.getStatus(),
                meta.getProcessStage(),
                meta.getRetryNumber(),
                endTime,
                meta.getFailMessage(),
                meta.getProcessParameters(),
                meta.getSummary()));
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.amoro.server.scheduler.inline;

import org.apache.amoro.TableRuntime;
import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.cleanup.CleanupOperation;
Expand Down Expand Up @@ -88,4 +89,17 @@ public boolean shouldExecuteTaskForTest(
TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
return shouldExecuteTask(tableRuntime, cleanupOperation);
}

/**
 * Test-facing entry point that delegates to {@code createCleanupProcessInfo}.
 *
 * @param tableRuntime runtime of the table under test
 * @param operation the cleanup operation to create a process record for
 * @return whatever {@code createCleanupProcessInfo} returns for these arguments
 */
public TableProcessMeta createCleanupProcessInfoForTest(
    TableRuntime tableRuntime, CleanupOperation operation) {
  TableProcessMeta created = createCleanupProcessInfo(tableRuntime, operation);
  return created;
}

/**
 * Test-facing entry point that delegates to {@code persistCleanupResult}.
 *
 * @param tableRuntime runtime of the table under test
 * @param operation the cleanup operation that was run
 * @param meta the process record to complete
 * @param error the error to record, or {@code null} for a successful run
 */
public void persistCleanupResultForTest(
    TableRuntime tableRuntime, CleanupOperation operation, TableProcessMeta meta, Throwable error) {
  this.persistCleanupResult(tableRuntime, operation, meta, error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper;
import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.DefaultTableRuntimeStore;
import org.apache.amoro.server.table.TableRuntimeHandler;
Expand Down Expand Up @@ -262,4 +265,67 @@ public void testShouldExecuteTaskWithNoneOperation() {
boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime, CleanupOperation.NONE);
Assert.assertTrue("Should always execute with NONE operation", shouldExecute);
}

/**
 * Validates that process info is correctly saved to table_process with proper status transitions:
 * RUNNING after creation, SUCCESS after a clean completion, and FAILED with the error message
 * after a failed run. The check is repeated for each cleanup operation on its own table id.
 */
@Test
public void testCleanupProcessPersistence() {
  long baseTableId = 200L;
  List<CleanupOperation> operations =
      Arrays.asList(
          CleanupOperation.ORPHAN_FILES_CLEANING,
          CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
          CleanupOperation.DATA_EXPIRING,
          CleanupOperation.SNAPSHOTS_EXPIRING);

  for (int i = 0; i < operations.size(); i++) {
    long tableId = baseTableId + i;
    CleanupOperation operation = operations.get(i);
    prepareTestEnvironment(Collections.singletonList(tableId));

    PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
    ServerTableIdentifier identifier = createTableIdentifier(tableId);
    DefaultTableRuntime tableRuntime = createDefaultTableRuntime(identifier);

    try {
      // Scenario 1: Create process - verify initial persisted state
      TableProcessMeta processMeta =
          executor.createCleanupProcessInfoForTest(tableRuntime, operation);
      // Fail with a readable message instead of an NPE if the operation was skipped.
      Assert.assertNotNull("Cleanup process should be created for " + operation, processMeta);
      TableProcessMeta persistedRunning = getProcessMeta(processMeta.getProcessId());

      Assert.assertEquals(ProcessStatus.RUNNING, persistedRunning.getStatus());

      // Scenario 2: Success completion - verify status update and timestamps
      executor.persistCleanupResultForTest(tableRuntime, operation, processMeta.copy(), null);
      TableProcessMeta persistedSuccess = getProcessMeta(processMeta.getProcessId());
      Assert.assertEquals(ProcessStatus.SUCCESS, persistedSuccess.getStatus());
      // Create and finish may land in the same millisecond, so equality must be accepted.
      Assert.assertTrue(
          "Finish time should not precede create time",
          persistedSuccess.getCreateTime() <= persistedSuccess.getFinishTime());

      // Scenario 3: Failure handling - verify error persistence
      processMeta = executor.createCleanupProcessInfoForTest(tableRuntime, operation);
      Assert.assertNotNull("Cleanup process should be created for " + operation, processMeta);
      RuntimeException testError = new RuntimeException("Cleanup failed for " + operation);
      executor.persistCleanupResultForTest(
          tableRuntime, operation, processMeta.copy(), testError);

      TableProcessMeta persistedFailure = getProcessMeta(processMeta.getProcessId());
      Assert.assertEquals(ProcessStatus.FAILED, persistedFailure.getStatus());
      Assert.assertEquals(testError.getMessage(), persistedFailure.getFailMessage());
    } finally {
      // Always remove the records so a failed assertion does not leak rows into other tests.
      cleanUpTableProcess(tableId);
    }
  }
}

/**
 * Loads a process record by id through {@link TableProcessMapper}.
 *
 * @param processId id of the process record to read
 * @return the value returned by the mapper's {@code getProcessMeta} for that id
 */
private TableProcessMeta getProcessMeta(long processId) {
  TableProcessMeta loaded =
      getAs(TableProcessMapper.class, processMapper -> processMapper.getProcessMeta(processId));
  return loaded;
}

/**
 * Clean up table_process records for a specific table.
 *
 * <p>Lists the table's process records and calls the mapper's {@code deleteBefore} once per
 * listed record, passing the table id and that record's process id.
 *
 * @param tableId id of the table whose process records are cleaned up
 */
private void cleanUpTableProcess(long tableId) {
  doAs(
      TableProcessMapper.class,
      processMapper -> {
        processMapper
            .listProcessMeta(tableId, null, null)
            .forEach(
                process -> {
                  processMapper.deleteBefore(tableId, process.getProcessId());
                });
      });
}
}
Loading