diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index c2b9be4c2725..54e082091840 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -33,6 +33,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.DateTimeUtils; +import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SerializableConsumer; @@ -252,12 +253,14 @@ protected void collectWithoutDataFileWithManifestFlag( /** List directories that contains data files and manifest files. */ protected List listPaimonFileDirs() { + FileStorePathFactory pathFactory = table.store().pathFactory(); + List paimonFileDirs = new ArrayList<>(); - paimonFileDirs.add(new Path(location, "manifest")); - paimonFileDirs.add(new Path(location, "index")); - paimonFileDirs.add(new Path(location, "statistics")); - paimonFileDirs.addAll(listFileDirs(location, partitionKeysNum)); + paimonFileDirs.add(pathFactory.manifestPath()); + paimonFileDirs.add(pathFactory.indexPath()); + paimonFileDirs.add(pathFactory.statisticsPath()); + paimonFileDirs.addAll(listFileDirs(pathFactory.dataFilePath(), partitionKeysNum)); return paimonFileDirs; } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java index 81b6307a5f1a..811a2a4e6dc7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java @@ -37,6 +37,17 @@ @ThreadSafe public class FileStorePathFactory { + public static final String MANIFEST_PATH = "manifest"; + public static final String MANIFEST_PREFIX = "manifest-"; + public static final String MANIFEST_LIST_PREFIX = "manifest-list-"; + public static final String INDEX_MANIFEST_PREFIX = "index-manifest-"; + + public static final String INDEX_PATH = "index"; + public static final String INDEX_PREFIX = "index-"; + + public static final String STATISTICS_PATH = "statistics"; + public static final String STATISTICS_PREFIX = "stat-"; + public static final String BUCKET_PATH_PREFIX = "bucket-"; // this is the table schema root path @@ -94,6 +105,25 @@ public Path root() { return root; } + public Path manifestPath() { + return new Path(root, MANIFEST_PATH); + } + + public Path indexPath() { + return new Path(root, INDEX_PATH); + } + + public Path statisticsPath() { + return new Path(root, STATISTICS_PATH); + } + + public Path dataFilePath() { + if (dataFilePathDirectory != null) { + return new Path(root, dataFilePathDirectory); + } + return root; + } + @VisibleForTesting public static InternalRowPartitionComputer getPartitionComputer( RowType partitionType, String defaultPartValue, boolean legacyPartitionName) { @@ -103,25 +133,21 @@ public static InternalRowPartitionComputer getPartitionComputer( } public Path newManifestFile() { - return new Path( - root + "/manifest/manifest-" + uuid + "-" + manifestFileCount.getAndIncrement()); + return toManifestFilePath( + MANIFEST_PREFIX + uuid + "-" + manifestFileCount.getAndIncrement()); } public Path newManifestList() { - return new Path( - root - + "/manifest/manifest-list-" - + uuid - + "-" - + manifestListCount.getAndIncrement()); + return toManifestListPath( + MANIFEST_LIST_PREFIX + uuid + "-" + manifestListCount.getAndIncrement()); } public Path toManifestFilePath(String manifestFileName) { - return new Path(root + "/manifest/" + manifestFileName); + return new Path(manifestPath(), manifestFileName); } public Path toManifestListPath(String manifestListName) { - return new Path(root + "/manifest/" + manifestListName); + return new Path(manifestPath(), manifestListName); } public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bucket) { @@ -217,17 +243,13 @@ public PathFactory indexManifestFileFactory() { return new PathFactory() { @Override public Path newPath() { - return new Path( - root - + "/manifest/index-manifest-" - + uuid - + "-" - + indexManifestCount.getAndIncrement()); + return toPath( + INDEX_MANIFEST_PREFIX + uuid + "-" + indexManifestCount.getAndIncrement()); } @Override public Path toPath(String fileName) { - return new Path(root + "/manifest/" + fileName); + return new Path(manifestPath(), fileName); } }; } @@ -236,13 +258,12 @@ public PathFactory indexFileFactory() { return new PathFactory() { @Override public Path newPath() { - return new Path( - root + "/index/index-" + uuid + "-" + indexFileCount.getAndIncrement()); + return toPath(INDEX_PREFIX + uuid + "-" + indexFileCount.getAndIncrement()); } @Override public Path toPath(String fileName) { - return new Path(root + "/index/" + fileName); + return new Path(indexPath(), fileName); } }; } @@ -251,17 +272,12 @@ public PathFactory statsFileFactory() { return new PathFactory() { @Override public Path newPath() { - return new Path( - root - + "/statistics/stats-" - + uuid - + "-" - + statsFileCount.getAndIncrement()); + return toPath(STATISTICS_PREFIX + uuid + "-" + statsFileCount.getAndIncrement()); } @Override public Path toPath(String fileName) { - return new Path(root + "/statistics/" + fileName); + return new Path(statisticsPath(), fileName); } }; } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala index 3ffe7fba264f..f45655d5147c 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala @@ -219,4 +219,25 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil) } + test("Paimon procedure: remove orphan files with data file path directory") { + sql(s""" + |CREATE TABLE T (id STRING, name STRING) + |USING PAIMON + |TBLPROPERTIES ('primary-key'='id', 'data-file.path-directory'='data') + |""".stripMargin) + + sql(s"INSERT INTO T VALUES ('1', 'a'), ('2', 'b')") + + val table = loadTable("T") + val orphanFile = new Path(table.store().pathFactory().dataFilePath(), ORPHAN_FILE_1) + table.fileIO().tryToWriteAtomic(orphanFile, "b") + + Thread.sleep(1000) + val older_than = DateTimeUtils.formatLocalDateTime( + DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), + 3) + checkAnswer( + sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than')"), + Row(1, 1) :: Nil) + } }