feat: no longer load full table into ram in write #2265

Closed · @aersam wants to merge 24 commits

Conversation

@aersam (Contributor) commented Mar 8, 2024

Description

Well, I suffered quite a bit and am still not finished. But here's what I've learned so far:

  • If you pass the iterator from Python to Rust, it is Send but not Sync, which complicates using it in a future. To work around this, I had to split the WriterBuilder into two structs, one for the data and one for the config (see the sketch after this list)
  • DataFusion is a pretty cool thing! You can really pick just the pieces you want from it, and it uses good abstractions. I need to take a deeper look at it :)
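
For illustration, the split looks roughly like this (a minimal sketch with made-up names, not the actual PR code): the config stays freely cloneable and shareable, while the non-Sync iterator is only ever moved, never shared.

```rust
use arrow::record_batch::RecordBatch;

// Plain configuration data: Send + Sync for free, safe to clone around.
#[derive(Clone)]
struct WriterConfig {
    target_file_size: usize,
}

// The data half: a boxed iterator fed from Python is Send (it can be
// moved to another thread) but not Sync (it cannot be shared by reference).
struct WriterData {
    batches: Box<dyn Iterator<Item = RecordBatch> + Send>,
}

fn write(config: WriterConfig, data: WriterData) {
    // `data` is moved in by value, so the missing Sync bound never bites.
    for batch in data.batches {
        let _ = (batch.num_rows(), config.target_file_size);
    }
}
```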

About the implementation:

Instead of doing something proper, I wanted to first create this PR, which basically just takes the iterable and breaks it into chunks to process (see the sketch below). This is not ideal; it does not parallelize as well as using things like channels, but that would be a bigger change. Still a big win for large tables!
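
A minimal sketch of that chunking idea, under the assumption that processing one chunk at a time keeps peak memory bounded (illustrative helper, not the PR's actual code):

```rust
use arrow::record_batch::RecordBatch;

/// Pull `chunk_size` batches at a time from the source iterator, so only
/// one chunk is ever materialized in memory.
fn for_each_chunk(
    mut batches: impl Iterator<Item = RecordBatch>,
    chunk_size: usize,
    mut process: impl FnMut(Vec<RecordBatch>),
) {
    assert!(chunk_size > 0, "chunk_size must be positive");
    loop {
        let chunk: Vec<RecordBatch> = batches.by_ref().take(chunk_size).collect();
        if chunk.is_empty() {
            break;
        }
        process(chunk); // e.g. hand the chunk to the existing write path
    }
}
```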

Related Issue(s)

Fixes #2255

Documentation

@aersam changed the title from "feat; no longer load full table into ram in write" to "feat: no longer load full table into ram in write" Mar 8, 2024
@github-actions bot added the binding/python (Issues for the Python package) and binding/rust (Issues for the Rust crate) labels Mar 8, 2024
github-actions bot commented Mar 8, 2024

ACTION NEEDED

delta-rs follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

@ion-elgreco (Collaborator) commented

@aersam haven't checked yet, but are you streaming data to open file handles?

Or do you close the files after writing chunks?

@aersam (Contributor, Author) commented Mar 8, 2024

I just create many files 🙂

@aersam (Contributor, Author) commented Mar 8, 2024

Streaming would be better, but way more complicated to do, mostly since DataFusion's MemoryExec does not take a Stream but a Vec of record batches.
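
For context, this is roughly the constructor shape that forces materialization; signature as it existed in DataFusion versions around this PR, so treat it as a sketch:

```rust
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::memory::MemoryExec;

fn make_plan(
    schema: SchemaRef,
    batches: Vec<RecordBatch>,
) -> datafusion::error::Result<MemoryExec> {
    // One partition containing every batch: the whole input has to be
    // collected into RAM before the plan can even be constructed.
    MemoryExec::try_new(&[batches], schema, None)
}
```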

@ion-elgreco (Collaborator) commented

@aersam in that case I don't think it's the right way: with small record batches you could end up with many, many small Parquet files, which would then require constantly running optimize afterwards to fix that.

@aersam (Contributor, Author) commented Mar 8, 2024

Ok, true. What if I counted the bytes in a chunk and let it grow to a certain threshold?
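
Something like the following sketch, using Arrow's reported in-memory size as the byte count (hypothetical helper, not the PR code):

```rust
use arrow::record_batch::RecordBatch;

/// Accumulate batches until the chunk's in-memory size crosses
/// `target_bytes`, so output file sizes track bytes rather than batch counts.
fn for_each_sized_chunk(
    batches: impl Iterator<Item = RecordBatch>,
    target_bytes: usize,
    mut process: impl FnMut(Vec<RecordBatch>),
) {
    let mut chunk = Vec::new();
    let mut bytes = 0usize;
    for batch in batches {
        bytes += batch.get_array_memory_size();
        chunk.push(batch);
        if bytes >= target_bytes {
            process(std::mem::take(&mut chunk));
            bytes = 0;
        }
    }
    if !chunk.is_empty() {
        process(chunk); // flush the final, possibly undersized chunk
    }
}
```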

@aersam (Contributor, Author) commented Mar 11, 2024

> @aersam in that case I don't think it's the right way: with small record batches you could end up with many, many small Parquet files, which would then require constantly running optimize afterwards to fix that.

This is resolved now; it will produce a few big files using streams.

@aersam marked this pull request as ready for review Mar 11, 2024 15:41
@aersam (Contributor, Author) commented Mar 11, 2024

I also removed a lot of duplicate code; the writer was dividing data into partitions twice. Column mapping in write would also be pretty straightforward now.

@aersam (Contributor, Author) commented Mar 11, 2024

I could implement From for the WriteData enum to make usage a bit simpler, if you want.

@ion-elgreco (Collaborator) commented

@aersam I'll try to take a look tonight!

@ion-elgreco (Collaborator) commented

> > @aersam in that case I don't think it's the right way: with small record batches you could end up with many, many small Parquet files, which would then require constantly running optimize afterwards to fix that.
>
> This is resolved now; it will produce a few big files using streams.

So instead of passing plans over, you are now passing RecordBatchStreams?

@aersam (Contributor, Author) commented Mar 12, 2024

Yep. I tried using a StreamingTable, but that one has to be Sync, which is an issue. The stream worked fine.
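
Roughly, wrapping an iterator into a SendableRecordBatchStream via DataFusion's RecordBatchStreamAdapter only requires Send, which is why it works where a StreamingTable provider (which needs Sync) does not. A sketch (module paths per DataFusion versions around this PR):

```rust
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;

/// Adapt a Send iterator of batches into the stream type the write path
/// consumes; no Sync bound is required anywhere.
fn to_stream(
    schema: SchemaRef,
    batches: impl Iterator<Item = Result<RecordBatch>> + Send + 'static,
) -> SendableRecordBatchStream {
    Box::pin(RecordBatchStreamAdapter::new(
        schema,
        futures::stream::iter(batches),
    ))
}
```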

@ion-elgreco (Collaborator) commented

> I could implement From for the WriteData enum to make usage a bit simpler, if you want.

Maybe do that, and have DeltaOps().write use an .into().

@aersam (Contributor, Author) commented Mar 12, 2024

> > I could implement From for the WriteData enum to make usage a bit simpler, if you want.
>
> Maybe do that, and have DeltaOps().write use an .into().

This is done now.
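
For reference, a sketch of the convenience (the variant names here are assumptions, not necessarily the actual enum):

```rust
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::SendableRecordBatchStream;

enum WriteData {
    Batches(Vec<RecordBatch>),
    Stream(SendableRecordBatchStream),
}

impl From<Vec<RecordBatch>> for WriteData {
    fn from(batches: Vec<RecordBatch>) -> Self {
        WriteData::Batches(batches)
    }
}

impl From<SendableRecordBatchStream> for WriteData {
    fn from(stream: SendableRecordBatchStream) -> Self {
        WriteData::Stream(stream)
    }
}

// The write entry point can then take anything convertible:
fn write(data: impl Into<WriteData>) {
    match data.into() {
        WriteData::Batches(v) => { let _ = v.len(); }
        WriteData::Stream(_s) => { /* poll the stream batch by batch */ }
    }
}
```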

ion-elgreco previously approved these changes Mar 12, 2024

@ion-elgreco (Collaborator) left a comment

Thanks for the work! @aersam

This should definitely be helpful in situations where you can send a RecordBatchReader directly.

Before merging let's have @rtyler also take a short look if he has time :)

@aersam (Contributor, Author) commented Mar 13, 2024

One thing came to my mind. I'm not 100% sure, but is it a good idea to do py.allow_threads in lib.rs on the Python side? Since we're iterating across language borders, I guess we need to hold the GIL, no? I pushed a commit that should resolve it, but feel free to undo it if you disagree.
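
For illustration, a GIL-reacquiring iterator wrapper has roughly this shape (hypothetical code sketching the concern, not necessarily the commit's exact contents):

```rust
use pyo3::prelude::*;

/// Wraps a Python iterator so each `next()` re-acquires the GIL, letting
/// the surrounding `py.allow_threads` section run without holding it.
struct GilIterator {
    inner: Py<PyAny>,
}

impl Iterator for GilIterator {
    type Item = PyResult<Py<PyAny>>;

    fn next(&mut self) -> Option<Self::Item> {
        // Take the GIL only for the duration of one __next__ call.
        Python::with_gil(|py| match self.inner.call_method0(py, "__next__") {
            Ok(item) => Some(Ok(item)),
            Err(e) if e.is_instance_of::<pyo3::exceptions::PyStopIteration>(py) => None,
            Err(e) => Some(Err(e)),
        })
    }
}
```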

@ion-elgreco (Collaborator) commented

On this I'm not entirely sure; it is a Python object, and consuming a reader can only be done once, but @emcake's description of his PR (#2091) suggests it's safe?

@wjones127 @emcake any insights or comments on this?

@aersam (Contributor, Author) commented Mar 13, 2024

> On this I'm not entirely sure; it is a Python object, and consuming a reader can only be done once, but @emcake's description of his PR (#2091) suggests it's safe?
>
> @wjones127 @emcake any insights or comments on this?

I think it can only be safe if pyarrow itself takes care of acquiring the GIL. I don't know whether it does.

@ion-elgreco (Collaborator) commented

If that's the case, then we need to apply the GilIterator change to merge_execute as well, since that one also wouldn't be safe anymore with your changes.

@aersam (Contributor, Author) commented Mar 13, 2024

> If that's the case, then we need to apply the GilIterator change to merge_execute as well, since that one also wouldn't be safe anymore with your changes.

I tried working through the pyarrow code to see whether it acquires the GIL, and it looks like it does: https://github.com/apache/arrow/blob/93816475f75d751067d4ff427fb9ae64e85acebe/python/pyarrow/src/arrow/python/ipc.cc#L39

So I'll revert the last two commits.

@aersam (Contributor, Author) commented Mar 13, 2024

The failure is not related to my changes, I guess?

ion-elgreco previously approved these changes Mar 13, 2024
@aersam (Contributor, Author) commented Mar 21, 2024

Closing this in favor of #2289, which I'll keep up to date with the main branch.

@aersam closed this Mar 21, 2024
Labels: binding/python (Issues for the Python package), binding/rust (Issues for the Rust crate)

Successfully merging this pull request may close these issues: Do not load full source into RAM on write_to_deltalake (#2255)