From 295e545181ef55070cbd919f70996353276e0ae7 Mon Sep 17 00:00:00 2001 From: smdsbz Date: Wed, 22 Jan 2025 10:19:18 +0800 Subject: [PATCH 1/2] ObjectRefresh with iterative list and batched commit --- .../paimon/table/object/ObjectRefresh.java | 48 +++++++------------ 1 file changed, 16 insertions(+), 32 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java index b1be840c5153..1c1532156d46 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java @@ -23,56 +23,40 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; -import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.RemoteIterator; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.sink.BatchWriteBuilder; -import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; /** Util class for refreshing object table. */ public class ObjectRefresh { - public static long refresh(ObjectTable table) throws Exception { - String location = table.objectLocation(); + private static final long COMMIT_BATCH_SIZE = 1000; - // 1. collect all files for object table - List fileCollector = new ArrayList<>(); - listAllFiles(table.objectFileIO(), new Path(location), fileCollector); + public static long refresh(ObjectTable table) throws Exception { + long totalObjs = 0; - // 2. write to underlying table BatchWriteBuilder writeBuilder = table.underlyingTable().newBatchWriteBuilder().withOverwrite(); - try (BatchTableWrite write = writeBuilder.newWrite(); - BatchTableCommit commit = writeBuilder.newCommit()) { - for (FileStatus file : fileCollector) { - write.write(toRow(file)); + try (RemoteIterator objIter = + table.objectFileIO().listFilesIterative(new Path(table.objectLocation()), true)) { + while (objIter.hasNext()) { + try (BatchTableWrite write = writeBuilder.newWrite(); + BatchTableCommit commit = writeBuilder.newCommit()) { + for (int i = 0; i < COMMIT_BATCH_SIZE && objIter.hasNext(); i++) { + totalObjs++; + write.write(toRow(objIter.next())); + } + commit.commit(write.prepareCommit()); + } } - commit.commit(write.prepareCommit()); - } - - return fileCollector.size(); - } - - private static void listAllFiles(FileIO fileIO, Path directory, List fileCollector) - throws IOException { - FileStatus[] files = fileIO.listStatus(directory); - if (files == null) { - return; } - for (FileStatus file : files) { - if (file.isDir()) { - listAllFiles(fileIO, file.getPath(), fileCollector); - } else { - fileCollector.add(file); - } - } + return totalObjs; } private static InternalRow toRow(FileStatus file) { From db02c8922cedaa1e69c6602befdcd1dbfb3e16f6 Mon Sep 17 00:00:00 2001 From: "zhuxiaoguang.zxg" Date: Wed, 22 Jan 2025 16:15:23 +0800 Subject: [PATCH 2/2] larger commit batch size --- .../main/java/org/apache/paimon/table/object/ObjectRefresh.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java index 1c1532156d46..2aa3a0be2e95 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java @@ -35,7 +35,7 @@ /** Util class for refreshing object table. */ public class ObjectRefresh { - private static final long COMMIT_BATCH_SIZE = 1000; + private static final long COMMIT_BATCH_SIZE = 10_000; public static long refresh(ObjectTable table) throws Exception { long totalObjs = 0;