Skip to content

Commit

Permalink
WIP: [C++][Parquet] Add RowRanges API
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac committed Jan 17, 2025
1 parent 0758845 commit ae15c3f
Show file tree
Hide file tree
Showing 3 changed files with 313 additions and 0 deletions.
1 change: 1 addition & 0 deletions cpp/src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ set(PARQUET_SRCS
platform.cc
printer.cc
properties.cc
row_range.cc
schema.cc
size_statistics.cc
statistics.cc
Expand Down
225 changes: 225 additions & 0 deletions cpp/src/parquet/row_range.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/row_range.h"

#include "arrow/util/bitmap_ops.h"
#include "arrow/util/unreachable.h"
#include "parquet/exception.h"

namespace parquet {

/// \brief Forward iterator over the ranges stored in a RowRanges.
///
/// The iterator keeps iterators into the RowRanges' internal vector, so the
/// RowRanges must outlive it and must not be modified while iterating.
class IteratorImpl : public RowRanges::Iterator {
 public:
  explicit IteratorImpl(const RowRanges& ranges)
      : iter_(ranges.ranges_.cbegin()), end_(ranges.ranges_.cend()) {}

  ~IteratorImpl() override = default;

  /// \brief Return the range at the current position and advance past it.
  /// \return The next IntervalRange or BitmapRange in storage order, or End
  /// once every range has been returned.
  std::variant<RowRanges::IntervalRange, RowRanges::BitmapRange, RowRanges::End>
  NextRange() override {
    if (iter_ == end_) {
      return RowRanges::End();
    }
    // Advance before returning; otherwise every call would yield the same
    // element and callers looping until End would never terminate. The
    // reference stays valid because it points into the underlying vector.
    const auto& range = *iter_;
    ++iter_;
    if (std::holds_alternative<RowRanges::IntervalRange>(range)) {
      return std::get<RowRanges::IntervalRange>(range);
    }
    if (std::holds_alternative<RowRanges::BitmapRange>(range)) {
      return std::get<RowRanges::BitmapRange>(range);
    }
    arrow::Unreachable("Invalid row ranges type");
  }

 private:
  /// Position of the next range to return.
  decltype(RowRanges::ranges_.cbegin()) iter_;
  /// One-past-the-last range.
  decltype(RowRanges::ranges_.cend()) end_;
};

std::unique_ptr<RowRanges::Iterator> RowRanges::NewIterator() const {
return std::make_unique<IteratorImpl>(*this);
}

void RowRanges::Validate() const {
int64_t last_end = -1;
for (const auto& range : ranges_) {
if (std::holds_alternative<RowRanges::IntervalRange>(range)) {
const auto& interval = std::get<RowRanges::IntervalRange>(range);
if (interval.start <= last_end) {
throw ParquetException("Row ranges are not in ascending order");
}
if (interval.end < interval.start) {
throw ParquetException("Invalid interval range");
}
last_end = interval.end;
continue;
}
if (std::holds_alternative<RowRanges::BitmapRange>(range)) {
const auto& bitmap = std::get<RowRanges::BitmapRange>(range);
if (bitmap.offset <= last_end) {
throw ParquetException("Row ranges are not in ascending order");
}
last_end = bitmap.offset + sizeof(bitmap.bitmap) - 1;
continue;
}
arrow::Unreachable("Invalid row ranges type");
}
}

int64_t RowRanges::row_count() const {
int64_t count = 0;
for (const auto& range : ranges_) {
if (std::holds_alternative<RowRanges::IntervalRange>(range)) {
const auto& interval = std::get<RowRanges::IntervalRange>(range);
count += interval.end - interval.start + 1;
}
if (std::holds_alternative<RowRanges::BitmapRange>(range)) {
const auto& bitmap = std::get<RowRanges::BitmapRange>(range);
count += arrow::internal::CountSetBits(
reinterpret_cast<const uint8_t*>(&bitmap.bitmap), 0, sizeof(bitmap.bitmap));
}
arrow::Unreachable("Invalid row ranges type");
}
return count;
}

/// Computes the rows present in both inputs by sweeping the two range lists
/// in parallel.
///
/// \throws ParquetException if a bitmap range is encountered while both
/// inputs still have ranges left; only interval ranges are handled.
RowRanges RowRanges::Intersect(const RowRanges& lhs, const RowRanges& rhs) {
  RowRanges intersection;
  auto left_iter = lhs.NewIterator();
  auto right_iter = rhs.NewIterator();
  auto left_cursor = left_iter->NextRange();
  auto right_cursor = right_iter->NextRange();

  for (;;) {
    // Once either side is exhausted nothing further can overlap.
    if (std::holds_alternative<End>(left_cursor) ||
        std::holds_alternative<End>(right_cursor)) {
      break;
    }

    const IntervalRange* a = std::get_if<IntervalRange>(&left_cursor);
    const IntervalRange* b = std::get_if<IntervalRange>(&right_cursor);
    if (a == nullptr || b == nullptr) {
      throw ParquetException("Bitmap range is not yet supported");
    }

    // The overlap, if any, is [max of starts, min of ends].
    const int64_t overlap_start = std::max(a->start, b->start);
    const int64_t overlap_end = std::min(a->end, b->end);
    if (overlap_start <= overlap_end) {
      intersection.ranges_.push_back(IntervalRange{overlap_start, overlap_end});
    }

    // The range that finishes first cannot overlap anything later on the
    // other side, so that is the side to step forward.
    const bool left_finishes_first = a->end < b->end;
    if (left_finishes_first) {
      left_cursor = left_iter->NextRange();
    } else {
      right_cursor = right_iter->NextRange();
    }
  }

  return intersection;
}

/// Computes the rows present in either input. Ranges from both inputs are
/// consumed in ascending start order, and overlapping or adjacent ranges are
/// coalesced into one interval.
///
/// If either input is empty, the other input is returned unchanged.
///
/// \throws ParquetException if a bitmap range is encountered while merging;
/// only interval ranges are handled.
RowRanges RowRanges::Union(const RowRanges& lhs, const RowRanges& rhs) {
  auto lhs_iter = lhs.NewIterator();
  auto rhs_iter = rhs.NewIterator();
  auto lhs_range = lhs_iter->NextRange();
  auto rhs_range = rhs_iter->NextRange();

  if (std::holds_alternative<End>(lhs_range)) {
    return rhs;
  }
  if (std::holds_alternative<End>(rhs_range)) {
    return lhs;
  }

  // Removes and returns the pending interval with the smaller start (lhs on
  // ties), advancing the side it came from. Requires at least one side to
  // still have a pending range.
  auto take_next = [&]() -> IntervalRange {
    if (std::holds_alternative<BitmapRange>(lhs_range) ||
        std::holds_alternative<BitmapRange>(rhs_range)) {
      throw ParquetException("Bitmap range is not yet supported");
    }

    bool take_lhs = true;
    if (std::holds_alternative<End>(lhs_range)) {
      // Only rhs ranges remain.
      take_lhs = false;
    } else if (!std::holds_alternative<End>(rhs_range)) {
      // Both sides have a pending range: pick the one with smaller start.
      take_lhs = std::get<IntervalRange>(lhs_range).start <=
                 std::get<IntervalRange>(rhs_range).start;
    }

    IntervalRange next;
    if (take_lhs) {
      next = std::get<IntervalRange>(lhs_range);
      lhs_range = lhs_iter->NextRange();
    } else {
      next = std::get<IntervalRange>(rhs_range);
      rhs_range = rhs_iter->NextRange();
    }
    return next;
  };

  RowRanges result;
  // Seed with the globally smallest start. Seeding unconditionally from lhs
  // would swallow any rhs ranges that lie entirely before the first lhs
  // range.
  IntervalRange current = take_next();

  while (!std::holds_alternative<End>(lhs_range) ||
         !std::holds_alternative<End>(rhs_range)) {
    const IntervalRange next = take_next();

    // Overlapping (start <= end) or directly adjacent (start == end + 1).
    // Written without `current.end + 1` so it cannot overflow.
    const bool touches =
        next.start <= current.end || next.start - 1 == current.end;
    if (touches) {
      current.end = std::max(current.end, next.end);
    } else {
      // Gap between current and next range: emit and start a new run.
      result.ranges_.push_back(current);
      current = next;
    }
  }

  result.ranges_.push_back(current);
  return result;
}

/// Builds a RowRanges selecting the first `row_count` rows, i.e. the single
/// interval [0, row_count - 1].
///
/// \param row_count Number of leading rows to select. Zero yields an empty
/// RowRanges rather than the invalid interval [0, -1].
/// \throws ParquetException if row_count is negative.
RowRanges RowRanges::MakeSingle(int64_t row_count) {
  if (row_count < 0) {
    throw ParquetException("Row count must not be negative");
  }
  RowRanges rowRanges;
  if (row_count > 0) {
    rowRanges.ranges_.push_back(IntervalRange{0, row_count - 1});
  }
  return rowRanges;
}

/// Builds a RowRanges holding exactly one interval with the given inclusive
/// bounds. The bounds are stored as passed; no validation is performed here.
RowRanges RowRanges::MakeSingle(int64_t start, int64_t end) {
  RowRanges single;
  const IntervalRange interval{start, end};
  single.ranges_.emplace_back(interval);
  return single;
}

/// Builds a RowRanges from the given intervals, copied in the order supplied.
/// The intervals are not sorted, merged, or validated here.
RowRanges RowRanges::MakeIntervals(const std::vector<IntervalRange>& intervals) {
  RowRanges built;
  auto& storage = built.ranges_;
  storage.reserve(intervals.size());
  for (const IntervalRange& interval : intervals) {
    storage.emplace_back(interval);
  }
  return built;
}

} // namespace parquet
87 changes: 87 additions & 0 deletions cpp/src/parquet/row_range.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <variant>
#include <vector>

#include "parquet/platform.h"

namespace parquet {

/// \brief A collection of row ranges, intended to be non-overlapping and in
/// ascending order.
///
/// Ranges are held as a sequence of IntervalRange and/or BitmapRange values.
/// The factory functions do not check the ordering invariant themselves; call
/// Validate() to check it.
class PARQUET_EXPORT RowRanges {
public:
/// \brief A range of contiguous rows represented by an interval.
struct IntervalRange {
/// Start row of the range (inclusive).
int64_t start;
/// End row of the range (inclusive).
int64_t end;
};

/// \brief A window of up to 64 rows whose selection is described by a bitmap.
struct BitmapRange {
/// Row index of the first row covered by the bitmap (inclusive).
int64_t offset;
/// One bit per row; unused bits are zero when the window covers fewer than
/// 64 rows.
/// NOTE(review): the bit-to-row ordering is not established in this file --
/// confirm before relying on it.
uint64_t bitmap;
};

/// \brief An end marker for the row range iterator.
struct End {};

/// \brief An iterator for accessing row ranges in order.
class Iterator {
public:
virtual ~Iterator() = default;
/// \brief Return a range from the underlying collection, or End when there
/// are no more ranges.
virtual std::variant<IntervalRange, BitmapRange, End> NextRange() = 0;
};

/// \brief Create a new iterator for accessing row ranges in order.
///
/// The iterator refers to this object's storage, so this object must outlive
/// the iterator.
std::unique_ptr<Iterator> NewIterator() const;

/// \brief Validate the row ranges.
/// \throws ParquetException if the row ranges are not in ascending order,
/// overlap, or contain an interval whose end precedes its start.
void Validate() const;

/// \brief Get the total number of rows in the row ranges.
int64_t row_count() const;

/// \brief Compute the intersection of two row ranges.
/// \throws ParquetException if a bitmap range is encountered; only interval
/// ranges are currently supported.
static RowRanges Intersect(const RowRanges& lhs, const RowRanges& rhs);

/// \brief Compute the union of two row ranges.
///
/// If either input is empty, the other input is returned as is.
/// \throws ParquetException if a bitmap range is encountered while merging;
/// only interval ranges are currently supported.
static RowRanges Union(const RowRanges& lhs, const RowRanges& rhs);

/// \brief Make a single row range of [0, row_count - 1].
static RowRanges MakeSingle(int64_t row_count);

/// \brief Make a single row range of [start, end] (both inclusive).
static RowRanges MakeSingle(int64_t start, int64_t end);

/// \brief Make a row range from a list of intervals.
///
/// The intervals are copied in the given order without validation.
static RowRanges MakeIntervals(const std::vector<IntervalRange>& intervals);

private:
// IteratorImpl walks ranges_ directly.
friend class IteratorImpl;
// Stored ranges, in the order they were added.
std::vector<std::variant<IntervalRange, BitmapRange>> ranges_;
};

} // namespace parquet

0 comments on commit ae15c3f

Please sign in to comment.