Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka Connect: Fix a bug in streams closing while read or write metadata files #11609

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions core/src/main/java/org/apache/iceberg/TableMetadataParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,25 @@ public static void write(TableMetadata metadata, OutputFile outputFile) {
public static void internalWrite(
TableMetadata metadata, OutputFile outputFile, boolean overwrite) {
boolean isGzip = Codec.fromFileName(outputFile.location()) == Codec.GZIP;
try (OutputStream os = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
OutputStream gos = isGzip ? new GZIPOutputStream(os) : os;
OutputStream os = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
// if isGzip is true, os will be closed by GZIPOutputStream,
// otherwise, os will be closed by try-with-resources
try (OutputStream gos = isGzip ? new GZIPOutputStream(os) : os;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not gathering all in the try-on-resource ? Something like:

try (OutputStream gos = isGzip ? new GZIPOutputStream((overwrite ? output.createOrOverwrite() : outputFile.create())) : (overwrite ? output.createOrOverwrite() : outputFile.create())) { 

Comment on lines +125 to +128
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to have some sort of internal output stream implementation which delegates to the gzip or underlying oputput stream, and handles the complexity of closing in case of failures in GzipOutputStream constructor, and propogating that exception. Then the code below really just would have the delegate in the try with closeable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to that idea. We should also use this for ViewMetadataParser

OutputStreamWriter writer = new OutputStreamWriter(gos, StandardCharsets.UTF_8)) {
JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
generator.useDefaultPrettyPrinter();
toJson(metadata, generator);
generator.flush();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to write json to file: %s", outputFile);
} finally {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be good to reproduce the root cause of this in TableMetadataParserTest so that we can be sure this is properly fixed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

additionally, whatever fix we do, we should also apply to ViewMetadataParser

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try to reproduce this in a unit test

try {
// in case of an exception in GZIPOutputStream constructor,
// the stream is not closed and needs to be closed here
Comment on lines +138 to +139
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Explicitly close the stream in case of exceptions thrown by GZIPOutputStream constructor. See #11220 for more details"

os.close();
} catch (IOException ignored) {
// ignore
}
}
}

Expand Down Expand Up @@ -277,11 +287,21 @@ public static TableMetadata read(FileIO io, String path) {

public static TableMetadata read(FileIO io, InputFile file) {
Codec codec = Codec.fromFileName(file.location());
try (InputStream is = file.newStream();
InputStream gis = codec == Codec.GZIP ? new GZIPInputStream(is) : is) {
InputStream is = file.newStream();
// if codec is GZIP, is will be closed by GZIPInputStream
// otherwise, os will be closed by try-with-resources
try (InputStream gis = codec == Codec.GZIP ? new GZIPInputStream(is) : is) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not gathering both statements in one try close ? It would be easier.

Something like:

try (InputStream is = (codec == Codec.GZIP) ? new GzipInputStream(file.newStream()) : file.newStream()) {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the original code, but there is an edge case where the GzipInputStream throws an exception in its constructor, and it does not close the InputStream passed as an argument in this case, which according to the PR that introduced the bug could increase memory usage and lead to unexpected results.

return fromJson(file, JsonUtil.mapper().readValue(gis, JsonNode.class));
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to read file: %s", file);
} finally {
try {
// in case of an exception in GZIPInputStream constructor,
// the stream is not closed and needs to be closed here
is.close();
} catch (IOException ignored) {
// ignore
}
}
}

Expand Down