Skip to content

Commit

Permalink
[common] A FileIO API to list files iteratively (#4834)
Browse files Browse the repository at this point in the history
  • Loading branch information
smdsbz authored Jan 17, 2025
1 parent 587fa28 commit b152608
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 5 deletions.
72 changes: 72 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -104,6 +107,75 @@ public interface FileIO extends Serializable {
*/
FileStatus[] listStatus(Path path) throws IOException;

/**
 * Collects the statuses of all files under the given path into an array.
 *
 * <p>This is the eager counterpart of {@link #listFilesIterative(Path, boolean)}: the iterator
 * is drained completely and closed before the result is returned.
 *
 * @param path given path
 * @param recursive if set to <code>true</code> will recursively list files in subdirectories,
 *     otherwise only files in the current directory will be listed
 * @return the statuses of the files in the given path
 * @throws IOException if the underlying iterator fails while listing or closing
 */
default FileStatus[] listFiles(Path path, boolean recursive) throws IOException {
    final List<FileStatus> collected = new ArrayList<>();
    try (RemoteIterator<FileStatus> remaining = listFilesIterative(path, recursive)) {
        for (boolean more = remaining.hasNext(); more; more = remaining.hasNext()) {
            collected.add(remaining.next());
        }
    }
    final FileStatus[] result = new FileStatus[collected.size()];
    return collected.toArray(result);
}

/**
 * List the statuses of the files iteratively in the given path if the path is a directory.
 *
 * <p>The returned iterator is lazy: a directory is only listed (via {@link #listStatus(Path)})
 * once all files buffered from previously listed directories have been consumed. Pending
 * directories are expanded in FIFO order. Directories themselves are never returned, only the
 * non-directory entries found in them.
 *
 * <p>Calling {@code next()} on an exhausted iterator throws {@link
 * java.util.NoSuchElementException}. {@code close()} is a no-op for this default implementation.
 *
 * @param path given path
 * @param recursive if set to <code>true</code> will recursively list files in subdirectories,
 *     otherwise only files in the current directory will be listed
 * @return a {@link RemoteIterator} over {@link FileStatus} of the files in the given path
 * @throws IOException declared for implementations that list eagerly; this default
 *     implementation defers all listing to the returned iterator
 */
default RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursive)
        throws IOException {
    return new RemoteIterator<FileStatus>() {
        // Files discovered so far that have not been handed out yet.
        private final Queue<FileStatus> files = new LinkedList<>();
        // Directories that still have to be listed; seeded with the root path.
        private final Queue<Path> pendingDirs =
                new LinkedList<>(Collections.singletonList(path));

        @Override
        public boolean hasNext() throws IOException {
            fillFiles();
            return !files.isEmpty();
        }

        @Override
        public FileStatus next() throws IOException {
            fillFiles();
            return files.remove();
        }

        /**
         * Lists pending directories until at least one file is buffered or no directory is
         * left. Looping is essential: a directory that contains only subdirectories (or
         * nothing at all) yields no files, and stopping after a single listing would make
         * {@link #hasNext()} report exhaustion while directories are still pending.
         */
        private void fillFiles() throws IOException {
            while (files.isEmpty() && !pendingDirs.isEmpty()) {
                for (FileStatus status : listStatus(pendingDirs.remove())) {
                    if (!status.isDir()) {
                        files.add(status);
                    } else if (recursive) {
                        pendingDirs.add(status.getPath());
                    }
                }
            }
        }

        @Override
        public void close() throws IOException {}
    };
}

/**
* List the statuses of the directories in the given path if the path is a directory.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fs;

import java.io.Closeable;
import java.io.IOException;

/**
 * An iterator for lazily listing remote entries.
 *
 * <p>Unlike {@link java.util.Iterator}, both {@link #hasNext()} and {@link #next()} may throw
 * {@link IOException}, and the iterator is {@link Closeable} so it can be used in a
 * try-with-resources statement.
 *
 * @param <E> the type of the entries returned by this iterator
 */
public interface RemoteIterator<E> extends Closeable {

    /**
     * Checks if there are more entries to be iterated.
     *
     * @return whether there are more elements to be iterated
     * @throws IOException if failed to list entries lazily
     */
    boolean hasNext() throws IOException;

    /**
     * Gets the next entry to be iterated.
     *
     * @return the next entry
     * @throws IOException if failed to list entries lazily
     */
    E next() throws IOException;

    /**
     * Closes the iterator and its associated resources.
     *
     * @throws IOException if failed to close the iterator
     */
    void close() throws IOException;
}
44 changes: 39 additions & 5 deletions paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -134,7 +136,39 @@ public static void testOverwriteFileUtf8(Path file, FileIO fileIO) throws Interr
assertThat(exception.get()).isNull();
}

/** A {@link FileIO} on local filesystem to test the default copy implementation. */
@Test
public void testListFiles() throws Exception {
    // Layout under the temp directory: a (file), b/ (directory), b/c (file).
    FileIO fileIO = new LocalFileIO();
    Path root = new Path(tempDir.toUri());
    Path fileA = new Path(tempDir.resolve("a").toUri());
    Path dirB = new Path(tempDir.resolve("b").toUri());
    Path fileBC = new Path(tempDir.resolve("b/c").toUri());

    fileIO.writeFile(fileA, "fileA", false);
    fileIO.mkdirs(dirB);
    fileIO.writeFile(fileBC, "fileBC", false);

    // Non-recursive listing: only the top-level file "a" is returned.
    FileStatus[] topLevel = fileIO.listFiles(root, false);
    assertThat(topLevel.length).isEqualTo(1);
    assertThat(topLevel[0].getPath()).isEqualTo(fileA);

    // Recursive listing: "a" and "b/c" are returned, the directory "b" itself is not.
    FileStatus[] everything = fileIO.listFiles(root, true);
    assertThat(everything.length).isEqualTo(2);
    // Listing order is not asserted, so sort by path before comparing.
    Arrays.sort(everything, Comparator.comparing(FileStatus::getPath));
    assertThat(everything[0].getPath()).isEqualTo(fileA);
    assertThat(everything[1].getPath()).isEqualTo(fileBC);
}

/** A {@link FileIO} on local filesystem to test various default implementations. */
private static class DummyFileIO implements FileIO {
private static final ReentrantLock RENAME_LOCK = new ReentrantLock();

Expand Down Expand Up @@ -169,13 +203,13 @@ public PositionOutputStream newOutputStream(Path path, boolean overwrite)
}

@Override
public FileStatus getFileStatus(Path path) {
throw new UnsupportedOperationException();
public FileStatus getFileStatus(Path path) throws IOException {
return new LocalFileIO().getFileStatus(path);
}

@Override
public FileStatus[] listStatus(Path path) {
throw new UnsupportedOperationException();
public FileStatus[] listStatus(Path path) throws IOException {
return new LocalFileIO().listStatus(path);
}

@Override
Expand Down

0 comments on commit b152608

Please sign in to comment.