diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerInput.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerInput.java new file mode 100644 index 0000000000..c642c89012 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerInput.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.maintainer; + +import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects; + +import java.util.HashMap; +import java.util.Map; + +/** Base implementation of MaintainerInput following BaseOptimizingInput pattern. 
*/ +public abstract class BaseMaintainerInput implements MaintainerInput { + + private static final long serialVersionUID = 1L; + /** Option name/value pairs collected through option() and options(). NOTE(review): the Map element types are not visible in this view (presumably String keys and values) - confirm. */ + private final Map options = new HashMap<>(); + /** Stores one option; a value already held under the same name is replaced. */ + @Override + public void option(String name, String value) { + options.put(name, value); + } + /** Copies every entry of the given map into this input's options. */ + @Override + public void options(Map options) { + this.options.putAll(options); + } + /** Returns the backing options map itself, not a copy, so caller-side changes are visible to this input. */ + @Override + public Map getOptions() { + return options; + } + /** Renders the operation type, table identifier, table format and options with MoreObjects.toStringHelper. */ + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("operationType", getOperationType()) + .add("tableIdentifier", getTableIdentifier()) + .add("tableFormat", getTableFormat()) + .add("options", options) + .toString(); + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerOutput.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerOutput.java new file mode 100644 index 0000000000..3d5658bf74 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerOutput.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.maintainer; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Base implementation of MaintainerOutput: a summary map that can be filled through putSummary, + * plus a success flag and an error message that are both fixed at construction. + */ +public class BaseMaintainerOutput implements MaintainerOutput { + + private static final long serialVersionUID = 1L; + + private final Map summary; + private final boolean success; + private final String errorMessage; + + /** Create a successful maintainer output. */ + public BaseMaintainerOutput() { + this(true, null); + } + + /** + * Create a maintainer output with specified status. + * + * @param success whether the operation succeeded + * @param errorMessage error message if failed, null otherwise + */ + public BaseMaintainerOutput(boolean success, String errorMessage) { + this.summary = new HashMap<>(); + this.success = success; + this.errorMessage = errorMessage; + } + /** Returns a read-only view of the summary map (Collections.unmodifiableMap over the internal map). */ + @Override + public Map summary() { + return Collections.unmodifiableMap(summary); + } + + /** + * Add a summary entry. + * + * @param key summary key + * @param value summary value + */ + public void putSummary(String key, String value) { + summary.put(key, value); + } + /** Returns the success flag given at construction. */ + @Override + public boolean isSuccess() { + return success; + } + /** Returns the error message given at construction; the no-arg constructor passes null. */ + @Override + public String getErrorMessage() { + return errorMessage; + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutor.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutor.java new file mode 100644 index 0000000000..02076b000a --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutor.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.maintainer; + +import java.io.Serializable; + +/** + * Executor interface for maintainer operations. Follows the same pattern as OptimizingExecutor. + * + * NOTE(review): the type-parameter list of this interface and the names after the two param tags + * below are not visible in this view, although execute() returns a type named O - presumably lost + * in extraction; confirm against the original file. + * + * @param the maintainer input type + * @param the maintainer output type + */ +public interface MaintainerExecutor + extends Serializable { + + /** + * Execute the maintainer operation. + * + * @return the maintainer output with execution results + */ + O execute(); +} diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutorFactory.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutorFactory.java new file mode 100644 index 0000000000..1caf782f1e --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutorFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.maintainer; + +import java.io.Serializable; +import java.util.Map; + +/** + * Factory interface for creating MaintainerExecutor instances. Follows the same pattern as + * OptimizingExecutorFactory. + * + * NOTE(review): createExecutor takes a parameter of type I, but no type-parameter list is visible + * on this interface, on Map or on MaintainerExecutor in this view - presumably lost in extraction; + * confirm against the original file. + * + * @param the maintainer input type + */ +public interface MaintainerExecutorFactory extends Serializable { + + /** + * Initialize the factory with task properties. Called after constructing the factory through a + * parameterless constructor. + * + * @param properties the task properties + */ + void initialize(Map properties); + + /** + * Create an executor from the given input. + * + * @param input the maintainer input + * @return the maintainer executor + */ + MaintainerExecutor createExecutor(I input); +} diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerInput.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerInput.java new file mode 100644 index 0000000000..c2c735db55 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerInput.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.maintainer; + +import java.io.Serializable; +import java.util.Map; + +/** + * Input interface for maintainer operations executed remotely. Follows the same pattern as + * TableOptimizing.OptimizingInput. + */ +public interface MaintainerInput extends Serializable { + + /** The kinds of maintenance operation that getOperationType() can report. */ + enum OperationType { + SNAPSHOT_EXPIRATION, + ORPHAN_FILE_CLEANING, + DANGLING_DELETE_CLEANING, + DATA_EXPIRATION, + TAG_CREATION + } + + /** + * Get the operation type for this maintainer task. + * + * @return the operation type + */ + OperationType getOperationType(); + + /** + * Get the table identifier. + * + * @return table identifier string + */ + String getTableIdentifier(); + + /** + * Get the table format (ICEBERG, PAIMON, etc.). + * + * @return table format string + */ + String getTableFormat(); + + /** + * Set an option for this maintainer operation. + * + * @param name option name + * @param value option value + */ + void option(String name, String value); + + /** + * Set multiple options for this maintainer operation. + * + * @param options map of option names to values + */ + void options(Map options); + + /** + * Get all options for this maintainer operation. 
+ * + * @return map of option names to values + */ + Map getOptions(); +} diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOutput.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOutput.java new file mode 100644 index 0000000000..322aa44b92 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOutput.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.maintainer; + +import java.io.Serializable; +import java.util.Map; + +/** + * Output interface for maintainer operations executed remotely. Follows the same pattern as + * TableOptimizing.OptimizingOutput. + */ +public interface MaintainerOutput extends Serializable { + + /** + * Get a summary of the maintainer operation execution. + * + * @return map containing summary information about the operation + */ + Map summary(); + + /** + * Check if the maintainer operation completed successfully. + * + * @return true if operation succeeded, false otherwise + */ + boolean isSuccess(); + + /** + * Get the error message if the operation failed. 
+ * + * @return error message, or null if operation succeeded + */ + String getErrorMessage(); +} diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/pom.xml b/amoro-optimizer/amoro-optimizer-paimon-spark/pom.xml new file mode 100644 index 0000000000..66ba56606e --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/pom.xml @@ -0,0 +1,148 @@ + + + + 4.0.0 + + org.apache.amoro + amoro-optimizer + 0.9-SNAPSHOT + ../pom.xml + + + amoro-optimizer-paimon-spark-${spark.major.version}_${scala.binary.version} + Amoro Project Paimon Spark Optimizer + https://amoro.apache.org + + + + org.apache.amoro + amoro-optimizer-common + ${project.version} + + + + + org.apache.amoro + amoro-format-paimon + ${project.version} + + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + provided + + + org.slf4j + slf4j-api + + + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + provided + + + org.apache.parquet + parquet-column + + + org.apache.parquet + parquet-hadoop + + + org.slf4j + slf4j-api + + + org.apache.avro + avro + + + org.apache.arrow + arrow-memory-core + + + org.apache.arrow + arrow-memory-netty + + + org.apache.arrow + arrow-vector + + + org.apache.orc + orc-core + + + org.apache.orc + orc-mapreduce + + + + + + + org.apache.paimon + paimon-spark-${spark.major.version} + ${paimon.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-amoro + + shade + + package + + false + + + org.slf4j:slf4j-api + org.apache.hadoop:* + org.apache.hive:* + + + + + org.apache.paimon + org.apache.amoro.shade.org.apache.paimon + + + ${project.artifactId}-${project.version}-jar-with-dependencies + + + + + + + diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkMaintainerExecutor.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkMaintainerExecutor.java new file mode 100644 
index 0000000000..eca36c97f5 --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkMaintainerExecutor.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizer.paimon.spark; + +import org.apache.amoro.api.OptimizingTask; +import org.apache.amoro.api.OptimizingTaskResult; +import org.apache.amoro.maintainer.MaintainerInput; +import org.apache.amoro.optimizer.common.OptimizerConfig; +import org.apache.amoro.optimizer.common.OptimizerExecutor; +import org.apache.amoro.optimizer.paimon.spark.maintainer.SparkMaintainerTaskFunction; +import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; +import org.apache.amoro.utils.ExceptionUtil; +import org.apache.amoro.utils.SerializationUtil; +import org.apache.spark.api.java.JavaSparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Executor for maintainer tasks in Spark environment. Similar to SparkOptimizerExecutor but handles + * maintainer operations. 
+ */ +public class SparkMaintainerExecutor extends OptimizerExecutor { + private static final Logger LOG = LoggerFactory.getLogger(SparkMaintainerExecutor.class); + private final JavaSparkContext jsc; + private final int threadId; + /** Creates an executor that submits tasks through the given Spark context; config and threadId are also handed to the OptimizerExecutor parent. */ + public SparkMaintainerExecutor(JavaSparkContext jsc, OptimizerConfig config, int threadId) { + super(config, threadId); + this.jsc = jsc; + this.threadId = threadId; + } + /** Runs the task as a one-element, one-partition Spark job mapped through SparkMaintainerTaskFunction and returns the first collected result. Any Throwable, including one raised while building the job description, is logged and converted into a result whose error message comes from ExceptionUtil.getErrorMessage with ERROR_MESSAGE_MAX_LENGTH. */ + @Override + protected OptimizingTaskResult executeTask(OptimizingTask task) { + OptimizingTaskResult result; + String threadName = Thread.currentThread().getName(); + long startTime = System.currentTimeMillis(); + try { + ImmutableList of = ImmutableList.of(task); + jsc.setJobDescription(jobDescription(task)); + SparkMaintainerTaskFunction taskFunction = + new SparkMaintainerTaskFunction(getConfig(), threadId); + List results = jsc.parallelize(of, 1).map(taskFunction).collect(); + result = results.get(0); + LOG.info( + "Maintainer executor[{}] executed task[{}] and cost {} ms", + threadName, + task.getTaskId(), + System.currentTimeMillis() - startTime); + return result; + } catch (Throwable r) { + LOG.error( + "Maintainer executor[{}] executed task[{}] failed, and cost {} ms", + threadName, + task.getTaskId(), + (System.currentTimeMillis() - startTime), + r); + result = new OptimizingTaskResult(task.getTaskId(), threadId); + result.setErrorMessage(ExceptionUtil.getErrorMessage(r, ERROR_MESSAGE_MAX_LENGTH)); + return result; + } + } + /** Deserializes the task input as a MaintainerInput and formats its operation type and table identifier, together with the task id, into the Spark job description. */ + private String jobDescription(OptimizingTask task) { + MaintainerInput input = SerializationUtil.simpleDeserialize(task.getTaskInput()); + return String.format( + "Amoro Paimon maintainer task, operation: %s, table: %s, task id: %s", + input.getOperationType(), input.getTableIdentifier(), task.getTaskId()); + } +} diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizer.java 
b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizer.java new file mode 100644 index 0000000000..5528fb641b --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizer.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizer.paimon.spark; + +import org.apache.amoro.optimizer.common.Optimizer; +import org.apache.amoro.optimizer.common.OptimizerConfig; +import org.apache.amoro.optimizer.common.OptimizerToucher; +import org.apache.amoro.resource.Resource; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Main entry point for Paimon Spark optimizer. Supports both optimizing and maintainer tasks. + * + *

Note: Optimizer functionality is currently not implemented. This class serves as a placeholder + * for future Paimon optimizing implementation. + */ +public class SparkOptimizer extends Optimizer { + private static final Logger LOG = LoggerFactory.getLogger(SparkOptimizer.class); + private static final String APP_NAME_FORMAT = "amoro-paimon-spark-optimizer-%s"; + /** Builds the optimizer with an OptimizerToucher supplier and a factory that creates one SparkOptimizerExecutor per index. NOTE(review): jsc is accepted but not used in this constructor, and SparkMaintainerExecutor is not referenced anywhere in this class - confirm whether maintainer tasks are meant to be wired in here. */ + public SparkOptimizer(OptimizerConfig config, JavaSparkContext jsc) { + super(config, () -> new OptimizerToucher(config), (i) -> new SparkOptimizerExecutor(config, i)); + } + /** Entry point: parses OptimizerConfig from args, gets or creates the SparkSession, warns when dynamic allocation is off, sets the memory size to driver memory plus ceil(executionParallel / executorCores) executor memories (both converted with Utils.memoryStringToMb), registers the Spark application id under Resource.PROPERTY_JOB_ID and starts optimizing. */ + public static void main(String[] args) throws Exception { + OptimizerConfig config = new OptimizerConfig(args); + SparkSession spark = + SparkSession.builder() + .appName(String.format(APP_NAME_FORMAT, config.getResourceId())) + .getOrCreate(); + JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext()); + + if (!jsc.getConf().getBoolean("spark.dynamicAllocation.enabled", false)) { + LOG.warn( + "To better utilize computing resources, it is recommended to enable " + + "'spark.dynamicAllocation.enabled' " + + "and set 'spark.dynamicAllocation.maxExecutors' equal to 'OPTIMIZER_EXECUTION_PARALLEL'"); + } + + // Calculate optimizer memory allocation + int driverMemory = Utils.memoryStringToMb(jsc.getConf().get("spark.driver.memory", "1g")); + int executorMemory = Utils.memoryStringToMb(jsc.getConf().get("spark.executor.memory", "1g")); + int executorCores = jsc.getConf().getInt("spark.executor.cores", 1); + int executionParallel = config.getExecutionParallel(); + int executorNum = (int) Math.ceil((double) executionParallel / executorCores); + config.setMemorySize(driverMemory + executorNum * executorMemory); + + SparkOptimizer optimizer = new SparkOptimizer(config, jsc); + OptimizerToucher toucher = optimizer.getToucher(); + toucher.withRegisterProperty(Resource.PROPERTY_JOB_ID, spark.sparkContext().applicationId()); + + LOG.info("Starting the Paimon Spark optimizer with configuration:{}", config); + 
optimizer.startOptimizing(); + } +} diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizerExecutor.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizerExecutor.java new file mode 100644 index 0000000000..2d7f6a1835 --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizerExecutor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizer.paimon.spark; + +import org.apache.amoro.api.OptimizingTask; +import org.apache.amoro.api.OptimizingTaskResult; +import org.apache.amoro.optimizer.common.OptimizerConfig; +import org.apache.amoro.optimizer.common.OptimizerExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Executor for Paimon optimizing tasks in Spark environment. + * + *

This is a placeholder implementation for future Paimon optimizing functionality. Currently, + * every optimizing task gets back a result carrying a fixed error message; no exception is thrown. + * + *

For maintainer operations, use {@link SparkMaintainerExecutor} instead. + */ +public class SparkOptimizerExecutor extends OptimizerExecutor { + private static final Logger LOG = LoggerFactory.getLogger(SparkOptimizerExecutor.class); + /** Passes config and threadId straight to the OptimizerExecutor parent. */ + public SparkOptimizerExecutor(OptimizerConfig config, int threadId) { + super(config, threadId); + } + /** Logs the unsupported operation and returns a result for the task id and thread id that carries a fixed error message; no work is done on the task and nothing is thrown here. */ + @Override + protected OptimizingTaskResult executeTask(OptimizingTask task) { + String errorMessage = + "Paimon optimizing is not yet implemented. " + + "For maintainer operations, please use SparkMaintainerExecutor instead."; + LOG.error( + "Optimizer executor[{}] encountered unsupported operation: {}", + getThreadId(), + errorMessage); + OptimizingTaskResult result = new OptimizingTaskResult(task.getTaskId(), getThreadId()); + result.setErrorMessage(errorMessage); + return result; + } +} diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonMaintainerOutput.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonMaintainerOutput.java new file mode 100644 index 0000000000..b7cae6c947 --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonMaintainerOutput.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizer.paimon.spark.maintainer; + +import org.apache.amoro.maintainer.BaseMaintainerOutput; + +/** + * Output for Paimon snapshot expiration operation. The three counters are kept as strings in the + * inherited summary (written with putSummary) and parsed back to numbers by the getters. + */ +public class PaimonMaintainerOutput extends BaseMaintainerOutput { + + private static final long serialVersionUID = 1L; + + /** Summary key for expired snapshot count */ + public static final String EXPIRED_SNAPSHOT_COUNT = "expired_snapshot_count"; + + /** Summary key for expired data file count */ + public static final String EXPIRED_DATA_FILE_COUNT = "expired_data_file_count"; + + /** Summary key for expired data file size */ + public static final String EXPIRED_DATA_FILE_SIZE = "expired_data_file_size"; + + /** Create a successful maintainer output. */ + public PaimonMaintainerOutput() { + super(true, null); + } + + /** + * Create a maintainer output with specified status. + * + * @param success whether the operation succeeded + * @param errorMessage error message if failed, null otherwise + */ + public PaimonMaintainerOutput(boolean success, String errorMessage) { + super(success, errorMessage); + } + + /** + * Set the number of expired snapshots. + * + * @param count number of snapshots expired + */ + public void setExpiredSnapshotCount(int count) { + putSummary(EXPIRED_SNAPSHOT_COUNT, String.valueOf(count)); + } + + /** + * Set the number of expired data files. 
+ * + * @param count number of data files expired + */ + public void setExpiredDataFileCount(int count) { + putSummary(EXPIRED_DATA_FILE_COUNT, String.valueOf(count)); + } + + /** + * Set the total size of expired data files. + * + * @param size total size in bytes + */ + public void setExpiredDataFileSize(long size) { + putSummary(EXPIRED_DATA_FILE_SIZE, String.valueOf(size)); + } + + /** + * Get the number of expired snapshots. + * + * @return number of snapshots expired, or 0 if not set + */ + public int getExpiredSnapshotCount() { + String value = summary().get(EXPIRED_SNAPSHOT_COUNT); + return value == null ? 0 : Integer.parseInt(value); + } + + /** + * Get the number of expired data files. + * + * @return number of data files expired, or 0 if not set + */ + public int getExpiredDataFileCount() { + String value = summary().get(EXPIRED_DATA_FILE_COUNT); + return value == null ? 0 : Integer.parseInt(value); + } + + /** + * Get the total size of expired data files. + * + * @return total size in bytes, or 0 if not set + */ + public long getExpiredDataFileSize() { + String value = summary().get(EXPIRED_DATA_FILE_SIZE); + return value == null ? 0 : Long.parseLong(value); + } +} diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireExecutor.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireExecutor.java new file mode 100644 index 0000000000..c85d0435bc --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireExecutor.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizer.paimon.spark.maintainer; + +import org.apache.amoro.maintainer.MaintainerExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Executor for Paimon snapshot expiration. + * + *

This executor handles the expiration of old snapshots in Paimon tables based on time and count + * thresholds. + */ +public class PaimonSnapshotExpireExecutor + implements MaintainerExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(PaimonSnapshotExpireExecutor.class); + + private final PaimonSnapshotExpireInput input; + + public PaimonSnapshotExpireExecutor(PaimonSnapshotExpireInput input) { + this.input = input; + } + + @Override + public PaimonMaintainerOutput execute() { + try { + LOG.info( + "Starting snapshot expiration for table {}, older than {}, retain last {}", + input.getTableIdentifier(), + input.getOlderThanMillis(), + input.getRetainLastCount()); + + // TODO: Implement Paimon snapshot expiration logic + // 1. Load Paimon table from options (table location, database, name) + // 2. Collect snapshots to expire based on time and count + // 3. Call Paimon's snapshot expiration API + // 4. Collect statistics (snapshot count, file count, file size) + // 5. Return output with statistics + + // Placeholder implementation - returns success with no expired snapshots + LOG.info( + "Snapshot expiration for table {} completed (placeholder implementation)", + input.getTableIdentifier()); + + PaimonMaintainerOutput output = new PaimonMaintainerOutput(); + output.setExpiredSnapshotCount(0); + output.setExpiredDataFileCount(0); + output.setExpiredDataFileSize(0); + return output; + + } catch (Throwable t) { + LOG.error("Failed to expire snapshots for table {}", input.getTableIdentifier(), t); + return new PaimonMaintainerOutput(false, t.getMessage()); + } + } +} diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireFactory.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireFactory.java new file mode 100644 index 0000000000..fff23804fa --- /dev/null +++ 
b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizer.paimon.spark.maintainer; + +import org.apache.amoro.maintainer.MaintainerExecutor; +import org.apache.amoro.maintainer.MaintainerExecutorFactory; + +import java.util.Map; +
/**
 * {@link MaintainerExecutorFactory} implementation that produces {@link
 * PaimonSnapshotExpireExecutor} instances.
 */
public class PaimonSnapshotExpireFactory
    implements MaintainerExecutorFactory {
  // NOTE(review): no type arguments are visible on the interface or on Map below; they may have
  // been stripped when this patch was extracted — confirm against the interface declaration.

  private static final long serialVersionUID = 1L;

  /** Properties received through {@link #initialize(Map)}; stored but not read in this class. */
  private Map properties;

  /** No-argument constructor, required so the factory can be created through DynConstructors. */
  public PaimonSnapshotExpireFactory() {
    // Intentionally empty: DynConstructors needs an accessible no-arg constructor.
  }

  /**
   * Remembers the supplied properties.
   *
   * @param properties factory properties; the reference is kept as-is, not copied
   */
  @Override
  public void initialize(Map properties) {
    this.properties = properties;
  }

  /**
   * Builds a snapshot-expiration executor bound to the given input.
   *
   * @param input the expiration request the executor will act on
   * @return a new {@link PaimonSnapshotExpireExecutor} for {@code input}
   */
  @Override
  public MaintainerExecutor createExecutor(PaimonSnapshotExpireInput input) {
    PaimonSnapshotExpireExecutor executor = new PaimonSnapshotExpireExecutor(input);
    return executor;
  }
}
diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireInput.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireInput.java new file mode 100644 index 0000000000..7db5fcbae5 --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireInput.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizer.paimon.spark.maintainer; + +import org.apache.amoro.maintainer.BaseMaintainerInput; + +/** Input for Paimon snapshot expiration operation. 
*/ +public class PaimonSnapshotExpireInput extends BaseMaintainerInput { + + private static final long serialVersionUID = 1L; + + private final String tableIdentifier; + private final String tableFormat; + private final long olderThanMillis; + private final int retainLastCount; + + // Table metadata will be serialized and passed through options + public static final String TABLE_LOCATION = "table.location"; + public static final String TABLE_DATABASE = "table.database"; + public static final String TABLE_NAME = "table.name"; + + /** + * Create a Paimon snapshot expiration input. + * + * @param tableIdentifier the full table identifier + * @param tableFormat the table format (e.g., "PAIMON") + * @param olderThanMillis expire snapshots older than this timestamp (milliseconds) + * @param retainLastCount retain at least this many snapshots + */ + public PaimonSnapshotExpireInput( + String tableIdentifier, String tableFormat, long olderThanMillis, int retainLastCount) { + this.tableIdentifier = tableIdentifier; + this.tableFormat = tableFormat; + this.olderThanMillis = olderThanMillis; + this.retainLastCount = retainLastCount; + } + + @Override + public OperationType getOperationType() { + return OperationType.SNAPSHOT_EXPIRATION; + } + + @Override + public String getTableIdentifier() { + return tableIdentifier; + } + + @Override + public String getTableFormat() { + return tableFormat; + } + + /** + * Get the expiration threshold in milliseconds. + * + * @return snapshots older than this timestamp should be expired + */ + public long getOlderThanMillis() { + return olderThanMillis; + } + + /** + * Get the minimum number of snapshots to retain. 
+ * + * @return minimum snapshots to keep + */ + public int getRetainLastCount() { + return retainLastCount; + } +} diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/SparkMaintainerTaskFunction.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/SparkMaintainerTaskFunction.java new file mode 100644 index 0000000000..e2badb4b8d --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/SparkMaintainerTaskFunction.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.optimizer.paimon.spark.maintainer; + +import org.apache.amoro.api.OptimizingTask; +import org.apache.amoro.api.OptimizingTaskResult; +import org.apache.amoro.maintainer.MaintainerExecutor; +import org.apache.amoro.maintainer.MaintainerExecutorFactory; +import org.apache.amoro.maintainer.MaintainerInput; +import org.apache.amoro.maintainer.MaintainerOutput; +import org.apache.amoro.optimizer.common.OptimizerConfig; +import org.apache.amoro.optimizing.TaskProperties; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.amoro.utils.ExceptionUtil; +import org.apache.amoro.utils.SerializationUtil; +import org.apache.iceberg.common.DynConstructors; +import org.apache.spark.api.java.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Map; + +/** + * Spark function to execute maintainer tasks. Similar to SparkOptimizingTaskFunction but for + * maintainer operations. 
+ */ +public class SparkMaintainerTaskFunction implements Function { + private static final Logger LOG = LoggerFactory.getLogger(SparkMaintainerTaskFunction.class); + private static final int ERROR_MESSAGE_MAX_LENGTH = 4000; + + private final OptimizerConfig config; + private final int threadId; + + public SparkMaintainerTaskFunction(OptimizerConfig config, int threadId) { + this.config = config; + this.threadId = threadId; + } + + @Override + public OptimizingTaskResult call(OptimizingTask task) throws Exception { + long startTime = System.currentTimeMillis(); + MaintainerInput input; + try { + Map taskProperties = fillTaskProperties(config, task); + input = SerializationUtil.simpleDeserialize(task.getTaskInput()); + + String factoryImpl = taskProperties.get(TaskProperties.TASK_EXECUTOR_FACTORY_IMPL); + DynConstructors.Ctor ctor = + DynConstructors.builder(MaintainerExecutorFactory.class).impl(factoryImpl).buildChecked(); + MaintainerExecutorFactory factory = ctor.newInstance(); + + factory.initialize(taskProperties); + MaintainerExecutor executor = factory.createExecutor(input); + MaintainerOutput output = (MaintainerOutput) executor.execute(); + + ByteBuffer outputByteBuffer = SerializationUtil.simpleSerialize(output); + OptimizingTaskResult result = new OptimizingTaskResult(task.getTaskId(), threadId); + result.setTaskOutput(outputByteBuffer); + result.setSummary(output.summary()); + + LOG.info( + "Maintainer executor[{}] executed task[{}]({}) and cost {} ms", + threadId, + task.getTaskId(), + input, + System.currentTimeMillis() - startTime); + return result; + } catch (Throwable t) { + LOG.error( + "Maintainer executor[{}] executed task[{}] failed and cost {} ms", + threadId, + task.getTaskId(), + System.currentTimeMillis() - startTime, + t); + OptimizingTaskResult errorResult = new OptimizingTaskResult(task.getTaskId(), threadId); + errorResult.setErrorMessage(ExceptionUtil.getErrorMessage(t, ERROR_MESSAGE_MAX_LENGTH)); + return errorResult; + } + } + + 
private static Map fillTaskProperties( + OptimizerConfig config, OptimizingTask task) { + Map properties = Maps.newHashMap(task.getProperties()); + properties.put(TaskProperties.PROCESS_ID, String.valueOf(task.getTaskId().getProcessId())); + return properties; + } +} diff --git a/amoro-optimizer/pom.xml b/amoro-optimizer/pom.xml index 5749b1361f..21e9f72716 100644 --- a/amoro-optimizer/pom.xml +++ b/amoro-optimizer/pom.xml @@ -35,6 +35,7 @@ amoro-optimizer-common amoro-optimizer-flink amoro-optimizer-spark + amoro-optimizer-paimon-spark amoro-optimizer-standalone