diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index fe2e31b8db7b..5ba16acfca0f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -39,12 +39,15 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Queue; import java.util.ServiceLoader; import java.util.Set; import java.util.stream.Collectors; @@ -104,6 +107,75 @@ public interface FileIO extends Serializable { */ FileStatus[] listStatus(Path path) throws IOException; + /** + * List the statuses of the files in the given path if the path is a directory. + * + * @param path given path + * @param recursive if set to true will recursively list files in subdirectories, + * otherwise only files in the current directory will be listed + * @return the statuses of the files in the given path + */ + default FileStatus[] listFiles(Path path, boolean recursive) throws IOException { + List files = new ArrayList<>(); + try (RemoteIterator iter = listFilesIterative(path, recursive)) { + while (iter.hasNext()) { + files.add(iter.next()); + } + } + return files.toArray(new FileStatus[0]); + } + + /** + * List the statuses of the files iteratively in the given path if the path is a directory. + * + * @param path given path + * @param recursive if set to true will recursively list files in subdirectories, + * otherwise only files in the current directory will be listed + * @return an {@link RemoteIterator} over {@link FileStatus} of the files in the given path + */ + default RemoteIterator listFilesIterative(Path path, boolean recursive) + throws IOException { + return new RemoteIterator() { + private Queue files = new LinkedList<>(); + private Queue subdirStack = new LinkedList<>(Collections.singletonList(path)); + + @Override + public boolean hasNext() throws IOException { + maybeUnpackSubdir(); + return !files.isEmpty(); + } + + @Override + public FileStatus next() throws IOException { + maybeUnpackSubdir(); + return files.remove(); + } + + private void maybeUnpackSubdir() throws IOException { + if (!files.isEmpty()) { + return; + } + if (subdirStack.isEmpty()) { + return; + } + FileStatus[] statuses = listStatus(subdirStack.remove()); + for (FileStatus f : statuses) { + if (!f.isDir()) { + files.add(f); + continue; + } + if (!recursive) { + continue; + } + subdirStack.add(f.getPath()); + } + } + + @Override + public void close() throws IOException {} + }; + } + /** * List the statuses of the directories in the given path if the path is a directory. * diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/RemoteIterator.java b/paimon-common/src/main/java/org/apache/paimon/fs/RemoteIterator.java new file mode 100644 index 000000000000..c23de0c1993b --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/RemoteIterator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import java.io.Closeable; +import java.io.IOException; + +/** An iterator for lazily listing remote entries. */ +public interface RemoteIterator extends Closeable { + + /** + * Checks if there are more entries to be iterated. + * + * @return whether there are more elements to be iterated + * @throws IOException - if failed to list entries lazily + */ + boolean hasNext() throws IOException; + + /** + * Gets the next entry to be iterated. + * + * @return the next entry + * @throws IOException - if failed to list entries lazily + */ + E next() throws IOException; + + /** + * Closes the iterator and its associated resources. + * + * @throws IOException - if failed to close the iterator + */ + void close() throws IOException; +} diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java index 8dbcf4185772..367bce383719 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java @@ -35,6 +35,8 @@ import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.StandardCopyOption; +import java.util.Arrays; +import java.util.Comparator; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; @@ -134,7 +136,39 @@ public static void testOverwriteFileUtf8(Path file, FileIO fileIO) throws Interr assertThat(exception.get()).isNull(); } - /** A {@link FileIO} on local filesystem to test the default copy implementation. */ + @Test + public void testListFiles() throws Exception { + Path fileA = new Path(tempDir.resolve("a").toUri()); + Path dirB = new Path(tempDir.resolve("b").toUri()); + Path fileBC = new Path(tempDir.resolve("b/c").toUri()); + + FileIO fileIO = new LocalFileIO(); + fileIO.writeFile(fileA, "fileA", false); + fileIO.mkdirs(dirB); + fileIO.writeFile(fileBC, "fileBC", false); + + { + // if listing non-recursively, file "a" is the only file in the top level directory + FileStatus[] statuses = fileIO.listFiles(new Path(tempDir.toUri()), false); + assertThat(statuses.length).isEqualTo(1); + assertThat(statuses[0].getPath()).isEqualTo(fileA); + } + + { + // if listing recursively, file "a" and "b/c" should be listed, directory "b" should be + // omitted + FileStatus[] statuses = fileIO.listFiles(new Path(tempDir.toUri()), true); + assertThat(statuses.length).isEqualTo(2); + statuses = + Arrays.stream(statuses) + .sorted(Comparator.comparing(FileStatus::getPath)) + .toArray(FileStatus[]::new); + assertThat(statuses[0].getPath()).isEqualTo(fileA); + assertThat(statuses[1].getPath()).isEqualTo(fileBC); + } + } + + /** A {@link FileIO} on local filesystem to test various default implementations. */ private static class DummyFileIO implements FileIO { private static final ReentrantLock RENAME_LOCK = new ReentrantLock(); @@ -169,13 +203,13 @@ public PositionOutputStream newOutputStream(Path path, boolean overwrite) } @Override - public FileStatus getFileStatus(Path path) { - throw new UnsupportedOperationException(); + public FileStatus getFileStatus(Path path) throws IOException { + return new LocalFileIO().getFileStatus(path); } @Override - public FileStatus[] listStatus(Path path) { - throw new UnsupportedOperationException(); + public FileStatus[] listStatus(Path path) throws IOException { + return new LocalFileIO().listStatus(path); } @Override