From f712e0c99dc71774cec1d886348c3355d02ead76 Mon Sep 17 00:00:00 2001 From: smdsbz Date: Fri, 27 Dec 2024 15:52:58 +0800 Subject: [PATCH 1/7] proposing FileIO#listFilesPaged (cherry picked from commit b8564a115e1a1992b4354a2feeb0a39ab129b515) --- .../java/org/apache/paimon/fs/FileIO.java | 68 ++++++++++++++ .../java/org/apache/paimon/fs/FileIOTest.java | 92 ++++++++++++++++++- 2 files changed, 155 insertions(+), 5 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index fe2e31b8db7b..378475d71595 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -22,11 +22,13 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.hadoop.HadoopFileIOLoader; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.utils.Pair; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import java.io.BufferedReader; @@ -39,6 +41,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -104,6 +107,71 @@ public interface FileIO extends Serializable { */ FileStatus[] listStatus(Path path) throws IOException; + /** + * List the statuses of the files in the given path if the path is a directory. + * + * @param path given path + * @param recursive if set to true will recursively list files in sub-directories, + * otherwise only files in the current directory will be listed + * @return the statuses of the files in the given path + */ + default FileStatus[] listFiles(Path path, boolean recursive) throws IOException { + List statuses = new ArrayList<>(); + FileStatus[] outer = listStatus(path); + if (outer != null) { + for (FileStatus f : outer) { + if (!f.isDir()) { + statuses.add(f); + continue; + } + if (!recursive) { + continue; + } + FileStatus[] inner = listFiles(f.getPath(), true); + statuses.addAll(Arrays.asList(inner)); + } + } + return statuses.toArray(new FileStatus[0]); + } + + /** + * List the statuses of the files in the given path in non-overlapping pages, if the path is a + * directory. + * + * @param path given path + * @param recursive if set to true will recursively list files in subdirectories, + * otherwise only files in the current directory will be listed + * @param pageSize maximum size of the page + * @param continuationToken If supplied will list files after this token, otherwise list from + * the beginning. You may acquire this token from the return of this method. + * @return A page of statuses of the files in the given path and the continuation token of this + * page. The continuation token will be null if the returned page is the last + * page. + */ + default Pair listFilesPaged( + Path path, boolean recursive, long pageSize, @Nullable String continuationToken) + throws IOException { + FileStatus[] all = listFiles(path, recursive); + FileStatus[] paged = + Arrays.stream(all) + .sorted(Comparator.comparing(FileStatus::getPath)) + .filter( + f -> + continuationToken == null + || f.getPath() + .toUri() + .toString() + .compareTo(continuationToken) + > 0) + .limit(pageSize) + .toArray(FileStatus[]::new); + String nextToken = + paged.length < pageSize + ? null + : paged[paged.length - 1].getPath().toUri().toString(); + return Pair.of(paged, nextToken); + } + /** * List the statuses of the directories in the given path if the path is a directory. * diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java index 8dbcf4185772..0afd182ab7dc 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; +import org.apache.paimon.utils.Pair; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -134,7 +135,88 @@ public static void testOverwriteFileUtf8(Path file, FileIO fileIO) throws Interr assertThat(exception.get()).isNull(); } - /** A {@link FileIO} on local filesystem to test the default copy implementation. */ + @Test + public void testListFiles() throws Exception { + Path fileA = new Path(tempDir.resolve("a").toUri()); + Path dirB = new Path(tempDir.resolve("b").toUri()); + Path fileBC = new Path(tempDir.resolve("b/c").toUri()); + + FileIO fileIO = new LocalFileIO(); + fileIO.writeFile(fileA, "fileA", false); + fileIO.mkdirs(dirB); + fileIO.writeFile(fileBC, "fileBC", false); + + { + // if listing non-recursively, file "a" is the only file in the top level directory + FileStatus[] statuses = fileIO.listFiles(new Path(tempDir.toUri()), false); + assertThat(statuses.length).isEqualTo(1); + assertThat(statuses[0].getPath()).isEqualTo(fileA); + } + + { + // if listing recursively, file "a" and "b/c" should be listed, directory "b" should be + // omitted + FileStatus[] statuses = fileIO.listFiles(new Path(tempDir.toUri()), true); + assertThat(statuses.length).isEqualTo(2); + assertThat(statuses[0].getPath()).isEqualTo(fileA); + assertThat(statuses[1].getPath()).isEqualTo(fileBC); + } + } + + @Test + public void testListFilesPaged() throws Exception { + FileIO fileIO = new LocalFileIO(); + // 10 files starting with "a" + for (int i = 0; i < 10; i++) { + Path p = new Path(tempDir.resolve(String.format("a-%02d", i)).toUri()); + fileIO.writeFile(p, p.toString(), false); + } + // 10 files starting with "b" + for (int i = 0; i < 10; i++) { + Path p = new Path(tempDir.resolve(String.format("b-%02d", i)).toUri()); + fileIO.writeFile(p, p.toString(), false); + } + // 10 files under directory "c" + fileIO.mkdirs(new Path(tempDir.resolve("c").toUri())); + for (int i = 0; i < 10; i++) { + Path p = new Path(tempDir.resolve(String.format("c/c-%02d", i)).toUri()); + fileIO.writeFile(p, p.toString(), false); + } + + { + // first 5 files should be "a-00" to "a-04" + Pair page = + fileIO.listFilesPaged(new Path(tempDir.toUri()), true, 5, null); + assertThat(page.getLeft().length).isEqualTo(5); + assertThat(page.getLeft()[0].getPath().getName()).isEqualTo("a-00"); + assertThat(page.getLeft()[4].getPath().getName()).isEqualTo("a-04"); + assertThat(page.getRight()).isNotNull(); + // the next 10 files should be "a-05" to "b-04" + page = fileIO.listFilesPaged(new Path(tempDir.toUri()), true, 10, page.getRight()); + assertThat(page.getLeft().length).isEqualTo(10); + assertThat(page.getLeft()[0].getPath().getName()).isEqualTo("a-05"); + assertThat(page.getLeft()[9].getPath().getName()).isEqualTo("b-04"); + assertThat(page.getRight()).isNotNull(); + // next 10 files should recurse to "c/c-04" + page = fileIO.listFilesPaged(new Path(tempDir.toUri()), true, 10, page.getRight()); + assertThat(page.getLeft().length).isEqualTo(10); + assertThat(page.getLeft()[9].getPath().getParent().getName()).isEqualTo("c"); + assertThat(page.getLeft()[9].getPath().getName()).isEqualTo("c-04"); + assertThat(page.getRight()).isNotNull(); + } + + { + // list all files non-recursively should return "a-00" through "b-09" and no more + Pair page = + fileIO.listFilesPaged(new Path(tempDir.toUri()), false, 9999, null); + assertThat(page.getLeft().length).isEqualTo(20); + assertThat(page.getLeft()[0].getPath().getName()).isEqualTo("a-00"); + assertThat(page.getLeft()[19].getPath().getName()).isEqualTo("b-09"); + assertThat(page.getRight()).isNull(); + } + } + + /** A {@link FileIO} on local filesystem to test various default implementations. */ private static class DummyFileIO implements FileIO { private static final ReentrantLock RENAME_LOCK = new ReentrantLock(); @@ -169,13 +251,13 @@ public PositionOutputStream newOutputStream(Path path, boolean overwrite) } @Override - public FileStatus getFileStatus(Path path) { - throw new UnsupportedOperationException(); + public FileStatus getFileStatus(Path path) throws IOException { + return new LocalFileIO().getFileStatus(path); } @Override - public FileStatus[] listStatus(Path path) { - throw new UnsupportedOperationException(); + public FileStatus[] listStatus(Path path) throws IOException { + return new LocalFileIO().listStatus(path); } @Override From 0114872c1465ea6e881c931a82faa906cfa7719c Mon Sep 17 00:00:00 2001 From: smdsbz Date: Tue, 7 Jan 2025 10:17:05 +0800 Subject: [PATCH 2/7] fix ut --- .../src/test/java/org/apache/paimon/fs/FileIOTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java index 0afd182ab7dc..3ba3a738465e 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java @@ -36,6 +36,8 @@ import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.StandardCopyOption; +import java.util.Arrays; +import java.util.Comparator; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; @@ -158,6 +160,10 @@ public void testListFiles() throws Exception { // omitted FileStatus[] statuses = fileIO.listFiles(new Path(tempDir.toUri()), true); assertThat(statuses.length).isEqualTo(2); + statuses = + Arrays.stream(statuses) + .sorted(Comparator.comparing(FileStatus::getPath)) + .toArray(FileStatus[]::new); assertThat(statuses[0].getPath()).isEqualTo(fileA); assertThat(statuses[1].getPath()).isEqualTo(fileBC); } From 66c8006b131cda1630cf3c9708b0c1d17164a05c Mon Sep 17 00:00:00 2001 From: smdsbz Date: Tue, 14 Jan 2025 11:18:48 +0800 Subject: [PATCH 3/7] erasing default impl for listFilesPaged --- .../java/org/apache/paimon/fs/FileIO.java | 21 +------- .../java/org/apache/paimon/fs/FileIOTest.java | 54 ------------------- 2 files changed, 1 insertion(+), 74 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index 378475d71595..2e0960e5ee93 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -41,7 +41,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -151,25 +150,7 @@ default FileStatus[] listFiles(Path path, boolean recursive) throws IOException default Pair listFilesPaged( Path path, boolean recursive, long pageSize, @Nullable String continuationToken) throws IOException { - FileStatus[] all = listFiles(path, recursive); - FileStatus[] paged = - Arrays.stream(all) - .sorted(Comparator.comparing(FileStatus::getPath)) - .filter( - f -> - continuationToken == null - || f.getPath() - .toUri() - .toString() - .compareTo(continuationToken) - > 0) - .limit(pageSize) - .toArray(FileStatus[]::new); - String nextToken = - paged.length < pageSize - ? null - : paged[paged.length - 1].getPath().toUri().toString(); - return Pair.of(paged, nextToken); + throw new UnsupportedOperationException(); } /** diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java index 3ba3a738465e..367bce383719 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java @@ -21,7 +21,6 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; -import org.apache.paimon.utils.Pair; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -169,59 +168,6 @@ public void testListFiles() throws Exception { } } - @Test - public void testListFilesPaged() throws Exception { - FileIO fileIO = new LocalFileIO(); - // 10 files starting with "a" - for (int i = 0; i < 10; i++) { - Path p = new Path(tempDir.resolve(String.format("a-%02d", i)).toUri()); - fileIO.writeFile(p, p.toString(), false); - } - // 10 files starting with "b" - for (int i = 0; i < 10; i++) { - Path p = new Path(tempDir.resolve(String.format("b-%02d", i)).toUri()); - fileIO.writeFile(p, p.toString(), false); - } - // 10 files under directory "c" - fileIO.mkdirs(new Path(tempDir.resolve("c").toUri())); - for (int i = 0; i < 10; i++) { - Path p = new Path(tempDir.resolve(String.format("c/c-%02d", i)).toUri()); - fileIO.writeFile(p, p.toString(), false); - } - - { - // first 5 files should be "a-00" to "a-04" - Pair page = - fileIO.listFilesPaged(new Path(tempDir.toUri()), true, 5, null); - assertThat(page.getLeft().length).isEqualTo(5); - assertThat(page.getLeft()[0].getPath().getName()).isEqualTo("a-00"); - assertThat(page.getLeft()[4].getPath().getName()).isEqualTo("a-04"); - assertThat(page.getRight()).isNotNull(); - // the next 10 files should be "a-05" to "b-04" - page = fileIO.listFilesPaged(new Path(tempDir.toUri()), true, 10, page.getRight()); - assertThat(page.getLeft().length).isEqualTo(10); - assertThat(page.getLeft()[0].getPath().getName()).isEqualTo("a-05"); - assertThat(page.getLeft()[9].getPath().getName()).isEqualTo("b-04"); - assertThat(page.getRight()).isNotNull(); - // next 10 files should recurse to "c/c-04" - page = fileIO.listFilesPaged(new Path(tempDir.toUri()), true, 10, page.getRight()); - assertThat(page.getLeft().length).isEqualTo(10); - assertThat(page.getLeft()[9].getPath().getParent().getName()).isEqualTo("c"); - assertThat(page.getLeft()[9].getPath().getName()).isEqualTo("c-04"); - assertThat(page.getRight()).isNotNull(); - } - - { - // list all files non-recursively should return "a-00" through "b-09" and no more - Pair page = - fileIO.listFilesPaged(new Path(tempDir.toUri()), false, 9999, null); - assertThat(page.getLeft().length).isEqualTo(20); - assertThat(page.getLeft()[0].getPath().getName()).isEqualTo("a-00"); - assertThat(page.getLeft()[19].getPath().getName()).isEqualTo("b-09"); - assertThat(page.getRight()).isNull(); - } - } - /** A {@link FileIO} on local filesystem to test various default implementations. */ private static class DummyFileIO implements FileIO { private static final ReentrantLock RENAME_LOCK = new ReentrantLock(); From afb4964bad7df2b91f47d314a5616341a767f2d8 Mon Sep 17 00:00:00 2001 From: smdsbz Date: Tue, 14 Jan 2025 13:02:06 +0800 Subject: [PATCH 4/7] method to test availability of listFilesPaged --- .../src/main/java/org/apache/paimon/fs/FileIO.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index 2e0960e5ee93..d5b5c4ff3b50 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -133,6 +133,15 @@ default FileStatus[] listFiles(Path path, boolean recursive) throws IOException return statuses.toArray(new FileStatus[0]); } + /** + * Tests whether {@link #listFilesPaged} is supported. + * + * @return whether {@link #listFilesPaged} is supported + */ + default boolean supportsListFilesPaged() { + return false; + } + /** * List the statuses of the files in the given path in non-overlapping pages, if the path is a * directory. From 230ed5b35095001218faf46cbaf851f12a750a19 Mon Sep 17 00:00:00 2001 From: smdsbz Date: Tue, 14 Jan 2025 17:24:43 +0800 Subject: [PATCH 5/7] iterative list --- .../java/org/apache/paimon/fs/FileIO.java | 76 +++++++++---------- 1 file changed, 37 insertions(+), 39 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index d5b5c4ff3b50..006276dbbb32 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -22,13 +22,11 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.hadoop.HadoopFileIOLoader; import org.apache.paimon.fs.local.LocalFileIO; -import org.apache.paimon.utils.Pair; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import java.io.BufferedReader; @@ -41,12 +39,15 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Queue; import java.util.ServiceLoader; import java.util.Set; import java.util.stream.Collectors; @@ -110,56 +111,53 @@ public interface FileIO extends Serializable { * List the statuses of the files in the given path if the path is a directory. * * @param path given path - * @param recursive if set to true will recursively list files in sub-directories, + * @param recursive if set to true will recursively list files in subdirectories, * otherwise only files in the current directory will be listed * @return the statuses of the files in the given path */ default FileStatus[] listFiles(Path path, boolean recursive) throws IOException { - List statuses = new ArrayList<>(); - FileStatus[] outer = listStatus(path); - if (outer != null) { - for (FileStatus f : outer) { - if (!f.isDir()) { - statuses.add(f); - continue; - } - if (!recursive) { - continue; - } - FileStatus[] inner = listFiles(f.getPath(), true); - statuses.addAll(Arrays.asList(inner)); - } - } - return statuses.toArray(new FileStatus[0]); + List files = new ArrayList<>(); + Iterator iter = listFilesIterative(path, recursive); + iter.forEachRemaining(files::add); + return files.toArray(new FileStatus[0]); } /** - * Tests whether {@link #listFilesPaged} is supported. - * - * @return whether {@link #listFilesPaged} is supported - */ - default boolean supportsListFilesPaged() { - return false; - } - - /** - * List the statuses of the files in the given path in non-overlapping pages, if the path is a - * directory. + * List the statuses of the files iteratively in the given path if the path is a directory. * * @param path given path * @param recursive if set to true will recursively list files in subdirectories, * otherwise only files in the current directory will be listed - * @param pageSize maximum size of the page - * @param continuationToken If supplied will list files after this token, otherwise list from - * the beginning. You may acquire this token from the return of this method. - * @return A page of statuses of the files in the given path and the continuation token of this - * page. The continuation token will be null if the returned page is the last - * page. + * @return an {@link Iterator} over the statuses of the files in the given path */ - default Pair listFilesPaged( - Path path, boolean recursive, long pageSize, @Nullable String continuationToken) + default Iterator listFilesIterative(Path path, boolean recursive) throws IOException { - throw new UnsupportedOperationException(); + Queue files = new LinkedList<>(); + for (Queue toUnpack = new LinkedList<>(Collections.singletonList(path)); + !toUnpack.isEmpty(); ) { + FileStatus[] statuses = listStatus(toUnpack.remove()); + for (FileStatus f : statuses) { + if (!f.isDir()) { + files.add(f); + continue; + } + if (!recursive) { + continue; + } + toUnpack.add(f.getPath()); + } + } + return new Iterator() { + @Override + public boolean hasNext() { + return !files.isEmpty(); + } + + @Override + public FileStatus next() { + return files.remove(); + } + }; } /** From 954c5ff931f2e406ac4322643ddb4c3be629ac04 Mon Sep 17 00:00:00 2001 From: smdsbz Date: Wed, 15 Jan 2025 10:54:05 +0800 Subject: [PATCH 6/7] lazy iterator --- .../java/org/apache/paimon/fs/FileIO.java | 61 ++++++++++++------- .../apache/paimon/fs/FileStatusIterator.java | 49 +++++++++++++++ 2 files changed, 87 insertions(+), 23 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/fs/FileStatusIterator.java diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index 006276dbbb32..d1c29a14bf90 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -117,8 +117,11 @@ public interface FileIO extends Serializable { */ default FileStatus[] listFiles(Path path, boolean recursive) throws IOException { List files = new ArrayList<>(); - Iterator iter = listFilesIterative(path, recursive); - iter.forEachRemaining(files::add); + try (FileStatusIterator iter = listFilesIterative(path, recursive)) { + while (iter.hasNext()) { + files.add(iter.next()); + } + } return files.toArray(new FileStatus[0]); } @@ -128,35 +131,47 @@ default FileStatus[] listFiles(Path path, boolean recursive) throws IOException * @param path given path * @param recursive if set to true will recursively list files in subdirectories, * otherwise only files in the current directory will be listed - * @return an {@link Iterator} over the statuses of the files in the given path + * @return an {@link FileStatusIterator} over the statuses of the files in the given path */ - default Iterator listFilesIterative(Path path, boolean recursive) - throws IOException { - Queue files = new LinkedList<>(); - for (Queue toUnpack = new LinkedList<>(Collections.singletonList(path)); - !toUnpack.isEmpty(); ) { - FileStatus[] statuses = listStatus(toUnpack.remove()); - for (FileStatus f : statuses) { - if (!f.isDir()) { - files.add(f); - continue; - } - if (!recursive) { - continue; - } - toUnpack.add(f.getPath()); - } - } - return new Iterator() { + default FileStatusIterator listFilesIterative(Path path, boolean recursive) throws IOException { + return new FileStatusIterator() { + private Queue files = new LinkedList<>(); + private Queue subdirStack = new LinkedList<>(Collections.singletonList(path)); + @Override - public boolean hasNext() { + public boolean hasNext() throws IOException { + maybeUnpackSubdir(); return !files.isEmpty(); } @Override - public FileStatus next() { + public FileStatus next() throws IOException { + maybeUnpackSubdir(); return files.remove(); } + + private void maybeUnpackSubdir() throws IOException { + if (!files.isEmpty()) { + return; + } + if (subdirStack.isEmpty()) { + return; + } + FileStatus[] statuses = listStatus(subdirStack.remove()); + for (FileStatus f : statuses) { + if (!f.isDir()) { + files.add(f); + continue; + } + if (!recursive) { + continue; + } + subdirStack.add(f.getPath()); + } + } + + @Override + public void close() throws IOException {} }; } diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileStatusIterator.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileStatusIterator.java new file mode 100644 index 000000000000..e80c363a1ef0 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileStatusIterator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import java.io.Closeable; +import java.io.IOException; + +/** An iterator for lazily listing {@link FileStatus}. */ +public interface FileStatusIterator extends Closeable { + + /** + * Checks if there are more statuses to be iterated. + * + * @return whether there are more elements to be iterated + * @throws IOException - if failed to list statuses lazily + */ + boolean hasNext() throws IOException; + + /** + * Gets the next status to be iterated. + * + * @return the next status + * @throws IOException - if failed to list statuses lazily + */ + FileStatus next() throws IOException; + + /** + * Closes the iterator and its associated resources. + * + * @throws IOException - if failed to close the iterator + */ + void close() throws IOException; +} From 27349d89d77d14f150708dede226d801ab9ef36c Mon Sep 17 00:00:00 2001 From: smdsbz Date: Thu, 16 Jan 2025 11:56:59 +0800 Subject: [PATCH 7/7] templating iterator for fileio --- .../main/java/org/apache/paimon/fs/FileIO.java | 9 +++++---- ...leStatusIterator.java => RemoteIterator.java} | 16 ++++++++-------- 2 files changed, 13 insertions(+), 12 deletions(-) rename paimon-common/src/main/java/org/apache/paimon/fs/{FileStatusIterator.java => RemoteIterator.java} (74%) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index d1c29a14bf90..5ba16acfca0f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -117,7 +117,7 @@ public interface FileIO extends Serializable { */ default FileStatus[] listFiles(Path path, boolean recursive) throws IOException { List files = new ArrayList<>(); - try (FileStatusIterator iter = listFilesIterative(path, recursive)) { + try (RemoteIterator iter = listFilesIterative(path, recursive)) { while (iter.hasNext()) { files.add(iter.next()); } @@ -131,10 +131,11 @@ default FileStatus[] listFiles(Path path, boolean recursive) throws IOException * @param path given path * @param recursive if set to true will recursively list files in subdirectories, * otherwise only files in the current directory will be listed - * @return an {@link FileStatusIterator} over the statuses of the files in the given path + * @return an {@link RemoteIterator} over {@link FileStatus} of the files in the given path */ - default FileStatusIterator listFilesIterative(Path path, boolean recursive) throws IOException { - return new FileStatusIterator() { + default RemoteIterator listFilesIterative(Path path, boolean recursive) + throws IOException { + return new RemoteIterator() { private Queue files = new LinkedList<>(); private Queue subdirStack = new LinkedList<>(Collections.singletonList(path)); diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileStatusIterator.java b/paimon-common/src/main/java/org/apache/paimon/fs/RemoteIterator.java similarity index 74% rename from paimon-common/src/main/java/org/apache/paimon/fs/FileStatusIterator.java rename to paimon-common/src/main/java/org/apache/paimon/fs/RemoteIterator.java index e80c363a1ef0..c23de0c1993b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileStatusIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/RemoteIterator.java @@ -21,24 +21,24 @@ import java.io.Closeable; import java.io.IOException; -/** An iterator for lazily listing {@link FileStatus}. */ -public interface FileStatusIterator extends Closeable { +/** An iterator for lazily listing remote entries. */ +public interface RemoteIterator extends Closeable { /** - * Checks if there are more statuses to be iterated. + * Checks if there are more entries to be iterated. * * @return whether there are more elements to be iterated - * @throws IOException - if failed to list statuses lazily + * @throws IOException - if failed to list entries lazily */ boolean hasNext() throws IOException; /** - * Gets the next status to be iterated. + * Gets the next entry to be iterated. * - * @return the next status - * @throws IOException - if failed to list statuses lazily + * @return the next entry + * @throws IOException - if failed to list entries lazily */ - FileStatus next() throws IOException; + E next() throws IOException; /** * Closes the iterator and its associated resources.