
datalake: arrow_to_iobuf interface #23375

Merged

Conversation

Contributor

@jcipar jcipar commented Sep 18, 2024

This adds an arrow_to_iobuf interface that converts Arrow data to iobufs representing Parquet files that can be written to disk. There are two components:

  1. An implementation of arrow::io::OutputStream that collects data in iobufs
  2. A class that creates a parquet::arrow::FileWriter using that output stream and allows the caller to extract the generated iobufs.

This allows us to separate the compute side of generating Parquet, which still occurs in the Arrow library, from the file I/O, which can now be made seastar-friendly.
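For orientation, a minimal sketch of the two components' shape, assembled from the snippets quoted later in this review; member names like _current_iobuf, _position, and _writer are assumptions for illustration, not the merged code verbatim:

#include "bytes/iobuf.h"

#include <arrow/io/interfaces.h>
#include <parquet/arrow/writer.h>

#include <memory>

namespace datalake {

// Component 1: an arrow::io::OutputStream that appends everything Arrow
// writes onto an iobuf instead of a file.
class iobuf_output_stream : public arrow::io::OutputStream {
public:
    arrow::Status Write(const void* data, int64_t nbytes) override;
    arrow::Status Close() override;
    arrow::Result<int64_t> Tell() const override;
    bool closed() const override;

    // Take the data accumulated so far and clear the internal state.
    iobuf take_iobuf();

private:
    iobuf _current_iobuf;
    int64_t _position{0};
    bool _closed{false};
};

// Component 2: owns a parquet::arrow::FileWriter that targets the stream
// above; callers feed it Arrow arrays and pull the resulting Parquet bytes
// back out as iobufs.
class arrow_to_iobuf {
public:
    explicit arrow_to_iobuf(const arrow::Schema& schema);

    void add_arrow_array(std::shared_ptr<arrow::Array> data);
    iobuf take_iobuf();
    iobuf close_and_take_iobuf();

private:
    std::shared_ptr<iobuf_output_stream> _ostream;
    std::unique_ptr<parquet::arrow::FileWriter> _writer;
};

} // namespace datalake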

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v24.2.x
  • v24.1.x
  • v23.3.x

Release Notes

  • none

@@ -86,7 +29,7 @@ test_int: int32
 test_long: int64
 test_float: float
 test_double: double
-test_decimal: decimal128(8, 16)
+test_decimal: decimal128(16, 8)
Contributor Author

This was a bug in the previous versions of the test code. Precision must be greater than scale, but it was reversed before. It didn't matter earlier because we weren't validating that we could translate the data to Parquet, but now that we are, this needs to be correct.
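For reference, a minimal illustration of the corrected type, assuming the test builds it with Arrow's decimal128 factory (the helper name below is made up for the example):

#include <arrow/type.h>

#include <memory>

// decimal128 takes (precision, scale); Parquet requires precision >= scale.
std::shared_ptr<arrow::DataType> make_test_decimal_type() {
    // was arrow::decimal128(8, 16): precision 8, scale 16, which is invalid
    return arrow::decimal128(/*precision=*/16, /*scale=*/8);
}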

@vbotbuildovich
Collaborator

new failures in https://buildkite.com/redpanda/redpanda/builds/54698#0192071e-9aea-4b40-8a3f-cfa5af82c2fe:

"rptest.tests.delete_records_test.DeleteRecordsTest.test_delete_records_concurrent_truncations.cloud_storage_enabled=True.truncate_point=at_high_watermark"

#include <parquet/arrow/reader.h>
#include <parquet/type_fwd.h>

TEST(ParquetWriter, DoesNothing) {
Contributor Author

Forgot to rename this test after actually writing a test. fix coming...

//// METHODS SPECIFIC TO IOBUF OUTPUT STREAM ////
iobuf take_iobuf();
Member

a comment that describes the method?

Contributor Author

Removed ;-)

Comment on lines -294 to +243
- -- child 5 type: decimal128(8, 16)
+ -- child 5 type: decimal128(16, 8)
  [
- 0.E-16,
- 0.E-16,
- 0.E-16,
- 0.E-16,
- 0.E-16
+ 0.E-8,
+ 0.E-8,
+ 0.E-8,
+ 0.E-8,
+ 0.E-8
Member

seems like this should be in a separate commit explaining the issue

Contributor Author

Done

Contributor

@andrwng andrwng left a comment

Mostly nits. I take it there will be an additional abstraction for IO in parquet_writer that isn't included here?

Comment on lines -34 to +29
void add_arrow_array(std::shared_ptr<arrow::Array> data);
iobuf take_iobuf();
iobuf close_and_take_iobuf();
Contributor

nit: could you add some light documentation about what these are and their relationship with one another?

Contributor Author

Done

Comment on lines 19 to 20
#include <filesystem>
#include <utility>
Contributor

nit: probably not needed in the header?

Contributor Author

Removed

Comment on lines 22 to 24
namespace arrow {
class Array;
}
Contributor

nit: add comment on the ending brace?

Also, if we have this, I'm wondering whether we still need to include arrow/io/memory.h? Or is that included for something else?

Contributor Author

I'm not totally sure what's going on. When I forward declare this and move #include <parquet/arrow/writer.h> to the cc file it fails on an incomplete type, but if I keep that include in the header it works. Same problem if I #include <parquet/type_fwd.h>
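One plausible explanation (an assumption, not verified against this build): a std::unique_ptr<parquet::arrow::FileWriter> member needs the complete FileWriter type wherever the class's destructor is instantiated, so the forward declaration only compiles if the destructor is declared in the header and defined in the .cc after the full include. A sketch:

// parquet_writer.h (sketch)
#include <memory>

namespace parquet::arrow {
class FileWriter; // forward declaration; the heavy header stays in the .cc
} // namespace parquet::arrow

namespace datalake {
class arrow_to_iobuf {
public:
    arrow_to_iobuf();
    ~arrow_to_iobuf(); // declared here, defined in the .cc where
                       // <parquet/arrow/writer.h> supplies the full type

private:
    std::unique_ptr<parquet::arrow::FileWriter> _writer;
};
} // namespace datalake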

#include <stdexcept>

namespace datalake {
class iobuf_output_stream : public arrow::io::OutputStream {
Contributor

nit: maybe stick this in an anonymous namespace? Also could you move the implementation of the methods above the implementation of the arrow_to_iobuf methods? Just so it's easier to read this class together

Alternatively just inline everything in this definition, given the methods are all tiny

Contributor Author

I put it in an anonymous namespace. I have a slight preference for keeping the methods out of the class definition, but if you have a strong opinion on it I can change that.

Member

@dotnwat dotnwat Sep 19, 2024

The anonymous namespace is about avoiding external linkage where possible. I think Andrew's point is that iobuf_output_stream is only used in this one translation unit.

> preference for keeping the methods out of the class definition

Not sure what this was referring to w.r.t. anonymous namespace?

Contributor Author

> Not sure what this was referring to w.r.t. anonymous namespace?

It's not about anonymous namespaces. I interpreted the last line of Andrew's comment to be about putting the method definitions in the class definition instead of after the class.

Contributor

> I have a slight preference for keeping the methods out of the class definition, but if you have a strong opinion on it I can change that.

Not a strong preference, but I typically inline if the code is small, just so it's easier on the eyes. Feel free to leave it as is; thanks for moving it!

}

void arrow_to_iobuf::add_arrow_array(std::shared_ptr<arrow::Array> data) {
arrow::ArrayVector data_av = {data};
Contributor

{std::move(data)}?

Contributor Author

Done

Comment on lines 12 to 13
#include "datalake/data_writer_interface.h"

Contributor

nit: I might be missing something, what do we need this here for? Same in the cc

Contributor

Oh probably that this isn't the final state of parquet_writer, since this isn't doing IO yet!

Contributor Author

Actually, I think you're right. I was thinking this would implement that interface, but the next PR is a higher level wrapper for this.

Also, that interface will have to change to make it futurized.

Comment on lines 68 to 70
auto vbegin = iobuf::byte_iterator(
full_result.cbegin(), full_result.cend());
auto vend = iobuf::byte_iterator(full_result.cend(), full_result.cend());
std::string full_result_string;
// Byte iterators don't work with the string constructor.
while (vbegin != vend) {
full_result_string += *vbegin;
++vbegin;
}
Contributor

nit: wondering if bytes_to_iobuf() from bytes/bytes.h works here?

Contributor Author

This is converting an iobuf to bytes; bytes_to_iobuf() goes the other direction.


datalake::arrow_to_iobuf writer(*schema_translator.build_arrow_schema());

// The first write is a special case because it is 4 bytes longer.
Contributor

Curious, is this explaining why we're using EXPECT_NEAR instead of EXPECT_EQ? Generally wondering where the 4 is showing up in this test?

Contributor Author

No, I had previously been checking the exact value. When I was only doing 2 batches they were consistent, but when I switched to more batches I noticed that they are not.

Comment on lines 49 to 57
LIBRARIES
v::application
v::features
v::gtest_main
v::kafka_test_utils
v::datalake
v::model_test_utils
v::iceberg_test_utils
LABELS storage
Contributor

nit: probably only need gtest, datalake, and iceberg test utils? Also, the labels in this file are off.

Contributor Author

Done

iobuf close_and_take_iobuf();

private:
std::shared_ptr<iobuf_output_stream> _outfile;
Contributor

nit: maybe _ostream? given this isn't a file?

Contributor Author

Done

Comment on lines 66 to 70
// Check that the data is a valid parquet file. Convert the iobuf to a
// single buffer then import that into an arrow::io::BufferReader
auto vbegin = iobuf::byte_iterator(
full_result.cbegin(), full_result.cend());
auto vend = iobuf::byte_iterator(full_result.cend(), full_result.cend());
std::string full_result_string;
// Byte iterators don't work with the string constructor.
while (vbegin != vend) {
full_result_string += *vbegin;
++vbegin;
}
Member

Does this work:

auto b = iobuf_to_bytes()
std::string(b.c_str(), b.size())

Or add a helper for this case somewhere, like bytes/string.h; you can model it after iobuf_to_bytes in bytes.h.

Contributor Author

Better yet, the Arrow BufferReader can accept a pointer and length directly.
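A sketch of that resolution, assuming the Arrow build in use still offers the non-owning (data, size) BufferReader constructor; the helper name and the use of iobuf_to_bytes are illustrative:

#include "bytes/bytes.h"
#include "bytes/iobuf.h"

#include <arrow/io/memory.h>

// Linearize the iobuf and hand Arrow a pointer and length directly. The
// backing bytes must outlive the reader, so both live in the same scope.
void read_back_parquet(const iobuf& full_result) {
    bytes linearized = iobuf_to_bytes(full_result);
    arrow::io::BufferReader reader(
      linearized.c_str(), static_cast<int64_t>(linearized.size()));
    // ... open with parquet::arrow::OpenFile() and assert on the resulting
    // table, as the test does.
}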

@jcipar jcipar force-pushed the jcipar/seastar-friendly-arrow-writer branch 3 times, most recently from 65203de to e87f74b Compare September 20, 2024 17:29

namespace datalake {

namespace {
Member

Hmm, I'm not sure it makes sense to have an anonymous namespace in a header.

Contributor

+1, I think the code should work without wrapping the forward decl?

Contributor

I think there is a clang-tidy warning for not using anonymous namespaces in headers, but can't find it with a quick search. Let's remove 👍

src/v/datalake/parquet_writer.h (resolved)
// virtual Status Write(const std::shared_ptr<Buffer>& data);

// Take the data from the iobuf and clear the internal state.
iobuf take_iobuf();
Member

@dotnwat dotnwat Sep 20, 2024

Looks like this should be r-value qualified, i.e. iobuf take_iobuf() &&, since _current_iobuf will be left in a moved-from state after this call.

EDIT: see later comment in test regarding this and writer reuse.

Comment on lines 44 to 48
datalake::arrow_to_iobuf writer(*schema_translator.build_arrow_schema());

for (int i = 0; i < 10; i++) {
writer.add_arrow_array(result);
iobuf serialized = writer.take_iobuf();
Member

It looks like there is a use-after-move issue here. When you call writer.take_iobuf, that proxies to iobuf iobuf_output_stream::take_iobuf() { return std::move(_current_iobuf); }, but iobuf doesn't formally specify its moved-from state. It works because the move happens to leave it empty, but I think we should not depend on this. If you r-value qualified iobuf_output_stream::take_iobuf() per my other comment, then I think you'd have a cascade of changes resulting in a use-after-move clang-tidy warning here.

I see two options. One is to not re-use the writer. This is the "cleanest" option, and if the writer is lightweight (it looks like it is) then it probably makes the most sense.

The other option would be to not r-value qualify take_iobuf, and call it something like iobuf reset(), which would move the current iobuf out of the ostream and explicitly reset it.

Contributor Author

The writer includes a parquet::arrow::FileWriter, which is stateful, so I don't think it would work to create a new writer, but resetting the iobuf in take_iobuf should work.

Contributor Author

I made a copy of the iobuf so I could reset the _current_iobuf and return the copy. How can I return that by rvalue reference when it is allocated on the stack?

Contributor

> I made a copy of the iobuf so I could reset the _current_iobuf and return the copy. How can I return that by rvalue reference when it is allocated on the stack?

Does it work to do something like

iobuf take_iobuf() {
    iobuf b = std::move(_current_iobuf);
    _current_iobuf = {}; // or like _current_iobuf.clear() or somesuch
    return b;
}

Contributor

iobuf take_iobuf() { return std::exchange(_current_iobuf, {}); }

Member

+1 to Tyler and Andrew; neither suggestion makes a copy.

Contributor Author

Done
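For reference, a sketch of where this thread landed, continuing the class shapes sketched near the top of the PR and using the std::exchange suggestion; the close_and_take_iobuf flow is an assumption based on the snippets above:

#include <tuple>
#include <utility>

// Hand back the accumulated bytes and leave the stream holding a fresh,
// well-defined empty iobuf (no reliance on iobuf's moved-from state).
iobuf iobuf_output_stream::take_iobuf() {
    return std::exchange(_current_iobuf, {});
}

// Assumed flow: closing the parquet::arrow::FileWriter flushes the Parquet
// footer into the stream before the final bytes are taken.
iobuf arrow_to_iobuf::close_and_take_iobuf() {
    std::ignore = _writer->Close(); // error handling elided in this sketch
    return _ostream->take_iobuf();
}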

Contributor

@andrwng andrwng left a comment

LGTM, pending changes from Noah's comments about iobufs

@jcipar jcipar force-pushed the jcipar/seastar-friendly-arrow-writer branch 2 times, most recently from 254961e to fe9d575 Compare September 24, 2024 13:54
Comment on lines +21 to +23
explicit arrow_to_iobuf(const arrow::Schema& schema);

void add_arrow_array(std::shared_ptr<arrow::Array> data);
Contributor

For this array, how do we know what elements of the array map to specific schema elements in the Schema? Does an arrow Array have a pointer to its schema element, or is there some ID mapping?

For example, what happens if we reverse all the arrays before calling add_arrow_array?

Contributor Author

The array contains a pointer to its data type, yes. This includes both the types and column names.
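A small illustration of what that means in Arrow's C++ API, assuming a struct-typed array like the ones the tests here build (illustrative only):

#include <arrow/api.h>

#include <iostream>
#include <memory>

// Every arrow::Array carries its DataType; for a struct array that type also
// holds the field (column) names, so arrays can be matched to schema elements
// without positional bookkeeping.
void print_columns(const std::shared_ptr<arrow::Array>& data) {
    auto type = data->type();
    if (type->id() == arrow::Type::STRUCT) {
        for (const auto& field : type->fields()) {
            std::cout << field->name() << ": " << field->type()->ToString()
                      << "\n";
        }
    }
}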

Contributor

@rockwotj rockwotj left a comment

A question on the arrow interfaces. Looking good!

@jcipar jcipar force-pushed the jcipar/seastar-friendly-arrow-writer branch from fe9d575 to 80ec67b Compare September 24, 2024 20:42
The parameters to the decimal type were incorrect in the test code.
Precision must be greater than scale, but it was reversed before. It
didn't matter because we weren't validating that we could translate the
data to Parquet, but once we start translating data to Parquet, this
will generate an error.
@jcipar jcipar force-pushed the jcipar/seastar-friendly-arrow-writer branch from 80ec67b to 061e378 Compare September 24, 2024 21:13
This adds an arrow_to_iobuf interface that converts Arrow data to iobufs
representing Parquet files that can be written to disk. There are two
components:
1. An implementation of arrow::io::OutputStream that collects data in
iobufs
2. A class that creates a parquet::arrow::FileWriter using that output
stream and allows the caller to extract the generated iobufs.

This allows us to separate the compute side of generating Parquet, which
still occurs in the Arrow library, from the file I/O, which can now be
made seastar-friendly.
@jcipar jcipar force-pushed the jcipar/seastar-friendly-arrow-writer branch from 061e378 to e943417 Compare September 25, 2024 15:19
iobuf close_and_take_iobuf();

private:
class iobuf_output_stream;
Member

👍

@andrwng andrwng merged commit 2c3fc7d into redpanda-data:dev Sep 26, 2024
17 checks passed