
feat(pandas): add support for serializing pd.DataFrame in Arrow IPC formats #4779

Open · wants to merge 1 commit into base: main

Conversation

judahrand (Contributor) commented Jun 6, 2024

What does this PR address?

This PR adds support for serializing Pandas DataFrames in both the Arrow IPC File and Streaming formats. These formats serialize and deserialize faster than Parquet, at the cost of a larger payload; whether that tradeoff pays off depends on available network bandwidth.
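
To make the payload side of the tradeoff concrete, here is a minimal sketch comparing serialized sizes (exact numbers will vary with the data and with Parquet's encoding and compression settings):

import numpy as np
import pandas as pd
import pyarrow
import pyarrow.ipc

df = pd.DataFrame({'a': np.arange(1000), 'b': np.random.random(1000)})

# Arrow IPC stream payload: raw columnar buffers plus framing, no compression by default.
sink = pyarrow.BufferOutputStream()
batch = pyarrow.RecordBatch.from_pandas(df, preserve_index=True)
with pyarrow.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)
print('Arrow IPC stream:', len(sink.getvalue().to_pybytes()), 'bytes')

# Parquet payload: encoded and (by default) compressed, so often smaller on the wire.
print('Parquet:', len(df.to_parquet()), 'bytes')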

Additionally, it is worth noting that this PR uses the officially registered MIME types for Arrow:
https://www.iana.org/assignments/media-types/application/vnd.apache.arrow.file
https://www.iana.org/assignments/media-types/application/vnd.apache.arrow.stream

It is a shame that BentoML doesn't use the registered MIME type for Parquet - perhaps that is something to tackle in another PR:
https://www.iana.org/assignments/media-types/application/vnd.apache.parquet
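
As an illustration, the registered types could drive format selection with a simple dispatch. A hypothetical sketch - the serialize_dataframe helper and the constant names are invented for this example, not the PR's actual code:

import pyarrow
import pyarrow.ipc

MIME_ARROW_FILE = 'application/vnd.apache.arrow.file'      # hypothetical constant name
MIME_ARROW_STREAM = 'application/vnd.apache.arrow.stream'  # hypothetical constant name

def serialize_dataframe(df, mime_type):
    """Serialize a DataFrame to the Arrow IPC format identified by mime_type."""
    if mime_type == MIME_ARROW_FILE:
        open_writer = pyarrow.ipc.new_file
    elif mime_type == MIME_ARROW_STREAM:
        open_writer = pyarrow.ipc.new_stream
    else:
        raise ValueError(f'unsupported MIME type: {mime_type}')
    sink = pyarrow.BufferOutputStream()
    batch = pyarrow.RecordBatch.from_pandas(df, preserve_index=True)
    with open_writer(sink, batch.schema) as writer:
        writer.write_batch(batch)
    return sink.getvalue().to_pybytes()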

In [1]: import pyarrow

In [2]: import pandas as pd

In [3]: import numpy as np

In [4]: import io

In [5]: df = pd.DataFrame({'a': np.arange(1000), 'b': np.random.random(1000)})

In [6]: def serialize_stream(obj):
    ...:     sink = pyarrow.BufferOutputStream()
    ...:     batch = pyarrow.RecordBatch.from_pandas(obj, preserve_index=True)
    ...:     with pyarrow.ipc.new_stream(sink, batch.schema) as writer:
    ...:         writer.write_batch(batch)
    ...:     return sink.getvalue().to_pybytes()
    ...: 
    ...: 
    ...: def deserialize_stream(obj):
    ...:     with pyarrow.ipc.open_stream(obj) as reader:
    ...:         return reader.read_pandas()
    ...: 
    ...: 
    ...: def serialize_file(obj):
    ...:     sink = pyarrow.BufferOutputStream()
    ...:     batch = pyarrow.RecordBatch.from_pandas(obj, preserve_index=True)
    ...:     with pyarrow.ipc.new_file(sink, batch.schema) as writer:
    ...:         writer.write_batch(batch)
    ...:     return sink.getvalue().to_pybytes()
    ...: 
    ...: def deserialize_file(obj):
    ...:     with pyarrow.ipc.open_file(obj) as reader:
    ...:         return reader.read_pandas()

In [7]: %timeit deserialize_file(serialize_file(df))
483 µs ± 9.55 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

In [8]: %timeit deserialize_stream(serialize_stream(df))
479 µs ± 4.89 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

In [9]: %timeit pd.read_parquet(io.BytesIO(df.to_parquet()))
861 µs ± 95.7 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
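
A side note on the two formats: the stream format can be consumed incrementally, batch by batch, without the whole payload in hand, while the file format adds a footer that enables random access to record batches. A minimal sketch continuing the session above:

In [10]: with pyarrow.ipc.open_stream(serialize_stream(df)) as reader:
    ...:     for batch in reader:
    ...:         print(batch.num_rows)
1000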

@judahrand judahrand requested a review from a team as a code owner June 6, 2024 10:43
@judahrand judahrand requested review from larme and removed request for a team June 6, 2024 10:43
judahrand (Contributor, Author) commented:

@larme I don't think these test failures are related to my changes.

frostming (Contributor) commented:

@judahrand please resolve the conflicts.
