
Kafka Connect: Fix a bug in streams closing while reading or writing metadata files #11609

Open · wants to merge 1 commit into base: main

Conversation

hussein-awala
Member

closes: #11582

#11220 introduced a new bug instead of fixing the original issue: it fixed the case where the GZIPOutputStream constructor throws an exception, but when the constructor succeeds, the stream is closed a second time when try-with-resources tries to close it, which throws an exception.

This PR:

InputStream is = file.newStream();
// if codec is GZIP, is will be closed by GZIPInputStream;
// otherwise, is will be closed by try-with-resources
try (InputStream gis = codec == Codec.GZIP ? new GZIPInputStream(is) : is) {
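Assembled into a self-contained sketch (hypothetical method and class names; `Codec` here is a stand-in for the parser's codec handling, and `readAllBytes` stands in for the actual JSON parsing), the read-path pattern in this PR looks like:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

public class ReadPathSketch {
  enum Codec { NONE, GZIP }

  // Sketch of the PR's read pattern: open the raw stream first, then wrap it.
  // If the GZIPInputStream constructor throws, the raw stream was never
  // registered with try-with-resources, so it is closed explicitly before
  // rethrowing; close() on an already-closed stream is expected to be a no-op.
  static String read(InputStream is, Codec codec) throws IOException {
    try (InputStream gis = codec == Codec.GZIP ? new GZIPInputStream(is) : is) {
      return new String(gis.readAllBytes(), StandardCharsets.UTF_8);
    } catch (IOException e) {
      is.close();
      throw e;
    }
  }
}
```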
Member

Why not gather both statements in one try-with-resources? It would be simpler.

Something like:

try (InputStream is = (codec == Codec.GZIP) ? new GZIPInputStream(file.newStream()) : file.newStream()) {

Member Author

This was the original code, but there is an edge case: if the GZIPInputStream constructor throws an exception, it does not close the InputStream passed as an argument. According to the PR that introduced the bug, this can increase memory usage and lead to unexpected results.
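That edge case is easy to observe: the GZIPInputStream constructor reads the gzip header eagerly, and when that fails it throws without closing the stream it was given. A minimal demonstration (hypothetical class names, not code from this PR):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

public class GzipCtorLeakDemo {
  // A stream that records whether close() was called, so we can observe
  // that the GZIPInputStream constructor does not close it on failure.
  static class TrackingStream extends ByteArrayInputStream {
    boolean closed = false;
    TrackingStream(byte[] buf) { super(buf); }
    @Override public void close() throws IOException { closed = true; super.close(); }
  }

  public static boolean leaksOnCtorFailure() {
    TrackingStream raw = new TrackingStream(new byte[] {1, 2, 3}); // not a gzip header
    try (GZIPInputStream gis = new GZIPInputStream(raw)) {
      return false; // unreachable: the constructor validates the header eagerly
    } catch (IOException expected) {
      return !raw.closed; // true: raw was left open by the failed constructor
    }
  }
}
```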

OutputStream os = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
// if isGzip is true, os will be closed by GZIPOutputStream,
// otherwise, os will be closed by try-with-resources
try (OutputStream gos = isGzip ? new GZIPOutputStream(os) : os;
Member

Why not gather everything in the try-with-resources? Something like:

try (OutputStream gos = isGzip ? new GZIPOutputStream(overwrite ? outputFile.createOrOverwrite() : outputFile.create()) : (overwrite ? outputFile.createOrOverwrite() : outputFile.create())) {

Comment on lines +138 to +139
// in case of an exception in GZIPOutputStream constructor,
// the stream is not closed and needs to be closed here
Contributor

"Explicitly close the stream in case of exceptions thrown by GZIPOutputStream constructor. See #11220 for more details"

Comment on lines +125 to +128
OutputStream os = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
// if isGzip is true, os will be closed by GZIPOutputStream,
// otherwise, os will be closed by try-with-resources
try (OutputStream gos = isGzip ? new GZIPOutputStream(os) : os;
Contributor

We may want to have some sort of internal output stream implementation that delegates to the gzip or underlying output stream, handles the complexity of closing in case of failures in the GZIPOutputStream constructor, and propagates that exception. Then the code below would really just have the delegate in the try-with-resources.
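A rough sketch of that idea (hypothetical class and names, not the actual Iceberg implementation): the wrapper takes ownership of the raw stream, and if the GZIPOutputStream constructor fails — it can throw because it writes the gzip header to the raw stream eagerly — it closes the raw stream before propagating the exception, so callers only need a plain try-with-resources.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

public class MaybeGzipOutputStream extends OutputStream {
  private final OutputStream delegate;

  public MaybeGzipOutputStream(OutputStream raw, boolean gzip) throws IOException {
    if (gzip) {
      OutputStream wrapped;
      try {
        wrapped = new GZIPOutputStream(raw);
      } catch (IOException | RuntimeException e) {
        try {
          raw.close(); // the constructor failed before taking ownership of raw
        } catch (IOException suppressed) {
          e.addSuppressed(suppressed);
        }
        throw e;
      }
      this.delegate = wrapped;
    } else {
      this.delegate = raw;
    }
  }

  @Override public void write(int b) throws IOException { delegate.write(b); }
  @Override public void write(byte[] b, int off, int len) throws IOException { delegate.write(b, off, len); }
  @Override public void flush() throws IOException { delegate.flush(); }
  @Override public void close() throws IOException { delegate.close(); }
}
```

Closing the wrapper closes the delegate, which in the gzip case finishes the deflate stream and closes the raw stream underneath it.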

Contributor

+1 to that idea. We should also use this for ViewMetadataParser

OutputStreamWriter writer = new OutputStreamWriter(gos, StandardCharsets.UTF_8)) {
JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
generator.useDefaultPrettyPrinter();
toJson(metadata, generator);
generator.flush();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to write json to file: %s", outputFile);
} finally {
Contributor

It would be good to reproduce the root cause of this in TableMetadataParserTest so that we can be sure it is properly fixed.
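One way to get at the root cause in a test (a sketch with a hypothetical test double, not the actual TableMetadataParserTest): use an output stream whose close() fails if called twice, mimicking streams backed by pooled connections that reject a double close. Wiring it in as the file's raw stream would turn the double-close bug into a test failure instead of a production-only symptom.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Hypothetical test double: an output stream that, unlike ByteArrayOutputStream,
// throws if close() is called more than once. A metadata-writing test using this
// as the underlying stream would fail whenever the writer double-closes it.
public class StrictCloseOutputStream extends ByteArrayOutputStream {
  private boolean closed = false;

  @Override
  public void close() throws IOException {
    if (closed) {
      throw new IOException("stream already closed");
    }
    closed = true;
    super.close();
  }
}
```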

Contributor

Additionally, whatever fix we do, we should also apply it to ViewMetadataParser.

Member Author

I will try to reproduce this in a unit test

@bryanck
Contributor

bryanck commented Nov 21, 2024

+1 to reverting the original change

@bryanck bryanck added this to the Iceberg 1.7.1 milestone Nov 21, 2024
@hussein-awala
Member Author

+1 to reverting the original change

Yes, but there is indeed an edge case that leads to failure, and this fix is a revert plus a fix for the original problem. Manually closing a file stream that could be closed automatically is ugly, I agree, but sometimes we have to do it.

I'm open to any suggestion

@bryanck
Contributor

bryanck commented Nov 21, 2024

For 1.7.1 I'd prefer to revert it, and follow up with a separate PR with a fix, but I'm OK either way.

@hussein-awala
Member Author

For 1.7.1 I'd prefer to revert it, and follow up with a separate PR with a fix

Sounds good, I opened #11621 to revert the commit.

@bryanck bryanck removed this from the Iceberg 1.7.1 milestone Nov 21, 2024
Successfully merging this pull request may close these issues.

Iceberg 1.7.0 java.lang.IllegalStateException: Connection pool shut down