Skip to content

Commit

Permalink
lazy iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
smdsbz committed Jan 15, 2025
1 parent ab81b55 commit 5cbd2c5
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 23 deletions.
61 changes: 38 additions & 23 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ public interface FileIO extends Serializable {
*/
default FileStatus[] listFiles(Path path, boolean recursive) throws IOException {
    // Eagerly drain the lazy listing into an array. The iterator is a Closeable whose
    // hasNext()/next() may throw IOException, so it is consumed with an explicit loop
    // inside try-with-resources; this closes it even when listing fails midway.
    List<FileStatus> files = new ArrayList<>();
    try (FileStatusIterator iter = listFilesIterative(path, recursive)) {
        while (iter.hasNext()) {
            files.add(iter.next());
        }
    }
    return files.toArray(new FileStatus[0]);
}

Expand All @@ -128,35 +131,47 @@ default FileStatus[] listFiles(Path path, boolean recursive) throws IOException
* @param path given path
* @param recursive if set to <code>true</code> will recursively list files in subdirectories,
* otherwise only files in the current directory will be listed
* @return an {@link Iterator} over the statuses of the files in the given path
* @return a {@link FileStatusIterator} over the statuses of the files in the given path
*/
default Iterator<FileStatus> listFilesIterative(Path path, boolean recursive)
throws IOException {
Queue<FileStatus> files = new LinkedList<>();
for (Queue<Path> toUnpack = new LinkedList<>(Collections.singletonList(path));
!toUnpack.isEmpty(); ) {
FileStatus[] statuses = listStatus(toUnpack.remove());
for (FileStatus f : statuses) {
if (!f.isDir()) {
files.add(f);
continue;
}
if (!recursive) {
continue;
}
toUnpack.add(f.getPath());
}
}
return new Iterator<FileStatus>() {
default FileStatusIterator listFilesIterative(Path path, boolean recursive) throws IOException {
return new FileStatusIterator() {
private Queue<FileStatus> files = new LinkedList<>();
private Queue<Path> subdirStack = new LinkedList<>(Collections.singletonList(path));

@Override
public boolean hasNext() {
public boolean hasNext() throws IOException {
maybeUnpackSubdir();
return !files.isEmpty();
}

@Override
public FileStatus next() {
public FileStatus next() throws IOException {
maybeUnpackSubdir();
return files.remove();
}

private void maybeUnpackSubdir() throws IOException {
if (!files.isEmpty()) {
return;
}
if (subdirStack.isEmpty()) {
return;
}
FileStatus[] statuses = listStatus(subdirStack.remove());
for (FileStatus f : statuses) {
if (!f.isDir()) {
files.add(f);
continue;
}
if (!recursive) {
continue;
}
subdirStack.add(f.getPath());
}
}

@Override
public void close() throws IOException {}
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fs;

import java.io.Closeable;
import java.io.IOException;

/**
 * A closeable cursor over {@link FileStatus} entries that are listed lazily.
 *
 * <p>Unlike {@link java.util.Iterator}, both traversal methods are allowed to throw {@link
 * IOException}, since advancing may require listing further statuses. Being a {@link
 * Closeable}, an instance can be used in a try-with-resources statement.
 */
public interface FileStatusIterator extends Closeable {

    /**
     * Reports whether another status is available.
     *
     * @return {@code true} if there are more elements to be iterated, {@code false} otherwise
     * @throws IOException if lazily listing the statuses fails
     */
    boolean hasNext() throws IOException;

    /**
     * Returns the next status and advances the iterator.
     *
     * @return the next status
     * @throws IOException if lazily listing the statuses fails
     */
    FileStatus next() throws IOException;

    /**
     * Closes the iterator and releases its associated resources.
     *
     * @throws IOException if closing the iterator fails
     */
    @Override
    void close() throws IOException;
}

0 comments on commit 5cbd2c5

Please sign in to comment.