GH-47628: [C++][Parquet] Implement basic parquet file rewriter #47775
HuaHuaY wants to merge 1 commit into apache:main
Conversation
Thanks for opening a pull request! If this is not a minor PR, could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose Opening GitHub issues ahead of time contributes to the openness of the Apache Arrow project. Then could you also rename the pull request title in the following format? See also:
HuaHuaY force-pushed from e4de469 to c216849
@pitrou @adamreeve @mapleFU Do you have any suggestions about this draft? Is there any efficient way to merge two Parquet files' schemas?
mapleFU
left a comment
Hmm, I'm thinking that just reusing the current code is an OK way to go, since this logic would be a bit hacky with the current interface...
wgtmac
left a comment
I haven't reviewed all the changes yet and will progressively post my comments.
wgtmac
left a comment
The general workflow of the rewriter looks good to me. However, I don't believe we should directly manipulate the thrift objects.
```cpp
RowGroupRewriter(std::shared_ptr<ArrowInputFile> source,
                 std::shared_ptr<ArrowOutputStream> sink,
                 const RewriterProperties* props,
                 std::shared_ptr<RowGroupReader> row_group_reader,
```
Perhaps introduce a RowGroupContext to hold all row group xxx readers?
I think it doesn't bring much benefit and requires one more step to unwrap the wrapper class.
HuaHuaY force-pushed from e037be7 to 253f281
HuaHuaY force-pushed from b70917f to 439103e
HuaHuaY force-pushed from 439103e to 641ab8c
Pull request overview
Draft implementation of a C++ Parquet “file rewriter” that can rewrite/concatenate/join Parquet files by copying encoded bytes and re-emitting metadata (optionally including page indexes and bloom filters), avoiding full decode/re-encode.
Changes:
- Add `ParquetFileRewriter` + `RewriterProperties` and implement basic concat (horizontal) + join (vertical) rewriting.
- Add metadata copying helpers (`to_thrift`, new builder entrypoints, new `ToThrift` overloads) to support fast metadata reconstruction.
- Add Arrow-based tests for roundtrip rewriting scenarios (simple, concat, join, concat+join) and wire them into CMake.
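For orientation, here is a minimal usage sketch of the new API, inferred from the `Open()` signature quoted later in this review. The orientation of the nested `sources` vectors (which level is concat vs. join) and the use of `parquet::ReadMetaData` to obtain footers are assumptions, not confirmed PR usage:

```cpp
#include "arrow/io/api.h"
#include "parquet/file_reader.h"
#include "parquet/file_rewriter.h"  // new header added by this PR

// Concatenate two Parquet files `a` and `b` into `sink` without decoding
// values. ArrowInputFile / ArrowOutputStream are parquet's aliases for
// ::arrow::io::RandomAccessFile / ::arrow::io::OutputStream.
void ConcatTwoFiles(std::shared_ptr<::arrow::io::RandomAccessFile> a,
                    std::shared_ptr<::arrow::io::RandomAccessFile> b,
                    std::shared_ptr<::arrow::io::OutputStream> sink) {
  // Footer metadata is passed in with the same nesting as `sources`.
  std::vector<std::vector<std::shared_ptr<parquet::FileMetaData>>> metadata = {
      {parquet::ReadMetaData(a), parquet::ReadMetaData(b)}};

  // Assumption: one inner vector lists files whose row groups are appended
  // one after another (the implemented concat path, per the PR description).
  auto rewriter =
      parquet::ParquetFileRewriter::Open({{a, b}}, sink, std::move(metadata));
  rewriter->Rewrite();
  rewriter->Close();
}
```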
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
| cpp/src/parquet/thrift_internal.h | Add ToThrift helpers for page-index-related structs. |
| cpp/src/parquet/properties.h | Make ReaderProperties::GetStream const; add RewriterProperties + default factory decl. |
| cpp/src/parquet/properties.cc | Update ReaderProperties::GetStream definition to const. |
| cpp/src/parquet/page_index.h | Extend PageIndexBuilder API to allow setting pre-built indexes. |
| cpp/src/parquet/page_index.cc | Implement new PageIndexBuilder setters and mixed builder/prebuilt serialization. |
| cpp/src/parquet/metadata.h | Add start_offset(), expose to_thrift(), extend RowGroupMetaDataBuilder. |
| cpp/src/parquet/metadata.cc | Implement new metadata accessors and new row-group column-chunk injection path. |
| cpp/src/parquet/file_rewriter.h | New public rewriter API (ParquetFileRewriter). |
| cpp/src/parquet/file_rewriter.cc | Core rewriting implementation (copy streams, concat/join logic, index/bloom handling). |
| cpp/src/parquet/bloom_filter_writer.h | Extend BloomFilterBuilder with InsertBloomFilter. |
| cpp/src/parquet/bloom_filter_writer.cc | Implement InsertBloomFilter. |
| cpp/src/parquet/arrow/test_util.h | Add helper to write a table into a Parquet buffer for tests. |
| cpp/src/parquet/arrow/arrow_rewriter_test.cc | Add Arrow-level rewriter roundtrip tests. |
| cpp/src/parquet/CMakeLists.txt | Add new source (file_rewriter.cc) and new test target. |
```cpp
if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
  auto column_index = page_index_reader_->GetColumnIndex(column_ordinal_);
  auto offset_index = page_index_reader_->GetOffsetIndex(column_ordinal_);
  if (column_index != nullptr) {
    page_index_builder->SetColumnIndex(column_ordinal_, column_index);
  }
  if (offset_index != nullptr) {
    page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, shift);
  }
}

if (bloom_filter_reader_ != nullptr && bloom_filter_builder != nullptr) {
  auto bloom_filter = bloom_filter_reader_->GetColumnBloomFilter(column_ordinal_);
  if (bloom_filter != nullptr) {
    bloom_filter_builder->InsertBloomFilter(column_ordinal_, std::move(bloom_filter));
  }
}
```
The new rewriter code path copies page indexes (SetColumnIndex/SetOffsetIndex) and bloom filters (InsertBloomFilter) when enabled, but the added Arrow-level roundtrip tests only validate the decoded table contents. Consider extending tests to assert that rewritten files actually contain the expected page index / bloom filter structures (e.g., via ParquetFileReader::GetPageIndexReader / GetBloomFilterReader and checking non-null indexes/locations), so regressions in metadata copying/offset shifting are caught.
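A hedged sketch of what such an assertion could look like, using the existing `PageIndexReader`/`BloomFilterReader` APIs; `rewritten_buffer` and the test name are hypothetical, and the bloom filter checks only apply to columns that actually had filters in the sources:

```cpp
#include <gtest/gtest.h>
#include "arrow/io/memory.h"
#include "parquet/file_reader.h"

TEST(ParquetRewriterTest, RewrittenFileKeepsIndexes) {  // hypothetical test
  // `rewritten_buffer` holds the rewriter's output (std::shared_ptr<Buffer>).
  auto reader = parquet::ParquetFileReader::Open(
      std::make_shared<::arrow::io::BufferReader>(rewritten_buffer));
  auto file_metadata = reader->metadata();

  // Page indexes: every column chunk should still carry a column index and
  // an offset index (with shifted page locations) after the rewrite.
  auto page_index_reader = reader->GetPageIndexReader();
  ASSERT_NE(page_index_reader, nullptr);
  for (int rg = 0; rg < file_metadata->num_row_groups(); ++rg) {
    auto rg_index_reader = page_index_reader->RowGroup(rg);
    ASSERT_NE(rg_index_reader, nullptr);
    for (int col = 0; col < file_metadata->num_columns(); ++col) {
      EXPECT_NE(rg_index_reader->GetColumnIndex(col), nullptr);
      EXPECT_NE(rg_index_reader->GetOffsetIndex(col), nullptr);
    }
  }

  // Bloom filters: expect a non-null filter for each column that had one.
  auto& bloom_reader = reader->GetBloomFilterReader();
  for (int rg = 0; rg < file_metadata->num_row_groups(); ++rg) {
    auto rg_bloom_reader = bloom_reader.RowGroup(rg);
    ASSERT_NE(rg_bloom_reader, nullptr);
    for (int col = 0; col < file_metadata->num_columns(); ++col) {
      EXPECT_NE(rg_bloom_reader->GetColumnBloomFilter(col), nullptr);
    }
  }
}
```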
```diff
 std::shared_ptr<ArrowInputStream> GetStream(std::shared_ptr<ArrowInputFile> source,
-                                            int64_t start, int64_t num_bytes);
+                                            int64_t start, int64_t num_bytes) const;
```
Changing ReaderProperties::GetStream to be const changes the mangled symbol and will break ABI for downstream code compiled against earlier versions of the library. If ABI compatibility matters here, consider keeping the old non-const overload (forwarding to the const implementation) instead of changing the existing signature in-place.
```cpp
// Non-const overload kept for ABI compatibility. It forwards to the const
// implementation introduced in a later version.
std::shared_ptr<ArrowInputStream> GetStream(std::shared_ptr<ArrowInputFile> source,
                                            int64_t start, int64_t num_bytes) {
  return static_cast<const ReaderProperties*>(this)->GetStream(source, start,
                                                               num_bytes);
}
```
```cpp
const void* to_thrift() const;
```
ColumnChunkMetaData::to_thrift() exposes an untyped pointer to an internal Thrift struct as part of the public metadata API, which couples consumers to the internal representation and is easy to misuse (a wrong cast or lifetime assumption leads to undefined behavior). Prefer an internal-only accessor, return a typed reference/pointer to the concrete Thrift type in an internal header, or provide a dedicated cloning/copy helper on the builder to avoid exposing raw Thrift at all.
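One possible shape for the internal-only alternative, sketched under the assumption that the Thrift-generated `format::ColumnChunk` backs `ColumnChunkMetaData` (as in metadata.cc today); the function name is hypothetical:

```cpp
// In a parquet-internal header (e.g. alongside thrift_internal.h), not in
// the public metadata.h:
namespace parquet::internal {

// Hypothetical typed accessor: only translation units that already depend on
// the Thrift-generated types see the concrete return type, so public API
// consumers never handle a raw `const void*`.
const format::ColumnChunk& GetThriftColumnChunk(const ColumnChunkMetaData& metadata);

}  // namespace parquet::internal
```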
HuaHuaY force-pushed from 7be60a0 to 41248c8
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 7 comments.
HuaHuaY force-pushed from 41248c8 to 2140284
HuaHuaY force-pushed from 2140284 to 648c87b
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 4 comments.
HuaHuaY force-pushed from 648c87b to ec89836
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 9 comments.
```diff
@@ -482,6 +487,29 @@ void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) {
   EXPECT_TRUE(result->Equals(*expected_array));
 }
```
The WriteFile function takes buffer as a non-const reference parameter (std::shared_ptr<Buffer>&). This is unusual in modern C++; it would be more idiomatic to return the buffer via the return value or use an out parameter with clearer naming (e.g., out_buffer). However, since this is a test utility function, this is acceptable. Consider adding a comment to clarify that buffer is an output parameter.
```cpp
// Note: 'buffer' is an output parameter that will receive the serialized file contents.
```
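If the helper were ever reworked, a return-value shape might look like this sketch (the name `WriteTableToBuffer` is hypothetical, not the PR's helper):

```cpp
// Hypothetical alternative: return the serialized file instead of filling an
// output parameter, making the data flow explicit at call sites.
::arrow::Result<std::shared_ptr<Buffer>> WriteTableToBuffer(
    const std::shared_ptr<::arrow::Table>& table);

// Usage in a test:
//   ASSERT_OK_AND_ASSIGN(auto buffer, WriteTableToBuffer(table));
```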
```cpp
class PARQUET_EXPORT ParquetFileRewriter {
 public:
  struct PARQUET_EXPORT Contents {
    virtual ~Contents() = default;
    virtual void Close() = 0;
    virtual void Rewrite() = 0;
  };

  ParquetFileRewriter();
  ~ParquetFileRewriter();

  static std::unique_ptr<ParquetFileRewriter> Open(
      std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
      std::shared_ptr<ArrowOutputStream> sink,
      std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
      std::shared_ptr<const ::arrow::KeyValueMetadata> sink_metadata = NULLPTR,
      std::shared_ptr<RewriterProperties> props = default_rewriter_properties());

  void Open(std::unique_ptr<Contents> contents);
  void Close();

  void Rewrite();
```
The ParquetFileRewriter class and its public methods (Open, Close, Rewrite) lack documentation comments. Since this is a new public API (marked with PARQUET_EXPORT), these methods should have documentation describing their purpose, parameters, return values, and any exceptions that may be thrown. This is especially important for the Open method with its complex nested vector parameters.
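For example, a documentation block for `Open` might read as follows; the parameter semantics are inferred from the PR description and should be checked against the implementation:

```cpp
/// \brief Create a rewriter that combines the given Parquet files into one
/// output file by copying encoded bytes instead of decoding values.
///
/// \param sources Input files as a two-level grouping; one level lists files
///   whose row groups are concatenated, the other lists files whose columns
///   are joined side by side (the exact orientation should be documented by
///   the implementation).
/// \param sink Output stream that receives the rewritten file.
/// \param sources_metadata Pre-parsed FileMetaData, nested to match `sources`.
/// \param sink_metadata Optional key-value metadata to attach to the output.
/// \param props Properties controlling how inputs are read and outputs written.
/// \return An open rewriter; call Rewrite() and then Close().
/// \throws ParquetException if the inputs cannot be combined (for example,
///   duplicate column paths when joining, or encrypted inputs).
static std::unique_ptr<ParquetFileRewriter> Open(...);
```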
```diff
@@ -485,6 +488,7 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
   ~RowGroupMetaDataBuilder();

   ColumnChunkMetaDataBuilder* NextColumnChunk();
```
The new overload of NextColumnChunk that takes ColumnChunkMetaData and a shift value lacks documentation. This method allows adding column metadata without creating a builder, which is a key feature for the rewriter's fast-copy optimization. Documentation should explain the parameters (especially shift for offset adjustment) and when to use this method versus the builder-based approach.
```cpp
ColumnChunkMetaDataBuilder* NextColumnChunk();
// Add an existing column chunk metadata object to the row group without
// constructing it through a ColumnChunkMetaDataBuilder.
//
// This overload is intended for fast-copy / rewriting scenarios where
// column chunks are reused from another file and their metadata has
// already been finalized.
//
// Parameters:
//  - cc_metadata: Ownership of the provided ColumnChunkMetaData is
//    transferred to the RowGroupMetaDataBuilder.
//  - shift: Byte offset delta to apply to all file-relative offsets in
//    cc_metadata (for example, when appending row groups at a different
//    position in the target file). Use 0 if no adjustment is needed.
//
// For column chunks produced by this writer, prefer the builder-based
// NextColumnChunk() API above.
```
```cpp
auto path = schema_desc.Column(i)->path()->ToDotString();
if (auto [_, inserted] = column_paths.emplace(path); !inserted) {
  // TODO(HuaHuaY): support choose one column from columns with same path
  throw ParquetException("NotImplemented, files have same column path: ", path);
```
The error message "NotImplemented, files have same column path: " should be "NotImplemented: files have the same column path: " for better grammar. Also consider adding more context about which files have the duplicate column, if that information is available.
```diff
- throw ParquetException("NotImplemented, files have same column path: ", path);
+ throw ParquetException("NotImplemented: files have the same column path: ", path);
```
```cpp
void RowGroupMetaDataBuilder::NextColumnChunk(
    std::unique_ptr<ColumnChunkMetaData> cc_metadata, int64_t shift) {
  return impl_->NextColumnChunk(std::move(cc_metadata), shift);
```
The return statement is unnecessary. The method has a void return type, so the return statement on line 2055 should just be impl_->NextColumnChunk(std::move(cc_metadata), shift); without the return keyword.
```diff
- return impl_->NextColumnChunk(std::move(cc_metadata), shift);
+ impl_->NextColumnChunk(std::move(cc_metadata), shift);
```
```diff
 /// Serialize column index ordered by row group ordinal and then column ordinal.
 result.column_index_locations =
     SerializeIndex(column_index_builders_, column_indices_, sink);

-// Serialize offset index ordered by row group ordinal and then column ordinal.
-result.offset_index_locations = SerializeIndex(offset_index_builders_, sink);
+/// Serialize offset index ordered by row group ordinal and then column ordinal.
+result.offset_index_locations =
+    SerializeIndex(offset_index_builders_, offset_indices_, sink);
```
The comment style changed from // to /// (documentation comment). While this is technically correct since these are implementation details, verify if this change is intentional. In the existing codebase, regular comments (//) are typically used for implementation notes, while documentation comments (///) are used for API documentation.
```cpp
class PARQUET_EXPORT RewriterProperties {
 public:
  class Builder {
   public:
    Builder()
        : pool_(::arrow::default_memory_pool()),
          writer_properties_(default_writer_properties()),
          reader_properties_(default_reader_properties()) {}

    explicit Builder(const RewriterProperties& properties)
        : pool_(properties.memory_pool()),
          writer_properties_(properties.writer_properties()),
          reader_properties_(properties.reader_properties()) {}

    virtual ~Builder() = default;

    /// Specify the memory pool for the rewriter. Default default_memory_pool.
    Builder* memory_pool(MemoryPool* pool) {
      pool_ = pool;
      return this;
    }

    /// Set the writer properties.
    Builder* writer_properties(std::shared_ptr<WriterProperties> properties) {
      writer_properties_ = std::move(properties);
      return this;
    }

    /// Set the reader properties.
    Builder* reader_properties(ReaderProperties properties) {
      reader_properties_ = std::move(properties);
      return this;
    }

    /// Build the RewriterProperties with the builder parameters.
    std::shared_ptr<RewriterProperties> build() {
      return std::shared_ptr<RewriterProperties>(new RewriterProperties(
          pool_, std::move(writer_properties_), std::move(reader_properties_)));
    }

   private:
    MemoryPool* pool_;
    std::shared_ptr<WriterProperties> writer_properties_;
    ReaderProperties reader_properties_;
  };

  MemoryPool* memory_pool() const { return pool_; }

  const std::shared_ptr<WriterProperties>& writer_properties() const {
    return writer_properties_;
  }

  const ReaderProperties& reader_properties() const { return reader_properties_; }

 private:
  explicit RewriterProperties(MemoryPool* pool,
                              std::shared_ptr<WriterProperties> writer_properties,
                              ReaderProperties reader_properties)
      : pool_(pool),
        writer_properties_(std::move(writer_properties)),
        reader_properties_(std::move(reader_properties)) {}

  MemoryPool* pool_;
  std::shared_ptr<WriterProperties> writer_properties_;
  ReaderProperties reader_properties_;
};
```
The RewriterProperties class lacks documentation. As a new public API class (marked with PARQUET_EXPORT), it should have comprehensive documentation explaining its purpose, how it relates to WriterProperties and ReaderProperties, and what configuration options it provides for the rewriter.
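As a starting point for such docs, here is a minimal usage sketch of the builder shown above (the chaining style follows WriterProperties::Builder, whose setters likewise return `Builder*`):

```cpp
parquet::RewriterProperties::Builder builder;
std::shared_ptr<parquet::RewriterProperties> props =
    builder.memory_pool(::arrow::default_memory_pool())
        ->writer_properties(parquet::default_writer_properties())
        ->reader_properties(parquet::default_reader_properties())
        ->build();
// `props` can then be passed to ParquetFileRewriter::Open(...).
```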
```cpp
virtual void SetColumnIndex(int32_t i,
                            const std::shared_ptr<ColumnIndex>& column_index) = 0;
```
The new methods SetColumnIndex and SetOffsetIndex lack documentation comments. These are part of the public PageIndexBuilder interface and should include documentation explaining their purpose, parameters (especially the shift parameter in SetOffsetIndex), and when they should be used instead of the corresponding builder methods.
```cpp
/// \brief Set a fully constructed ColumnIndex for a column.
///
/// This method can be used instead of GetColumnIndexBuilder() when the caller
/// already has a materialized ColumnIndex instance (for example, when reusing
/// or transforming an existing index) and does not need to build it through
/// the ColumnIndexBuilder interface.
///
/// \param i Column ordinal.
/// \param column_index The ColumnIndex to associate with the given column.
virtual void SetColumnIndex(int32_t i,
                            const std::shared_ptr<ColumnIndex>& column_index) = 0;

/// \brief Set a fully constructed OffsetIndex for a column.
///
/// This method can be used instead of GetOffsetIndexBuilder() when the caller
/// already has a materialized OffsetIndex instance.
///
/// The \p shift parameter is applied to all page offsets contained in the
/// provided OffsetIndex. It allows reusing an index whose offsets are relative
/// to a different file position (for example, when concatenating data or
/// writing the index at a different location) by shifting all stored offsets
/// by a constant amount.
///
/// \param i Column ordinal.
/// \param offset_index The OffsetIndex to associate with the given column.
/// \param shift A byte offset added to each page offset stored in \p offset_index.
```
```cpp
  PARQUET_THROW_NOT_OK(sink_->Write(kParquetMagic, 4));
} else {
  throw ParquetException(
      "NotImplemented, rewriter does not support to write encrypted files.");
```
The error message "NotImplemented, rewriter does not support to write encrypted files." has a minor grammatical issue. It should be "NotImplemented: rewriter does not support writing encrypted files." or "NotImplemented, rewriter does not support encrypted file writing."
| "NotImplemented, rewriter does not support to write encrypted files."); | |
| "NotImplemented: rewriter does not support writing encrypted files."); |
This is a draft PR for now. I followed Java's implementation, but I don't think it is a good enough design for C++, because we must copy lots of code from file_writer.cc or file_reader.cc, which will be troublesome to maintain in the future. I would prefer to implement some classes inheriting from `XXXWriter` or `XXXReader`. I'll think about how to refactor the code. If anyone has any good suggestions, please comment.

Now I have written two kinds of tests, covering horizontal splicing and vertical splicing of Parquet files separately. But only horizontal splicing is implemented for now, because I haven't found an efficient way to merge two Parquet files' schemas.
Rationale for this change
Allow rewriting Parquet files by copying their binary data directly, instead of reading and decoding all values and writing them back.
What changes are included in this PR?
- Add `ParquetFileRewriter` and `RewriterProperties`.
- Add `to_thrift` and `SetXXX` methods to help copy the metadata.
- Add `CopyStream` methods to call `memcpy` between `ArrowInputStream` and `ArrowOutputStream`.
- Add `RowGroupMetaDataBuilder::NextColumnChunk(std::unique_ptr<ColumnChunkMetaData> cc_metadata, int64_t shift)`, which allows adding column metadata without creating a `ColumnChunkMetaDataBuilder`.

Are these changes tested?
Yes
Are there any user-facing changes?
`ReaderProperties::GetStream` is changed to a const method. Only the signature has been changed; its original implementation already allows it to be declared as a const method.