Skip to content

Commit

Permalink
Add support for spilling distinct aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
aditi-pandit committed Jan 2, 2025
1 parent 98932e5 commit 66c7c1c
Show file tree
Hide file tree
Showing 8 changed files with 678 additions and 45 deletions.
7 changes: 0 additions & 7 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,6 @@ void addVectorSerdeKind(VectorSerde::Kind kind, std::stringstream& stream) {
} // namespace

bool AggregationNode::canSpill(const QueryConfig& queryConfig) const {
// TODO: Add spilling for aggregations over distinct inputs.
// https://github.com/facebookincubator/velox/issues/7454
for (const auto& aggregate : aggregates_) {
if (aggregate.distinct) {
return false;
}
}
// TODO: add spilling for pre-grouped aggregation later:
// https://github.com/facebookincubator/velox/issues/3264
return (isFinal() || isSingle()) && !groupingKeys().empty() &&
Expand Down
52 changes: 49 additions & 3 deletions velox/exec/DistinctAggregations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ class TypedDistinctAggregations : public DistinctAggregations {
sizeof(AccumulatorType),
false, // usesExternalMemory
1, // alignment
nullptr,
[](folly::Range<char**> /*groups*/, VectorPtr& /*result*/) {
VELOX_UNREACHABLE();
ARRAY(VARBINARY()),
[this](folly::Range<char**> groups, VectorPtr& result) {
extractForSpill(groups, result);
},
[this](folly::Range<char**> groups) {
for (auto* group : groups) {
Expand Down Expand Up @@ -89,6 +89,21 @@ class TypedDistinctAggregations : public DistinctAggregations {
inputForAccumulator_.reset();
}

void addSingleGroupSpillInput(
char* group,
const VectorPtr& input,
vector_size_t index) override {
auto* arrayVector = input->as<ArrayVector>();
auto* elementsVector = arrayVector->elements()->asFlatVector<StringView>();

const auto size = arrayVector->sizeAt(index);
const auto offset = arrayVector->offsetAt(index);

auto* accumulator = reinterpret_cast<AccumulatorType*>(group + offset_);
RowSizeTracker<char, uint32_t> tracker(group[rowSizeOffset_], *allocator_);
accumulator->deserialize(*elementsVector, offset, size, allocator_);
}

void extractValues(folly::Range<char**> groups, const RowVectorPtr& result)
override {
SelectivityVector rows;
Expand Down Expand Up @@ -149,6 +164,8 @@ class TypedDistinctAggregations : public DistinctAggregations {
}
}

void clear() override {}

private:
bool isSingleInputAggregate() const {
return aggregates_[0]->inputs.size() == 1;
Expand Down Expand Up @@ -196,6 +213,35 @@ class TypedDistinctAggregations : public DistinctAggregations {
return input->template asUnchecked<RowVector>()->children();
}

void extractForSpill(folly::Range<char**> groups, VectorPtr& result) const {
auto* arrayVector = result->as<ArrayVector>();
arrayVector->resize(groups.size());

auto* rawOffsets =
arrayVector->mutableOffsets(groups.size())->asMutable<vector_size_t>();
auto* rawSizes =
arrayVector->mutableSizes(groups.size())->asMutable<vector_size_t>();

vector_size_t offset = 0;
for (auto i = 0; i < groups.size(); ++i) {
auto* accumulator =
reinterpret_cast<AccumulatorType*>(groups[i] + offset_);
rawSizes[i] = accumulator->size() + 1;
rawOffsets[i] = offset;
offset += accumulator->size() + 1;
}

auto& elementsVector = arrayVector->elements();
elementsVector->resize(offset);
offset = 0;
for (auto i = 0; i < groups.size(); ++i) {
auto* accumulator =
reinterpret_cast<AccumulatorType*>(groups[i] + offset_);
accumulator->serialize(elementsVector, offset);
offset += accumulator->size() + 1;
}
}

memory::MemoryPool* const pool_;
const std::vector<AggregateInfo*> aggregates_;
const std::vector<column_index_t> inputs_;
Expand Down
7 changes: 7 additions & 0 deletions velox/exec/DistinctAggregations.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,18 @@ class DistinctAggregations {
const RowVectorPtr& input,
const SelectivityVector& rows) = 0;

virtual void addSingleGroupSpillInput(
char* group,
const VectorPtr& input,
vector_size_t index) = 0;

/// Computes aggregations and stores results in the specified 'result' vector.
virtual void extractValues(
folly::Range<char**> groups,
const RowVectorPtr& result) = 0;

virtual void clear() = 0;

protected:
// Initializes null flags and accumulators for newly encountered groups. This
// function should be called only once for each group.
Expand Down
38 changes: 36 additions & 2 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,13 @@ void GroupingSet::spill() {
if (sortedAggregations_) {
sortedAggregations_->clear();
}

for (const auto& agg : distinctAggregations_) {
if (agg != nullptr) {
agg->clear();
}
}

table_->clear(/*freeTable=*/true);
}

Expand All @@ -1070,6 +1077,17 @@ void GroupingSet::spill(const RowContainerIterator& rowIterator) {
// guarantee we don't accidentally enter an unsafe situation.
rows->stringAllocator().freezeAndExecute(
[&]() { outputSpiller_->spill(rowIterator); });

if (sortedAggregations_) {
sortedAggregations_->clear();
}

for (const auto& agg : distinctAggregations_) {
if (agg != nullptr) {
agg->clear();
}
}

table_->clear(/*freeTable=*/true);
}

Expand Down Expand Up @@ -1266,6 +1284,13 @@ void GroupingSet::initializeRow(SpillMergeStream& stream, char* row) {
sortedAggregations_->initializeNewGroups(
&row, folly::Range<const vector_size_t*>(&zero, 1));
}

for (const auto& agg : distinctAggregations_) {
if (agg != nullptr) {
agg->initializeNewGroups(
&row, folly::Range<const vector_size_t*>(&zero, 1));
}
}
}

void GroupingSet::extractSpillResult(const RowVectorPtr& result) {
Expand Down Expand Up @@ -1297,11 +1322,20 @@ void GroupingSet::updateRow(SpillMergeStream& input, char* row) {
}
mergeSelection_.setValid(input.currentIndex(), false);

int index = aggregates_.size() + keyChannels_.size();
if (sortedAggregations_ != nullptr) {
const auto& vector =
input.current().childAt(aggregates_.size() + keyChannels_.size());
const auto& vector = input.current().childAt(index);
sortedAggregations_->addSingleGroupSpillInput(
row, vector, input.currentIndex());
index++;
}

for (const auto& agg : distinctAggregations_) {
if (agg != nullptr) {
agg->addSingleGroupSpillInput(
row, input.current().childAt(index), input.currentIndex());
index++;
}
}
}

Expand Down
Loading

0 comments on commit 66c7c1c

Please sign in to comment.