GH-3254: Write footer together by buffering before append #3269
base: master
Conversation
cc @shangxinli @wgtmac could you please take a look, thanks!
final long footerStart = out.getPos();

// Build the footer metadata in memory using the helper stream
InMemoryPositionOutputStream buffer = new InMemoryPositionOutputStream(footerStart);
I don't think adding yet another buffered stream helps anything here. The output stream implementation should already have its own internal buffer, and it will eventually flush data to the underlying storage, which means a network or filesystem error may still happen.
Yes, definitely. The buffered stream helps consolidate the writes and push them to disk in one attempt. Previously the footer writes were distributed across serializeColumnIndexes, serializeOffsetIndexes and serializeBloomFilters. The buffer aggregates all of the above and writes to disk once, at the end, after the heavy computation is done. Network corruption can still happen, but the chances are significantly reduced and it can only occur when the final buffered stream is committed to disk. Thanks!
I don't think so. I believe many OutputStream implementations (especially FSDataOutputStream) already implement their own internal buffers. Perhaps @steveloughran may correct me if I am wrong.
Even if we want to minimize the IO count, this is the wrong place since it only contains the footer.
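As a small illustration of the point about buffering (plain `java.io`, not Parquet code): many concrete streams already buffer internally, and an application can add buffering itself by wrapping the stream, so nothing reaches the underlying sink until the buffer fills or `flush()` is called.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Demonstrates application-side buffering: bytes stay in the
// BufferedOutputStream until the buffer fills or flush() is called.
public class BufferingDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        BufferedOutputStream out = new BufferedOutputStream(sink, 8 * 1024);

        out.write(new byte[] {1, 2, 3});
        System.out.println("before flush: " + sink.size()); // still 0, buffered

        out.flush();
        System.out.println("after flush: " + sink.size());  // 3, pushed to sink
    }
}
```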
Thanks for the reply! Maybe there is a gap in my understanding. The goal of this change was to reduce the likelihood of partial data being written to disk during the footer write.
Currently the column indexes, offset indexes, bloom filters and footer are serialized and the bytes are sent to the stream in stages (serialize + write, serialize + write, etc.). Any network failure during the serialization process can corrupt data, since a previous stage may already have sent its bytes to disk if the internal stream buffer filled up; the exposure window is large.
With this change, the serialized outputs are built in memory against a virtual position and then emitted once (serialize + serialize + serialize + write). This minimizes the risk of partial data being written, since the data is held back until all serialization is complete. The underlying concrete stream may have an in-memory buffer of varying size and may decide to flush to disk at any time, but the new code avoids that exposure, since the write is done once at the end.
I completely agree this does not make the write atomic, and it is hard to eliminate this class of problem with the way the current code is structured, but it should reduce the likelihood of corrupt data during the footer write. This change is only meant to optimize the footer.
If I am missing something about the internal stream that negates the gap removal, happy to revise and iterate 😀. Appreciate the review and guidance 🙏
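The "build in memory against a virtual position, then emit once" idea could be sketched roughly like this (illustrative names only; this is not the actual PR code):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch: serialized footer sections accumulate in memory
// against a virtual position, then reach the real stream in one call.
public class InMemoryPositionOutputStream extends OutputStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final long startPos; // underlying stream position when buffering began

    public InMemoryPositionOutputStream(long startPos) {
        this.startPos = startPos;
    }

    // Virtual position: where the next byte *will* land in the real file.
    public long getPos() {
        return startPos + buffer.size();
    }

    @Override
    public void write(int b) {
        buffer.write(b); // buffered only; nothing touches the real stream
    }

    // Emit everything to the concrete stream in a single call at the end.
    public void writeTo(OutputStream out) throws IOException {
        buffer.writeTo(out);
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        InMemoryPositionOutputStream s = new InMemoryPositionOutputStream(100);
        s.write(new byte[] {1, 2, 3});
        System.out.println("virtual pos: " + s.getPos()); // 103

        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        s.writeTo(sink); // one write at the very end
        System.out.println("sink bytes: " + sink.size()); // 3
    }
}
```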
> The goal of this change was to reduce the likelihood of partial data written to disk during the footer write

No, the original issue was raised to deal with the error state after a failed `end()` call. Concrete output stream implementations should already use an internal buffer like the one you added in this PR (and applications can easily wrap the stream in a buffered output stream themselves), so it is not our responsibility to over-optimize it.

I think the real problem is to figure out how downstream projects deal with a failed `ParquetFileWriter.end()`. IMHO, they should simply call `close()` instead of calling `end()` again after a failed call.
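The recovery pattern described above could be sketched as follows. `FakeFileWriter` is a hypothetical stand-in for `ParquetFileWriter`, used only to show the close-instead-of-retry shape:

```java
import java.io.IOException;

// Hypothetical sketch: after a failed end(), a downstream writer should
// close the writer rather than retry end().
public class EndFailureDemo {
    enum State { STARTED, ENDED, CLOSED }

    static class FakeFileWriter implements AutoCloseable {
        State state = State.STARTED;
        private final boolean failOnEnd;

        FakeFileWriter(boolean failOnEnd) {
            this.failOnEnd = failOnEnd;
        }

        void end() throws IOException {
            if (failOnEnd) {
                // simulates a network/filesystem error mid-footer
                throw new IOException("simulated failure while writing footer");
            }
            state = State.ENDED;
        }

        @Override
        public void close() {
            state = State.CLOSED; // release resources; safe after a failed end()
        }
    }

    public static void main(String[] args) {
        FakeFileWriter writer = new FakeFileWriter(true);
        try {
            writer.end();   // may fail partway through the footer write
        } catch (IOException e) {
            writer.close(); // do NOT retry end(); just close
        }
        System.out.println("final state: " + writer.state); // CLOSED
    }
}
```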
Agreed, I think there is some confusion about what this PR is fixing; I added more details to the PR description. To over-simplify, it moves the interleaved writes into one single write at the end.
Today we interleave serialization and writes, so if an exception occurs between those phases, prior writes may already have been flushed by the FS client, leaving a truncated file (the concrete output stream's internal buffer can flush depending on the size of the previous serialization and the configuration). By serializing fully in memory first, we eliminate all writes during serialization. This does not make the write atomic; that may need future effort.
Maybe we are discussing different concerns/issues; just wanted to know what you think @wgtmac?
I agree with @wgtmac that this doesn't solve the problem, and it also introduces other issues by adding another buffer. Since writes to object stores are pretty expensive, those libraries often pre-allocate a rather big buffer. Adding another buffer as suggested in this PR will introduce more memory pressure, which I think should be avoided. The problem is amplified if you write to a lot of files in parallel.
Thanks for the reviews @Fokko @wgtmac, happy to abandon this PR; I agree the memory pressure would increase. Would you suggest any alternative way to mitigate this? One possibility I see is to snapshot the write to a temporary directory and do an atomic rename (which would come at the expense of more intermediate storage, possibly 2x in the worst case). Has anything like this been discussed previously? Thanks!
These solutions all sound like workarounds for avoiding failures, but those failures may eventually emerge anyway. I think what the downstream projects need is to know that the writer has failed and to handle the error gracefully (not to get yet another exception).
try {
    out.write(footerBytes);
    out.flush();
Changing the state after flushing may make the case even worse. For example, if the first call to `end()` has flushed some data to the underlying storage and an exception happens halfway, then a retry issues a second call which may eventually succeed, so dirty data may have been written to the file.
IMHO, the behavior before this change is much safer, though the exception message is a little confusing. This seems to be an issue with the Iceberg parquet writer (or any other writer implementation on top of ParquetFileWriter) that tries to retry from `endRowGroup()` after a failed `end()` call. It should actually call `close()` if anything goes wrong.
Yes, that's a good point; the state change can be done before flushing the buffer.
Have updated the state change to occur before the stream flush, rather than moving to ENDED only after the write and flush succeed. This preserves the current behaviour and reduces the chance of corruption, since the commit to `out` is done in one call at the end of computation.
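The agreed ordering could be sketched as follows (illustrative code, not the actual ParquetFileWriter implementation): transition the state before writing and flushing, so a retried `end()` fails fast on the state check instead of appending dirty bytes after a partial flush.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Sketch of the end() ordering discussed above: state moves to ENDED
// before the single footer write, so a retry is rejected immediately.
public class StateOrderingDemo {
    enum State { STARTED, ENDED }

    State state = State.STARTED;
    final ByteArrayOutputStream out = new ByteArrayOutputStream();

    void end(byte[] footerBytes) throws IOException {
        if (state == State.ENDED) {
            throw new IOException("writer already ended"); // retry fails fast
        }
        state = State.ENDED;     // transition first
        out.write(footerBytes);  // single write of the fully buffered footer
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        StateOrderingDemo writer = new StateOrderingDemo();
        writer.end(new byte[] {1, 2, 3});
        try {
            writer.end(new byte[] {4, 5, 6}); // a retry is rejected
        } catch (IOException e) {
            System.out.println("retry rejected: " + e.getMessage());
        }
        System.out.println("bytes written: " + writer.out.size()); // 3
    }
}
```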