Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark: Relativize in-memory paths for data file and rewritable delete file locations #11525

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

amogh-jahagirdar
Copy link
Contributor

@amogh-jahagirdar amogh-jahagirdar commented Nov 12, 2024

This is a follow up to https://github.com/apache/iceberg/pull/11273/files#

Instead of broadcasting a map with absolute paths for data files and delete files to executors, we could shrink the memory footprint by relativizing the in-memory mapping, and then just prior to lookup on executors, reconstruct the absolute path as for the relevant delete files.

There are a few ways to go about relativization, in the current implementation I just did the simplest thing which was to relativize to the table location. There are more sophisticated things that could be done to save even more memory consumer from paths such as relativize according to the data file location (requires surfacing more details from LocationProvider), find the longest common prefix between all data/delete files in the rewritable deletes (requires a double pass over tasks, once to identify the longest common prefix via smallest/largest lexicographical strings, and then another to actually reconstruct the delete files). Patricia tries are another possibility though the serialized representation seems to take about the same amount of memory, not sure why that's the case.

I'm also working on identifying if using spark bytestobytes offheap map will save us even more memory but in the mean time thought it made sense to at least get this improvement in the interim. This is all internal, so we can always remove it down the line if something better comes along.

@github-actions github-actions bot added the spark label Nov 12, 2024
Comment on lines +527 to +528
return deleteLoader.loadPositionDeletes(
rewritableDeletes.deletesFor(path.toString(), specs), path);
Copy link
Contributor Author

@amogh-jahagirdar amogh-jahagirdar Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll go back to explicitly returning a null index in case there are no deletes for the given path, the internal implementation of loader takes care of it regardless but it isn't obvious without reading into it.

private DeleteFile relativizeDeleteFile(
DeleteFile deleteFile, Map<Integer, PartitionSpec> specs) {
return FileMetadata.deleteFileBuilder(specs.get(deleteFile.specId()))
.copy(deleteFile)
Copy link
Contributor Author

@amogh-jahagirdar amogh-jahagirdar Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'd have a slight perf hit by having to do this copy (particularly the copying the partition data in memory). I'll see if there's any relevant benchmarking we can leverage to measure if it's really significant or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe there's a way to extend planning APIs so that we return all paths already relativized but that seems like it would be a bigger change, and it's not obvious we should do that until there's evidence that this copy is expensive.

@amogh-jahagirdar
Copy link
Contributor Author

cc @singhpk234

@RussellSpitzer
Copy link
Member

Just as a gut comment, if we just compressed them shouldn't we get almost all the benefits we are looking for? They are just a bunch of strings so the binary representation of all of them should be pretty compressible.

@amogh-jahagirdar
Copy link
Contributor Author

amogh-jahagirdar commented Nov 13, 2024

Just as a gut comment, if we just compressed them shouldn't we get almost all the benefits we are looking for? They are just a bunch of strings so the binary representation of all of them should be pretty compressible.

It's true the broadcast would be compressed by default via spark.broadcast.compress and that would minimize the space pretty well. I think the concern is more so when we need to load the map broadcast variable on executor side we'd ultimately need to decompress all the chunks of the map. So the goal for relativization was to minimize how much would take in the in-memory representation of the map after decompression. Let me know if that makes sense.

@RussellSpitzer
Copy link
Member

It's true the broadcast would be compressed by default via spark.broadcast.compress and that would minimize the space pretty well. I think the concern is more so when we need to load the map broadcast variable on executor side we'd ultimately need to decompress all the chunks of the map. So the goal for relativization was to minimize how much would take in the in-memory representation of the map after decompression. Let me know if that makes sense.|

Won't it all be in memory anyway? Java should do string interning with prefix?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants