From ae15c3f4735e5fdaabc19a71824493036fa0bfa3 Mon Sep 17 00:00:00 2001
From: Gang Wu
Date: Mon, 13 Jan 2025 11:04:41 +0800
Subject: [PATCH] WIP: [C++][Parquet] Add RowRanges API

---
 cpp/src/parquet/CMakeLists.txt |   1 +
 cpp/src/parquet/row_range.cc   | 238 +++++++++++++++++++++++++++++++++
 cpp/src/parquet/row_range.h    |  98 +++++++++++++
 3 files changed, 337 insertions(+)
 create mode 100644 cpp/src/parquet/row_range.cc
 create mode 100644 cpp/src/parquet/row_range.h

diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt
index 83eb522484ba0..e140071349110 100644
--- a/cpp/src/parquet/CMakeLists.txt
+++ b/cpp/src/parquet/CMakeLists.txt
@@ -180,6 +180,7 @@ set(PARQUET_SRCS
     platform.cc
     printer.cc
     properties.cc
+    row_range.cc
     schema.cc
     size_statistics.cc
     statistics.cc
diff --git a/cpp/src/parquet/row_range.cc b/cpp/src/parquet/row_range.cc
new file mode 100644
index 0000000000000..5a950d542378b
--- /dev/null
+++ b/cpp/src/parquet/row_range.cc
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/row_range.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <variant>
+
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/unreachable.h"
+#include "parquet/exception.h"
+
+namespace parquet {
+
+namespace {
+
+// Number of rows covered by one BitmapRange: one bit of the 64-bit bitmap per row.
+constexpr int64_t kBitmapRangeWidth = 64;
+
+}  // namespace
+
+// Walks the ranges stored in a RowRanges in order. It keeps iterators into the
+// source object's container, so the source must outlive this iterator.
+class IteratorImpl : public RowRanges::Iterator {
+ public:
+  explicit IteratorImpl(const RowRanges& ranges)
+      : iter_(ranges.ranges_.cbegin()), end_(ranges.ranges_.cend()) {}
+
+  ~IteratorImpl() override = default;
+
+  std::variant<RowRanges::IntervalRange, RowRanges::BitmapRange, RowRanges::End>
+  NextRange() override {
+    if (iter_ == end_) {
+      return RowRanges::End();
+    }
+    // Step past the range being returned; without this every call would yield
+    // the first range again and callers looping until End would never finish.
+    const auto& range = *iter_++;
+    if (std::holds_alternative<RowRanges::IntervalRange>(range)) {
+      return std::get<RowRanges::IntervalRange>(range);
+    }
+    if (std::holds_alternative<RowRanges::BitmapRange>(range)) {
+      return std::get<RowRanges::BitmapRange>(range);
+    }
+    arrow::Unreachable("Invalid row ranges type");
+  }
+
+ private:
+  decltype(RowRanges::ranges_.cbegin()) iter_;
+  decltype(RowRanges::ranges_.cend()) end_;
+};
+
+std::unique_ptr<RowRanges::Iterator> RowRanges::NewIterator() const {
+  return std::make_unique<IteratorImpl>(*this);
+}
+
+void RowRanges::Validate() const {
+  int64_t last_end = -1;
+  for (const auto& range : ranges_) {
+    if (std::holds_alternative<IntervalRange>(range)) {
+      const auto& interval = std::get<IntervalRange>(range);
+      if (interval.start <= last_end) {
+        throw ParquetException("Row ranges are not in ascending order");
+      }
+      if (interval.end < interval.start) {
+        throw ParquetException("Invalid interval range");
+      }
+      last_end = interval.end;
+      continue;
+    }
+    if (std::holds_alternative<BitmapRange>(range)) {
+      const auto& bitmap = std::get<BitmapRange>(range);
+      if (bitmap.offset <= last_end) {
+        throw ParquetException("Row ranges are not in ascending order");
+      }
+      // A bitmap always spans 64 rows (sizeof() would give its size in bytes).
+      last_end = bitmap.offset + kBitmapRangeWidth - 1;
+      continue;
+    }
+    arrow::Unreachable("Invalid row ranges type");
+  }
+}
+
+int64_t RowRanges::row_count() const {
+  int64_t count = 0;
+  for (const auto& range : ranges_) {
+    if (std::holds_alternative<IntervalRange>(range)) {
+      const auto& interval = std::get<IntervalRange>(range);
+      count += interval.end - interval.start + 1;
+      continue;
+    }
+    if (std::holds_alternative<BitmapRange>(range)) {
+      const auto& bitmap = std::get<BitmapRange>(range);
+      // CountSetBits() takes its length in bits, not bytes.
+      count += arrow::internal::CountSetBits(
+          reinterpret_cast<const uint8_t*>(&bitmap.bitmap), 0, kBitmapRangeWidth);
+      continue;
+    }
+    arrow::Unreachable("Invalid row ranges type");
+  }
+  return count;
+}
+
+RowRanges RowRanges::Intersect(const RowRanges& lhs, const RowRanges& rhs) {
+  RowRanges result;
+  auto lhs_iter = lhs.NewIterator();
+  auto rhs_iter = rhs.NewIterator();
+  auto lhs_range = lhs_iter->NextRange();
+  auto rhs_range = rhs_iter->NextRange();
+
+  while (!std::holds_alternative<End>(lhs_range) &&
+         !std::holds_alternative<End>(rhs_range)) {
+    if (!std::holds_alternative<IntervalRange>(lhs_range) ||
+        !std::holds_alternative<IntervalRange>(rhs_range)) {
+      throw ParquetException("Bitmap range is not yet supported");
+    }
+
+    const auto& left = std::get<IntervalRange>(lhs_range);
+    const auto& right = std::get<IntervalRange>(rhs_range);
+
+    // Find overlapping region
+    const int64_t start = std::max(left.start, right.start);
+    const int64_t end = std::min(left.end, right.end);
+
+    // If there is an overlap, add it to results
+    if (start <= end) {
+      result.ranges_.push_back(IntervalRange{start, end});
+    }
+
+    // Advance the iterator with smaller end
+    if (left.end < right.end) {
+      lhs_range = lhs_iter->NextRange();
+    } else {
+      rhs_range = rhs_iter->NextRange();
+    }
+  }
+
+  return result;
+}
+
+RowRanges RowRanges::Union(const RowRanges& lhs, const RowRanges& rhs) {
+  auto lhs_iter = lhs.NewIterator();
+  auto rhs_iter = rhs.NewIterator();
+  auto lhs_range = lhs_iter->NextRange();
+  auto rhs_range = rhs_iter->NextRange();
+
+  if (std::holds_alternative<End>(lhs_range)) {
+    return rhs;
+  }
+  if (std::holds_alternative<End>(rhs_range)) {
+    return lhs;
+  }
+
+  // Removes and returns the pending interval with the smaller start, taking it
+  // from whichever side still has ranges. At least one side must not be at End.
+  auto pop_next = [&]() -> IntervalRange {
+    if (std::holds_alternative<BitmapRange>(lhs_range) ||
+        std::holds_alternative<BitmapRange>(rhs_range)) {
+      throw ParquetException("Bitmap range is not yet supported");
+    }
+    bool take_lhs = true;
+    if (std::holds_alternative<End>(lhs_range)) {
+      take_lhs = false;
+    } else if (!std::holds_alternative<End>(rhs_range)) {
+      take_lhs = std::get<IntervalRange>(lhs_range).start <=
+                 std::get<IntervalRange>(rhs_range).start;
+    }
+    auto& pending = take_lhs ? lhs_range : rhs_range;
+    auto& iter = take_lhs ? lhs_iter : rhs_iter;
+    const IntervalRange next = std::get<IntervalRange>(pending);
+    pending = iter->NextRange();
+    return next;
+  };
+
+  RowRanges result;
+  // Seed with the globally smallest start so that no rows of the other side
+  // preceding it can be dropped.
+  IntervalRange current = pop_next();
+
+  while (!std::holds_alternative<End>(lhs_range) ||
+         !std::holds_alternative<End>(rhs_range)) {
+    const IntervalRange next = pop_next();
+    if (current.end + 1 >= next.start) {
+      // Concatenate overlapping or adjacent ranges
+      current.end = std::max(current.end, next.end);
+    } else {
+      // Gap between current and next range
+      result.ranges_.push_back(current);
+      current = next;
+    }
+  }
+
+  result.ranges_.push_back(current);
+  return result;
+}
+
+RowRanges RowRanges::MakeSingle(int64_t row_count) {
+  if (row_count < 0) {
+    throw ParquetException("Row count must not be negative");
+  }
+  RowRanges rowRanges;
+  // Zero rows is an empty selection; [0, -1] would fail Validate().
+  if (row_count > 0) {
+    rowRanges.ranges_.push_back(IntervalRange{0, row_count - 1});
+  }
+  return rowRanges;
+}
+
+RowRanges RowRanges::MakeSingle(int64_t start, int64_t end) {
+  RowRanges rowRanges;
+  rowRanges.ranges_.push_back(IntervalRange{start, end});
+  return rowRanges;
+}
+
+RowRanges RowRanges::MakeIntervals(const std::vector<IntervalRange>& intervals) {
+  RowRanges rowRanges;
+  rowRanges.ranges_.reserve(intervals.size());
+  rowRanges.ranges_.insert(rowRanges.ranges_.end(), intervals.cbegin(), intervals.cend());
+  return rowRanges;
+}
+
+}  // namespace parquet
diff --git a/cpp/src/parquet/row_range.h b/cpp/src/parquet/row_range.h
new file mode 100644
index 0000000000000..a1de68ea251df
--- /dev/null
+++ b/cpp/src/parquet/row_range.h
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <variant>
+#include <vector>
+
+#include "parquet/platform.h"
+
+namespace parquet {
+
+/// RowRanges is a collection of non-overlapping and ascendingly ordered row ranges.
+class PARQUET_EXPORT RowRanges {
+ public:
+  /// \brief A range of contiguous rows represented by an interval.
+  struct IntervalRange {
+    /// Start row of the range (inclusive).
+    int64_t start;
+    /// End row of the range (inclusive).
+    int64_t end;
+  };
+
+  /// \brief A range of contiguous rows represented by a bitmap.
+  struct BitmapRange {
+    /// Start row of the range (inclusive).
+    int64_t offset;
+    /// One bit per row, a set bit selects the row. Zero appended if there are
+    /// fewer than 64 elements.
+    uint64_t bitmap;
+  };
+
+  /// \brief An end marker for the row range iterator.
+  struct End {};
+
+  /// \brief An iterator for accessing row ranges in order.
+  class Iterator {
+   public:
+    virtual ~Iterator() = default;
+    /// \brief Return the next range, or End once all ranges have been returned.
+    virtual std::variant<IntervalRange, BitmapRange, End> NextRange() = 0;
+  };
+
+  /// \brief Create a new iterator for accessing row ranges in order.
+  ///
+  /// The iterator refers to this object and must not outlive it.
+  std::unique_ptr<Iterator> NewIterator() const;
+
+  /// \brief Validate the row ranges.
+  /// \throws ParquetException if the row ranges are not in ascending order or
+  /// overlapped.
+  void Validate() const;
+
+  /// \brief Get the total number of rows in the row ranges.
+  int64_t row_count() const;
+
+  /// \brief Compute the intersection of two row ranges.
+  /// \throws ParquetException if a bitmap range needs to be processed (not yet supported).
+  static RowRanges Intersect(const RowRanges& lhs, const RowRanges& rhs);
+
+  /// \brief Compute the union of two row ranges.
+  /// \throws ParquetException if a bitmap range needs to be processed (not yet supported).
+  static RowRanges Union(const RowRanges& lhs, const RowRanges& rhs);
+
+  /// \brief Make a single row range of [0, row_count - 1].
+  ///
+  /// The result is empty if row_count is 0.
+  /// \throws ParquetException if row_count is negative.
+  static RowRanges MakeSingle(int64_t row_count);
+
+  /// \brief Make a single row range of [start, end].
+  static RowRanges MakeSingle(int64_t start, int64_t end);
+
+  /// \brief Make a row range from a list of intervals.
+  static RowRanges MakeIntervals(const std::vector<IntervalRange>& intervals);
+
+ private:
+  friend class IteratorImpl;
+  std::vector<std::variant<IntervalRange, BitmapRange>> ranges_;
+};
+
+}  // namespace parquet