From f973c65e72255538d1cbc0f07774752627ffe5a8 Mon Sep 17 00:00:00 2001
From: Jialiang Tan
Date: Sat, 28 Dec 2024 01:21:28 -0800
Subject: [PATCH] refactor: Refactor spiller to have better abstraction
 (#11656)

Summary:
Spiller is a single class that handles several distinct spilling scenarios. Abstract the shared machinery into a SpillerBase class, move the concrete spiller implementations next to their use sites, and expose a dedicated spill API per use case.

Pull Request resolved: https://github.com/facebookincubator/velox/pull/11656

Reviewed By: xiaoxmeng

Differential Revision: D67666563

Pulled By: tanjialiang

fbshipit-source-id: 66c5cfc307e248f00c1ad871409023f05d0da4d0
---
 velox/exec/GroupingSet.cpp                    | 112 ++-
 velox/exec/GroupingSet.h                      |  70 +-
 velox/exec/HashBuild.cpp                      |  95 +-
 velox/exec/HashBuild.h                        |  37 +-
 velox/exec/HashJoinBridge.cpp                 |  12 +-
 velox/exec/HashJoinBridge.h                   |   8 +-
 velox/exec/HashProbe.cpp                      |  19 +-
 velox/exec/HashProbe.h                        |   2 +-
 velox/exec/OrderBy.h                          |   1 -
 velox/exec/RowNumber.cpp                      |  33 +-
 velox/exec/RowNumber.h                        |  27 +-
 velox/exec/SortBuffer.cpp                     |  59 +-
 velox/exec/SortBuffer.h                       |  43 +-
 velox/exec/SortWindowBuild.cpp                |  12 +-
 velox/exec/SortWindowBuild.h                  |  11 +-
 velox/exec/Spill.h                            |   6 +-
 velox/exec/Spiller.cpp                        | 843 ++++++------
 velox/exec/Spiller.h                          | 364 +++-----
 velox/exec/TopNRowNumber.cpp                  |   4 +-
 velox/exec/TopNRowNumber.h                    |  14 +-
 .../tests/AggregateSpillBenchmarkBase.cpp     |  32 +-
 .../exec/tests/AggregateSpillBenchmarkBase.h  |   6 +-
 .../tests/JoinSpillInputBenchmarkBase.cpp     |  16 +-
 .../exec/tests/JoinSpillInputBenchmarkBase.h  |   2 +-
 .../tests/SpillerAggregateBenchmarkTest.cpp   |  22 +-
 velox/exec/tests/SpillerBenchmarkBase.cpp     |   2 +-
 velox/exec/tests/SpillerBenchmarkBase.h       |   4 +-
 velox/exec/tests/SpillerTest.cpp              | 590 ++++++------
 velox/exec/tests/TaskTest.cpp                 |   6 +-
 .../tests/utils/AggregationTestBase.cpp       |   1 -
 30 files changed, 1226 insertions(+), 1227 deletions(-)

diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp
index 9f15ea440c91..eea5c48df7d0 100644
--- a/velox/exec/GroupingSet.cpp
+++ b/velox/exec/GroupingSet.cpp
@@ -209,10 +209,11 @@ void GroupingSet::noMoreInput() {
     addRemainingInput();
   }
 
+  VELOX_CHECK_NULL(outputSpiller_);
   // Spill the remaining in-memory state to disk if spilling has been triggered
   // on this grouping set. This is to simplify query OOM prevention when
   // producing output as we don't support to spill during that stage as for now.
-  if (hasSpilled()) {
+  if (inputSpiller_ != nullptr) {
     spill();
   }
 
@@ -220,7 +221,11 @@
 }
 
 bool GroupingSet::hasSpilled() const {
-  return spiller_ != nullptr;
+  if (inputSpiller_ != nullptr) {
+    VELOX_CHECK_NULL(outputSpiller_);
+    return true;
+  }
+  return outputSpiller_ != nullptr;
 }
 
 bool GroupingSet::hasOutput() {
@@ -980,6 +985,18 @@ RowTypePtr GroupingSet::makeSpillType() const {
   return ROW(std::move(names), std::move(types));
 }
 
+std::optional<common::SpillStats> GroupingSet::spilledStats() const {
+  if (!hasSpilled()) {
+    return std::nullopt;
+  }
+  if (inputSpiller_ != nullptr) {
+    VELOX_CHECK_NULL(outputSpiller_);
+    return inputSpiller_->stats();
+  }
+  VELOX_CHECK_NOT_NULL(outputSpiller_);
+  return outputSpiller_->stats();
+}
+
 void GroupingSet::spill() {
   // NOTE: if the disk spilling is triggered by the memory arbitrator, then it
   // is possible that the grouping set hasn't processed any input data yet.
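The hunks above establish the pattern this refactor repeats across operators: each processing stage owns its own spiller type, at most one exists at a time, and every accessor asserts that exclusivity before forwarding. A minimal self-contained sketch of the invariant with stand-in types (the helper below is illustrative, not part of the patch):

#include <cassert>
#include <memory>
#include <optional>

// Stand-ins for the patch's AggregationInputSpiller / AggregationOutputSpiller.
struct InputSpiller {
  int stats() const { return 1; }
};
struct OutputSpiller {
  int stats() const { return 2; }
};

// Mirrors the shape of GroupingSet::spilledStats(): forward to whichever
// spiller exists, asserting the two never coexist.
std::optional<int> spilledStats(
    const std::unique_ptr<InputSpiller>& input,
    const std::unique_ptr<OutputSpiller>& output) {
  if (input != nullptr) {
    assert(output == nullptr);
    return input->stats();
  }
  if (output != nullptr) {
    return output->stats();
  }
  return std::nullopt;
}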
@@ -989,11 +1006,11 @@ void GroupingSet::spill() { } auto* rows = table_->rows(); - if (!hasSpilled()) { + VELOX_CHECK_NULL(outputSpiller_); + if (inputSpiller_ == nullptr) { VELOX_DCHECK(pool_.trackUsage()); VELOX_CHECK(numDistinctSpillFilesPerPartition_.empty()); - spiller_ = std::make_unique( - Spiller::Type::kAggregateInput, + inputSpiller_ = std::make_unique( rows, makeSpillType(), HashBitRange( @@ -1006,22 +1023,23 @@ void GroupingSet::spill() { spillConfig_, spillStats_); VELOX_CHECK_EQ( - spiller_->state().maxPartitions(), 1 << spillConfig_->numPartitionBits); + inputSpiller_->state().maxPartitions(), + 1 << spillConfig_->numPartitionBits); } // Spilling may execute on multiple partitions in parallel, and // HashStringAllocator is not thread safe. If any aggregations // allocate/deallocate memory during spilling it can lead to concurrency bugs. // Freeze the HashStringAllocator to make it effectively immutable and // guarantee we don't accidentally enter an unsafe situation. - rows->stringAllocator().freezeAndExecute([&]() { spiller_->spill(); }); + rows->stringAllocator().freezeAndExecute([&]() { inputSpiller_->spill(); }); if (isDistinct() && numDistinctSpillFilesPerPartition_.empty()) { size_t totalNumDistinctSpilledFiles{0}; numDistinctSpillFilesPerPartition_.resize( - spiller_->state().maxPartitions(), 0); - for (int partition = 0; partition < spiller_->state().maxPartitions(); + inputSpiller_->state().maxPartitions(), 0); + for (int partition = 0; partition < inputSpiller_->state().maxPartitions(); ++partition) { numDistinctSpillFilesPerPartition_[partition] = - spiller_->state().numFinishedFiles(partition); + inputSpiller_->state().numFinishedFiles(partition); totalNumDistinctSpilledFiles += numDistinctSpillFilesPerPartition_[partition]; } @@ -1042,12 +1060,8 @@ void GroupingSet::spill(const RowContainerIterator& rowIterator) { auto* rows = table_->rows(); VELOX_CHECK(pool_.trackUsage()); - spiller_ = std::make_unique( - Spiller::Type::kAggregateOutput, - rows, - makeSpillType(), - spillConfig_, - spillStats_); + outputSpiller_ = std::make_unique( + rows, makeSpillType(), spillConfig_, spillStats_); // Spilling may execute on multiple partitions in parallel, and // HashStringAllocator is not thread safe. If any aggregations @@ -1055,7 +1069,7 @@ void GroupingSet::spill(const RowContainerIterator& rowIterator) { // Freeze the HashStringAllocator to make it effectively immutable and // guarantee we don't accidentally enter an unsafe situation. 
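  // An illustrative view of the race being prevented here, assuming two spill
  // partitions processed in parallel (thread names hypothetical):
  //   spill thread A: extractSpill() -> accumulator read -> allocator allocate
  //   spill thread B: extractSpill() -> accumulator read -> allocator free
  // HashStringAllocator is not thread safe, so freezing it turns any such
  // allocation during the spill into an immediate failure instead of a race.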
rows->stringAllocator().freezeAndExecute( - [&]() { spiller_->spill(rowIterator); }); + [&]() { outputSpiller_->spill(rowIterator); }); table_->clear(/*freeTable=*/true); } @@ -1091,7 +1105,13 @@ bool GroupingSet::getOutputWithSpill( table_->clear(/*freeTable=*/true); VELOX_CHECK_NULL(merge_); - spiller_->finishSpill(spillPartitionSet_); + if (inputSpiller_ != nullptr) { + VELOX_CHECK_NULL(outputSpiller_); + inputSpiller_->finishSpill(spillPartitionSet_); + } else { + VELOX_CHECK_NOT_NULL(outputSpiller_); + outputSpiller_->finishSpill(spillPartitionSet_); + } removeEmptyPartitions(spillPartitionSet_); if (!prepareNextSpillPartitionOutput()) { @@ -1176,9 +1196,11 @@ bool GroupingSet::mergeNextWithoutAggregates( const RowVectorPtr& result) { VELOX_CHECK_NOT_NULL(merge_); VELOX_CHECK(isDistinct()); + VELOX_CHECK_NULL(outputSpiller_); + VELOX_CHECK_NOT_NULL(inputSpiller_); VELOX_CHECK_EQ( numDistinctSpillFilesPerPartition_.size(), - spiller_->state().maxPartitions()); + inputSpiller_->state().maxPartitions()); // We are looping over sorted rows produced by tree-of-losers. We logically // split the stream into runs of duplicate rows. As we process each run we @@ -1414,4 +1436,56 @@ std::optional GroupingSet::estimateOutputRowSize() const { } return table_->rows()->estimateRowSize(); } + +AggregationInputSpiller::AggregationInputSpiller( + RowContainer* container, + RowTypePtr rowType, + const HashBitRange& hashBitRange, + int32_t numSortingKeys, + const std::vector& sortCompareFlags, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) + : SpillerBase( + container, + std::move(rowType), + hashBitRange, + numSortingKeys, + sortCompareFlags, + std::numeric_limits::max(), + spillConfig->maxSpillRunRows, + spillConfig, + spillStats) {} + +AggregationOutputSpiller::AggregationOutputSpiller( + RowContainer* container, + RowTypePtr rowType, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) + : SpillerBase( + container, + std::move(rowType), + HashBitRange{}, + 0, + {}, + std::numeric_limits::max(), + spillConfig->maxSpillRunRows, + spillConfig, + spillStats) {} + +void AggregationInputSpiller::spill() { + SpillerBase::spill(nullptr); +} + +void AggregationOutputSpiller::spill(const RowContainerIterator& startRowIter) { + SpillerBase::spill(&startRowIter); +} + +void AggregationOutputSpiller::runSpill(bool lastRun) { + SpillerBase::runSpill(lastRun); + if (lastRun) { + for (auto partition = 0; partition < spillRuns_.size(); ++partition) { + state_.finishFile(partition); + } + } +} } // namespace facebook::velox::exec diff --git a/velox/exec/GroupingSet.h b/velox/exec/GroupingSet.h index 9edb3cfac34d..f91543ce96da 100644 --- a/velox/exec/GroupingSet.h +++ b/velox/exec/GroupingSet.h @@ -25,6 +25,8 @@ #include "velox/exec/VectorHasher.h" namespace facebook::velox::exec { +class AggregationInputSpiller; +class AggregationOutputSpiller; class GroupingSet { public: @@ -46,7 +48,7 @@ class GroupingSet { ~GroupingSet(); - // Used by MarkDistinct operator to identify rows with unique values. + /// Used by MarkDistinct operator to identify rows with unique values. static std::unique_ptr createForMarkDistinct( const RowTypePtr& inputType, std::vector>&& hashers, @@ -110,16 +112,12 @@ class GroupingSet { void spill(); /// Spills all the rows in container starting from the offset specified by - /// 'rowIterator'. + /// 'rowIterator'. This should be only called during output processing and + /// when no spill has occurred previously. 
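  /// An illustrative call sequence during output processing (receiver name
  /// hypothetical):
  ///   RowContainerIterator rowIter;
  ///   groupingSet->spill(rowIter);  // spills rows from 'rowIter' onward
  ///   // ... remaining output is later read back via the spill-merge path.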
void spill(const RowContainerIterator& rowIterator); /// Returns the spiller stats including total bytes and rows spilled so far. - std::optional spilledStats() const { - if (spiller_ == nullptr) { - return std::nullopt; - } - return spiller_->stats(); - } + std::optional spilledStats() const; /// Returns true if spilling has triggered on this grouping set. bool hasSpilled() const; @@ -134,8 +132,8 @@ class GroupingSet { return table_ ? table_->rows()->numRows() : 0; } - // Frees hash tables and other state when giving up partial aggregation as - // non-productive. Must be called before toIntermediate() is used. + /// Frees hash tables and other state when giving up partial aggregation as + /// non-productive. Must be called before toIntermediate() is used. void abandonPartialAggregation(); /// Translates the raw input in input to accumulators initialized from a @@ -342,7 +340,9 @@ class GroupingSet { // 'remainingInput_'. bool remainingMayPushdown_; - std::unique_ptr spiller_; + std::unique_ptr inputSpiller_; + + std::unique_ptr outputSpiller_; // The current spill partition in producing spill output. If it is -1, then we // haven't started yet. @@ -391,4 +391,52 @@ class GroupingSet { folly::Synchronized* const spillStats_; }; +class AggregationInputSpiller : public SpillerBase { + public: + static constexpr std::string_view kType = "AggregationInputSpiller"; + + AggregationInputSpiller( + RowContainer* container, + RowTypePtr rowType, + const HashBitRange& hashBitRange, + int32_t numSortingKeys, + const std::vector& sortCompareFlags, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); + + void spill(); + + private: + std::string type() const override { + return std::string(kType); + } + + bool needSort() const override { + return true; + } +}; + +class AggregationOutputSpiller : public SpillerBase { + public: + static constexpr std::string_view kType = "AggregationOutputSpiller"; + + AggregationOutputSpiller( + RowContainer* container, + RowTypePtr rowType, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); + + void spill(const RowContainerIterator& startRowIter); + + private: + std::string type() const override { + return std::string(kType); + } + + void runSpill(bool lastRun) override; + + bool needSort() const override { + return false; + } +}; } // namespace facebook::velox::exec diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index f6220a5acf99..63eac56225f3 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -220,8 +220,7 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) { exceededMaxSpillLevelLimit_ = false; } - spiller_ = std::make_unique( - Spiller::Type::kHashJoinBuild, + spiller_ = std::make_unique( joinType_, table_->rows(), spillType_, @@ -418,7 +417,8 @@ void HashBuild::addInput(RowVectorPtr input) { void HashBuild::ensureInputFits(RowVectorPtr& input) { // NOTE: we don't need memory reservation if all the partitions are spilling // as we spill all the input rows to disk directly. - if (!canSpill() || spiller_ == nullptr || spiller_->isAllSpilled()) { + if (!canSpill() || spiller_ == nullptr || + spiller_->state().isAllPartitionSpilled()) { return; } @@ -488,7 +488,7 @@ void HashBuild::ensureInputFits(RowVectorPtr& input) { // itself, we will no longer need the reserved memory for building hash // table as the table is spilled, and the input will be directly spilled, // too. 
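  // Illustrative shape of this reserve-then-maybe-release guard, assuming
  // memory::MemoryPool::maybeReserve():
  //   if (pool()->maybeReserve(targetIncrementBytes)) {
  //     if (spiller_->state().isAllPartitionSpilled()) {
  //       pool()->release(); // table spilled meanwhile; reservation unneeded
  //     }
  //     return;
  //   }
  //   // Reservation failed: fall through and spill to free up memory.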
- if (spiller_->isAllSpilled()) { + if (spiller_->state().isAllPartitionSpilled()) { pool()->release(); } return; @@ -503,13 +503,14 @@ void HashBuild::ensureInputFits(RowVectorPtr& input) { void HashBuild::spillInput(const RowVectorPtr& input) { VELOX_CHECK_EQ(input->size(), activeRows_.size()); - if (!canSpill() || spiller_ == nullptr || !spiller_->isAnySpilled() || + if (!canSpill() || spiller_ == nullptr || + !spiller_->state().isAnyPartitionSpilled() || !activeRows_.hasSelections()) { return; } const auto numInput = input->size(); - prepareInputIndicesBuffers(numInput, spiller_->spilledPartitionSet()); + prepareInputIndicesBuffers(numInput, spiller_->state().spilledPartitionSet()); computeSpillPartitions(input); vector_size_t numSpillInputs = 0; @@ -518,7 +519,7 @@ void HashBuild::spillInput(const RowVectorPtr& input) { if (FOLLY_UNLIKELY(!activeRows_.isValid(row))) { continue; } - if (!spiller_->isSpilled(partition)) { + if (!spiller_->state().isPartitionSpilled(partition)) { continue; } activeRows_.setValid(row, false); @@ -537,7 +538,7 @@ void HashBuild::spillInput(const RowVectorPtr& input) { if (numInputs == 0) { continue; } - VELOX_CHECK(spiller_->isSpilled(partition)); + VELOX_CHECK(spiller_->state().isPartitionSpilled(partition)); spillPartition( partition, numInputs, spillInputIndicesBuffers_[partition], input); } @@ -705,7 +706,7 @@ bool HashBuild::finishHashBuild() { otherTables.reserve(peers.size()); SpillPartitionSet spillPartitions; for (auto* build : otherBuilds) { - std::unique_ptr spiller; + std::unique_ptr spiller; { std::lock_guard l(build->mutex_); VELOX_CHECK( @@ -785,8 +786,8 @@ bool HashBuild::finishHashBuild() { void HashBuild::ensureTableFits(uint64_t numRows) { // NOTE: we don't need memory reservation if all the partitions have been // spilled as nothing need to be built. - if (!canSpill() || spiller_ == nullptr || spiller_->isAllSpilled() || - numRows == 0) { + if (!canSpill() || spiller_ == nullptr || + spiller_->state().isAllPartitionSpilled() || numRows == 0) { return; } @@ -811,7 +812,7 @@ void HashBuild::ensureTableFits(uint64_t numRows) { // If reservation triggers the spilling of 'HashBuild' operator itself, we // will no longer need the reserved memory for building hash table as the // table is spilled. - if (spiller_->isAllSpilled()) { + if (spiller_->state().isAllPartitionSpilled()) { pool()->release(); } return; @@ -920,7 +921,7 @@ void HashBuild::addRuntimeStats() { } // Add max spilling level stats if spilling has been triggered. 
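  // Illustrative arithmetic (assumed from SpillConfig::spillLevel()): the
  // level reported below derives from the partition bit offset, roughly
  //   level = (partitionBitOffset - startPartitionBit) / numPartitionBits
  // so each round of recursive spilling raises the reported level by one.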
- if (spiller_ != nullptr && spiller_->isAnySpilled()) { + if (spiller_ != nullptr && spiller_->state().isAnyPartitionSpilled()) { lockedStats->addRuntimeStat( "maxSpillLevel", RuntimeCounter( @@ -1094,7 +1095,7 @@ void HashBuild::reclaim( } } - std::vector spillers; + std::vector spillers; for (auto* op : operators) { HashBuild* buildOp = static_cast(op); spillers.push_back(buildOp->spiller_.get()); @@ -1136,4 +1137,70 @@ void HashBuild::close() { table_.reset(); } } + +HashBuildSpiller::HashBuildSpiller( + core::JoinType joinType, + RowContainer* container, + RowTypePtr rowType, + HashBitRange bits, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) + : SpillerBase( + container, + std::move(rowType), + bits, + 0, + {}, + spillConfig->maxFileSize, + spillConfig->maxSpillRunRows, + spillConfig, + spillStats), + spillProbeFlag_(needRightSideJoin(joinType)) { + VELOX_CHECK(container_->accumulators().empty()); +} + +void HashBuildSpiller::spill() { + SpillerBase::spill(nullptr); +} + +void HashBuildSpiller::spill( + uint32_t partition, + const RowVectorPtr& spillVector) { + VELOX_CHECK(!finalized_); + if (FOLLY_UNLIKELY(!state_.isPartitionSpilled(partition))) { + VELOX_FAIL( + "Can't spill vector to a non-spilling partition: {}, {}", + partition, + toString()); + } + VELOX_DCHECK(spillRuns_[partition].rows.empty()); + + if (FOLLY_UNLIKELY(spillVector == nullptr)) { + return; + } + + state_.appendToPartition(partition, spillVector); +} + +void HashBuildSpiller::extractSpill( + folly::Range rows, + facebook::velox::RowVectorPtr& resultPtr) { + if (resultPtr == nullptr) { + resultPtr = BaseVector::create( + rowType_, rows.size(), memory::spillMemoryPool()); + } else { + resultPtr->prepareForReuse(); + resultPtr->resize(rows.size()); + } + + auto* result = resultPtr.get(); + const auto& types = container_->columnTypes(); + for (auto i = 0; i < types.size(); ++i) { + container_->extractColumn(rows.data(), rows.size(), i, result->childAt(i)); + } + if (spillProbeFlag_) { + container_->extractProbedFlags( + rows.data(), rows.size(), false, false, result->childAt(types.size())); + } +} } // namespace facebook::velox::exec diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index 0b12554afc8d..5055186ca375 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -25,6 +25,7 @@ #include "velox/expression/Expr.h" namespace facebook::velox::exec { +class HashBuildSpiller; /// Builds a hash table for use in HashProbe. This is the final /// Operator in a build side Driver. The build side pipeline has @@ -276,7 +277,7 @@ class HashBuild final : public Operator { // This can be nullptr if either spilling is not allowed or it has been // transferred to the last hash build operator while in kWaitForBuild state or // it has been cleared to set up a new one for recursive spilling. - std::unique_ptr spiller_; + std::unique_ptr spiller_; // Used to read input from previously spilled data for restoring. 
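  // Illustrative restore loop (condensed; the exact call sites live in
  // HashBuild's spill-restore path):
  //   RowVectorPtr batch;
  //   while (spillInputReader_->nextBatch(batch)) {
  //     addInput(std::move(batch));
  //   }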
std::unique_ptr> spillInputReader_; @@ -308,6 +309,40 @@ inline std::ostream& operator<<(std::ostream& os, HashBuild::State state) { os << HashBuild::stateName(state); return os; } + +class HashBuildSpiller : public SpillerBase { + public: + static constexpr std::string_view kType = "HashBuildSpiller"; + + HashBuildSpiller( + core::JoinType joinType, + RowContainer* container, + RowTypePtr rowType, + HashBitRange bits, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); + + /// Invoked to spill all the rows stored in the row container of the hash + /// build. + void spill(); + + /// Invoked to spill a given partition from the input vector 'spillVector'. + void spill(uint32_t partition, const RowVectorPtr& spillVector); + + private: + void extractSpill(folly::Range rows, RowVectorPtr& resultPtr) + override; + + bool needSort() const override { + return false; + } + + std::string type() const override { + return std::string(kType); + } + + const bool spillProbeFlag_; +}; } // namespace facebook::velox::exec template <> diff --git a/velox/exec/HashJoinBridge.cpp b/velox/exec/HashJoinBridge.cpp index 4c8b4e54c5d6..0171d251a74c 100644 --- a/velox/exec/HashJoinBridge.cpp +++ b/velox/exec/HashJoinBridge.cpp @@ -16,6 +16,7 @@ #include "velox/exec/HashJoinBridge.h" #include "velox/common/memory/MemoryArbitrator.h" +#include "velox/exec/HashBuild.h" namespace facebook::velox::exec { namespace { @@ -91,15 +92,14 @@ namespace { // 'table' to parallelize the table spilling. The function spills all the rows // from the row container and returns the spiller for the caller to collect the // spilled partitions and stats. -std::unique_ptr createSpiller( +std::unique_ptr createSpiller( RowContainer* subTableRows, core::JoinType joinType, const RowTypePtr& tableType, const HashBitRange& hashBitRange, const common::SpillConfig* spillConfig, folly::Synchronized* stats) { - return std::make_unique( - Spiller::Type::kHashJoinBuild, + return std::make_unique( joinType, subTableRows, hashJoinTableSpillType(tableType, joinType), @@ -110,7 +110,7 @@ std::unique_ptr createSpiller( } // namespace std::vector> spillHashJoinTable( - const std::vector& spillers, + const std::vector& spillers, const common::SpillConfig* spillConfig) { VELOX_CHECK_NOT_NULL(spillConfig); auto spillExecutor = spillConfig->executor; @@ -172,8 +172,8 @@ SpillPartitionSet spillHashJoinTable( return {}; } - std::vector> spillersHolder; - std::vector spillers; + std::vector> spillersHolder; + std::vector spillers; const auto rowContainers = table->allRows(); const auto tableType = hashJoinTableType(joinNode); for (auto* rowContainer : rowContainers) { diff --git a/velox/exec/HashJoinBridge.h b/velox/exec/HashJoinBridge.h index d59c564cc7af..ab2899c0caaf 100644 --- a/velox/exec/HashJoinBridge.h +++ b/velox/exec/HashJoinBridge.h @@ -21,6 +21,7 @@ #include "velox/exec/Spill.h" namespace facebook::velox::exec { +class HashBuildSpiller; namespace test { class HashJoinBridgeTestHelper; @@ -209,19 +210,20 @@ RowTypePtr hashJoinTableType( const std::shared_ptr& joinNode); struct HashJoinTableSpillResult { - Spiller* spiller{nullptr}; + HashBuildSpiller* spiller{nullptr}; const std::exception_ptr error{nullptr}; explicit HashJoinTableSpillResult(std::exception_ptr _error) : error(_error) {} - explicit HashJoinTableSpillResult(Spiller* _spiller) : spiller(_spiller) {} + explicit HashJoinTableSpillResult(HashBuildSpiller* _spiller) + : spiller(_spiller) {} }; /// Invoked to spill the hash table from a set of spillers. 
If 'spillExecutor' /// is provided, then we do parallel spill. This is used by hash build to spill /// a partially built hash join table. std::vector> spillHashJoinTable( - const std::vector& spillers, + const std::vector& spillers, const common::SpillConfig* spillConfig); /// Invoked to spill 'table' and returns spilled partitions. This is used by diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 8fe132dc0dbb..3a3fd85270d3 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -250,8 +250,7 @@ void HashProbe::maybeSetupInputSpiller( // If 'spillInputPartitionIds_' is not empty, then we set up a spiller to // spill the incoming probe inputs. - inputSpiller_ = std::make_unique( - Spiller::Type::kHashJoinProbe, + inputSpiller_ = std::make_unique( probeType_, HashBitRange( spillInputPartitionIds_.begin()->partitionBitOffset(), @@ -536,7 +535,7 @@ void HashProbe::spillInput(RowVectorPtr& input) { for (auto row = 0; row < numInput; ++row) { const auto partition = singlePartition.has_value() ? singlePartition.value() : spillPartitions_[row]; - if (!inputSpiller_->isSpilled(partition)) { + if (!inputSpiller_->state().isPartitionSpilled(partition)) { rawNonSpillInputIndicesBuffer_[numNonSpillingInput++] = row; continue; } @@ -556,7 +555,7 @@ void HashProbe::spillInput(RowVectorPtr& input) { if (numSpillInputs == 0) { continue; } - VELOX_CHECK(inputSpiller_->isSpilled(partition)); + VELOX_CHECK(inputSpiller_->state().isPartitionSpilled(partition)); inputSpiller_->spill( partition, wrap(numSpillInputs, spillInputIndicesBuffers_[partition], input)); @@ -1630,7 +1629,7 @@ void HashProbe::noMoreInputInternal() { VELOX_CHECK_NOT_NULL(inputSpiller_); VELOX_CHECK_EQ( spillInputPartitionIds_.size(), - inputSpiller_->spilledPartitionSet().size()); + inputSpiller_->state().spilledPartitionSet().size()); inputSpiller_->finishSpill(inputSpillPartitionSet_); VELOX_CHECK_EQ(spillStats_.rlock()->spillSortTimeNanos, 0); } @@ -1889,12 +1888,8 @@ void HashProbe::spillOutput() { return; } // We spill all the outputs produced from 'input_' into a single partition. - auto outputSpiller = std::make_unique( - Spiller::Type::kHashJoinProbe, - outputType_, - HashBitRange{}, - spillConfig(), - &spillStats_); + auto outputSpiller = std::make_unique( + outputType_, HashBitRange{}, spillConfig(), &spillStats_); outputSpiller->setPartitionsSpilled({0}); RowVectorPtr output{nullptr}; @@ -1918,7 +1913,7 @@ void HashProbe::spillOutput() { isRightSemiFilterJoin(joinType_) || isRightSemiProjectJoin(joinType_)); VELOX_CHECK((output == nullptr) && (input_ != nullptr)); } - VELOX_CHECK_LE(outputSpiller->spilledPartitionSet().size(), 1); + VELOX_CHECK_LE(outputSpiller->state().spilledPartitionSet().size(), 1); VELOX_CHECK(spillOutputPartitionSet_.empty()); outputSpiller->finishSpill(spillOutputPartitionSet_); diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index 81548b8f80cb..eef2b802a49c 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -649,7 +649,7 @@ class HashProbe : public Operator { // 'inputSpiller_' is created if some part of build-side rows have been // spilled. It is used to spill probe-side rows if the corresponding // build-side rows have been spilled. - std::unique_ptr inputSpiller_; + std::unique_ptr inputSpiller_; // If not empty, the probe inputs with partition id set in // 'spillInputPartitionIds_' needs to spill. 
It is set along with 'spiller_' diff --git a/velox/exec/OrderBy.h b/velox/exec/OrderBy.h index fe8e2e61e9b2..1765b553772d 100644 --- a/velox/exec/OrderBy.h +++ b/velox/exec/OrderBy.h @@ -22,7 +22,6 @@ #include "velox/exec/Spiller.h" namespace facebook::velox::exec { - /// OrderBy operator implementation: OrderBy stores all its inputs in a /// RowContainer as the inputs are added. Until all inputs are available, /// it blocks the pipeline. Once all inputs are available, it sorts pointers diff --git a/velox/exec/RowNumber.cpp b/velox/exec/RowNumber.cpp index 176d518b8ce1..ff0f9ab3a9a7 100644 --- a/velox/exec/RowNumber.cpp +++ b/velox/exec/RowNumber.cpp @@ -394,15 +394,13 @@ void RowNumber::reclaim( } SpillPartitionNumSet RowNumber::spillHashTable() { - // TODO Replace joinPartitionBits and Spiller::Type::kHashJoinBuild. VELOX_CHECK_NOT_NULL(table_); auto columnTypes = table_->rows()->columnTypes(); auto tableType = ROW(std::move(columnTypes)); const auto& spillConfig = spillConfig_.value(); - auto hashTableSpiller = std::make_unique( - Spiller::Type::kRowNumber, + auto hashTableSpiller = std::make_unique( table_->rows(), tableType, spillPartitionBits_, @@ -423,13 +421,8 @@ void RowNumber::setupInputSpiller( const auto& spillConfig = spillConfig_.value(); - // TODO Replace Spiller::Type::kHashJoinProbe. - inputSpiller_ = std::make_unique( - Spiller::Type::kHashJoinProbe, - inputType_, - spillPartitionBits_, - &spillConfig, - &spillStats_); + inputSpiller_ = std::make_unique( + inputType_, spillPartitionBits_, &spillConfig, &spillStats_); inputSpiller_->setPartitionsSpilled(spillPartitionSet); const auto& hashers = table_->hashers(); @@ -535,4 +528,24 @@ void RowNumber::setSpillPartitionBits( startPartitionBitOffset + spillConfig_->numPartitionBits); } +RowNumberHashTableSpiller::RowNumberHashTableSpiller( + RowContainer* container, + RowTypePtr rowType, + HashBitRange bits, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) + : SpillerBase( + container, + std::move(rowType), + bits, + 0, + {}, + spillConfig->maxFileSize, + spillConfig->maxSpillRunRows, + spillConfig, + spillStats) {} + +void RowNumberHashTableSpiller::spill() { + SpillerBase::spill(nullptr); +} } // namespace facebook::velox::exec diff --git a/velox/exec/RowNumber.h b/velox/exec/RowNumber.h index fa87d5391e8e..ee27d3d3cfbc 100644 --- a/velox/exec/RowNumber.h +++ b/velox/exec/RowNumber.h @@ -20,6 +20,8 @@ #include "velox/exec/Operator.h" namespace facebook::velox::exec { +class RowNumberHashTableSpiller; +class RowNumberInputSpiller; class RowNumber : public Operator { public: @@ -127,7 +129,7 @@ class RowNumber : public Operator { SpillPartitionSet spillHashTablePartitionSet_; // Spiller for input received after spilling has been triggered. - std::unique_ptr inputSpiller_; + std::unique_ptr inputSpiller_; // Used to restore previously spilled input. 
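  // Illustrative two-phase flow (condensed from spillHashTable() and
  // setupInputSpiller() above): first the hash table rows are spilled through
  // RowNumberHashTableSpiller, then the returned partition set seeds a
  // NoRowContainerSpiller that routes late-arriving input by the same
  // partition bits:
  //   const auto spilledPartitions = spillHashTable();
  //   setupInputSpiller(spilledPartitions);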
std::unique_ptr> spillInputReader_; @@ -143,4 +145,27 @@ class RowNumber : public Operator { bool exceededMaxSpillLevelLimit_{false}; }; + +class RowNumberHashTableSpiller : public SpillerBase { + public: + static constexpr std::string_view kType = "RowNumberHashTableSpiller"; + + RowNumberHashTableSpiller( + RowContainer* container, + RowTypePtr rowType, + HashBitRange bits, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); + + void spill(); + + private: + bool needSort() const override { + return false; + } + + std::string type() const override { + return std::string(kType); + } +}; } // namespace facebook::velox::exec diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index fe12fbecf1c1..b85b536624ee 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -16,6 +16,7 @@ #include "SortBuffer.h" #include "velox/exec/MemoryReclaimer.h" +#include "velox/exec/Spiller.h" namespace facebook::velox::exec { @@ -110,6 +111,8 @@ void SortBuffer::noMoreInput() { velox::common::testutil::TestValue::adjust( "facebook::velox::exec::SortBuffer::noMoreInput", this); VELOX_CHECK(!noMoreInput_); + VELOX_CHECK_NULL(outputSpiller_); + // It may trigger spill, make sure it's triggered before noMoreInput_ is set. ensureSortFits(); @@ -120,7 +123,7 @@ void SortBuffer::noMoreInput() { return; } - if (spiller_ == nullptr) { + if (inputSpiller_ == nullptr) { VELOX_CHECK_EQ(numInputRows_, data_->numRows()); updateEstimatedOutputRowSize(); // Sort the pointers to the rows in RowContainer (data_) instead of sorting @@ -160,7 +163,7 @@ RowVectorPtr SortBuffer::getOutput(vector_size_t maxOutputRows) { std::min(numInputRows_ - numOutputRows_, maxOutputRows); ensureOutputFits(batchSize); prepareOutput(batchSize); - if (spiller_ != nullptr) { + if (hasSpilled()) { getOutputWithSpill(); } else { getOutputWithoutSpill(); @@ -168,6 +171,14 @@ RowVectorPtr SortBuffer::getOutput(vector_size_t maxOutputRows) { return output_; } +bool SortBuffer::hasSpilled() const { + if (inputSpiller_ != nullptr) { + VELOX_CHECK_NULL(outputSpiller_); + return true; + } + return outputSpiller_ != nullptr; +} + void SortBuffer::spill() { VELOX_CHECK_NOT_NULL( spillConfig_, "spill config is null when SortBuffer spill is called"); @@ -263,7 +274,7 @@ void SortBuffer::ensureOutputFits(vector_size_t batchSize) { return; } - if (!estimatedOutputRowSize_.has_value() || spiller_ != nullptr) { + if (!estimatedOutputRowSize_.has_value() || hasSpilled()) { return; } @@ -294,7 +305,7 @@ void SortBuffer::ensureSortFits() { return; } - if (numInputRows_ == 0 || spiller_ != nullptr) { + if (numInputRows_ == 0 || inputSpiller_ != nullptr) { return; } @@ -333,10 +344,9 @@ void SortBuffer::updateEstimatedOutputRowSize() { } void SortBuffer::spillInput() { - if (spiller_ == nullptr) { + if (inputSpiller_ == nullptr) { VELOX_CHECK(!noMoreInput_); - spiller_ = std::make_unique( - Spiller::Type::kOrderByInput, + inputSpiller_ = std::make_unique( data_.get(), spillerStoreType_, data_->keyTypes().size(), @@ -344,12 +354,12 @@ void SortBuffer::spillInput() { spillConfig_, spillStats_); } - spiller_->spill(); + inputSpiller_->spill(); data_->clear(); } void SortBuffer::spillOutput() { - if (spiller_ != nullptr) { + if (hasSpilled()) { // Already spilled. 
return; } @@ -358,17 +368,13 @@ void SortBuffer::spillOutput() { return; } - spiller_ = std::make_unique( - Spiller::Type::kOrderByOutput, - data_.get(), - spillerStoreType_, - spillConfig_, - spillStats_); - auto spillRows = Spiller::SpillRows( + outputSpiller_ = std::make_unique( + data_.get(), spillerStoreType_, spillConfig_, spillStats_); + auto spillRows = SpillerBase::SpillRows( sortedRows_.begin() + numOutputRows_, sortedRows_.end(), *memory::spillMemoryPool()); - spiller_->spill(spillRows); + outputSpiller_->spill(spillRows); data_->clear(); sortedRows_.clear(); sortedRows_.shrink_to_fit(); @@ -391,7 +397,7 @@ void SortBuffer::prepareOutput(vector_size_t batchSize) { child->resize(batchSize); } - if (spiller_ != nullptr) { + if (hasSpilled()) { spillSources_.resize(batchSize); spillSourceRows_.resize(batchSize); prepareOutputWithSpill(); @@ -461,12 +467,24 @@ void SortBuffer::getOutputWithSpill() { void SortBuffer::finishSpill() { VELOX_CHECK_NULL(spillMerger_); VELOX_CHECK(spillPartitionSet_.empty()); - spiller_->finishSpill(spillPartitionSet_); + VELOX_CHECK_EQ( + !!(outputSpiller_ != nullptr) + !!(inputSpiller_ != nullptr), + 1, + "inputSpiller_ {}, outputSpiller_ {}", + inputSpiller_ == nullptr ? "set" : "null", + outputSpiller_ == nullptr ? "set" : "null"); + if (inputSpiller_ != nullptr) { + VELOX_CHECK(!inputSpiller_->finalized()); + inputSpiller_->finishSpill(spillPartitionSet_); + } else { + VELOX_CHECK(!outputSpiller_->finalized()); + outputSpiller_->finishSpill(spillPartitionSet_); + } VELOX_CHECK_EQ(spillPartitionSet_.size(), 1); } void SortBuffer::prepareOutputWithSpill() { - VELOX_CHECK_NOT_NULL(spiller_); + VELOX_CHECK(hasSpilled()); if (spillMerger_ != nullptr) { VELOX_CHECK(spillPartitionSet_.empty()); return; @@ -477,5 +495,4 @@ void SortBuffer::prepareOutputWithSpill() { spillConfig_->readBufferSize, pool(), spillStats_); spillPartitionSet_.clear(); } - } // namespace facebook::velox::exec diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h index eafe29f9e16c..9804e2f85890 100644 --- a/velox/exec/SortBuffer.h +++ b/velox/exec/SortBuffer.h @@ -21,10 +21,11 @@ #include "velox/exec/OperatorUtils.h" #include "velox/exec/PrefixSort.h" #include "velox/exec/RowContainer.h" -#include "velox/exec/Spill.h" #include "velox/vector/BaseVector.h" namespace facebook::velox::exec { +class SortInputSpiller; +class SortOutputSpiller; /// A utility class to accumulate data inside and output the sorted result. /// Spilling would be triggered if spilling is enabled and memory usage exceeds @@ -71,38 +72,59 @@ class SortBuffer { private: // Ensures there is sufficient memory reserved to process 'input'. void ensureInputFits(const VectorPtr& input); + // Reserves memory for output processing. If reservation cannot be increased, // spills enough to make output fit. void ensureOutputFits(vector_size_t outputBatchSize); - // Reserves memory for sort. If reservation cannot be increased, - // spills enough to make output fit. + + // Reserves memory for sort. If reservation cannot be increased, spills enough + // to make output fit. void ensureSortFits(); + void updateEstimatedOutputRowSize(); + // Invoked to initialize or reset the reusable output buffer to get output. void prepareOutput(vector_size_t outputBatchSize); + // Invoked to initialize reader to read the spilled data from storage for // output processing. void prepareOutputWithSpill(); + void getOutputWithoutSpill(); + void getOutputWithSpill(); + // Spill during input stage. 
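  // Illustrative dispatch (condensed from SortBuffer::spill()):
  //   if (!noMoreInput_) {
  //     spillInput();   // input stage: spill the unsorted row container
  //   } else {
  //     spillOutput();  // output stage: spill sorted, not-yet-returned rows
  //   }
  // spillInput() below covers the first branch: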
void spillInput(); + // Spill during output stage. void spillOutput(); + // Finish spill, and we shouldn't get any rows from non-spilled partition as // there is only one hash partition for SortBuffer. void finishSpill(); + // Returns true if the sort buffer has spilled, regardless of during input or + // output processing. If spilled() is true, it means the sort buffer is in + // minimal memory mode and could not be spilled further. + bool hasSpilled() const; + const RowTypePtr input_; + const std::vector sortCompareFlags_; + velox::memory::MemoryPool* const pool_; + // The flag is passed from the associated operator such as OrderBy or // TableWriter to indicate if this sort buffer object is under non-reclaimable // execution section or not. tsan_atomic* const nonReclaimableSection_; + // Configuration settings for prefix-sort. const common::PrefixSortConfig prefixSortConfig_; + const common::SpillConfig* const spillConfig_; + folly::Synchronized* const spillStats_; // The column projection map between 'input_' and 'spillerStoreType_' as sort @@ -112,30 +134,41 @@ class SortBuffer { // Indicates no more input. Once it is set, addInput() can't be called on this // sort buffer object. bool noMoreInput_ = false; + // The number of received input rows. uint64_t numInputRows_ = 0; + // Used to store the input data in row format. std::unique_ptr data_; + std::vector> sortedRows_; // The data type of the rows stored in 'data_' and spilled on disk. The // sort key columns are stored first then the non-sorted data columns. RowTypePtr spillerStoreType_; - std::unique_ptr spiller_; + + std::unique_ptr inputSpiller_; + + std::unique_ptr outputSpiller_; + SpillPartitionSet spillPartitionSet_; + // Used to merge the sorted runs from in-memory rows and spilled rows on disk. std::unique_ptr> spillMerger_; + // Records the source rows to copy to 'output_' in order. std::vector spillSources_; + std::vector spillSourceRows_; // Reusable output vector. RowVectorPtr output_; + // Estimated size of a single output row by using the max // 'data_->estimateRowSize()' across all accumulated data set. std::optional estimatedOutputRowSize_{}; + // The number of rows that has been returned. uint64_t numOutputRows_{0}; }; - } // namespace facebook::velox::exec diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index be08fb7109b6..927afefe9f3e 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -180,9 +180,7 @@ void SortWindowBuild::ensureSortFits() { void SortWindowBuild::setupSpiller() { VELOX_CHECK_NULL(spiller_); - spiller_ = std::make_unique( - // TODO Replace Spiller::Type::kOrderBy. - Spiller::Type::kOrderByInput, + spiller_ = std::make_unique( data_.get(), inputType_, compareFlags_.size(), @@ -201,6 +199,13 @@ void SortWindowBuild::spill() { data_->pool()->release(); } +std::optional SortWindowBuild::spilledStats() const { + if (spiller_ == nullptr) { + return std::nullopt; + } + return spiller_->stats(); +} + // Use double front and back search algorithm to find next partition start row. // It is more efficient than linear or binary search. 
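// A simplified sketch of the probing loop on rows sorted by partition keys
// (illustrative, not the exact implementation): since rows are fully sorted,
// all rows up to the first differing key belong to the current partition.
//   vector_size_t lo = start + 1;
//   vector_size_t hi = lastRow;
//   while (lo <= hi) {
//     if (keysDiffer(start, lo)) return lo;      // boundary found from front
//     if (!keysDiffer(start, hi)) return hi + 1; // rows start..hi all match
//     ++lo;
//     --hi;
//   }
//   return lo;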
// This algorithm is described at @@ -371,5 +376,4 @@ bool SortWindowBuild::hasNextPartition() { return partitionStartRows_.size() > 0 && currentPartition_ < static_cast(partitionStartRows_.size() - 2); } - } // namespace facebook::velox::exec diff --git a/velox/exec/SortWindowBuild.h b/velox/exec/SortWindowBuild.h index 45e7b3390cb1..72875094007a 100644 --- a/velox/exec/SortWindowBuild.h +++ b/velox/exec/SortWindowBuild.h @@ -21,7 +21,6 @@ #include "velox/exec/WindowBuild.h" namespace facebook::velox::exec { - // Sorts input data of the Window by {partition keys, sort keys} // to identify window partitions. This sort fully orders // rows as needed for window function computation. @@ -48,12 +47,7 @@ class SortWindowBuild : public WindowBuild { void spill() override; - std::optional spilledStats() const override { - if (spiller_ == nullptr) { - return std::nullopt; - } - return spiller_->stats(); - } + std::optional spilledStats() const override; void noMoreInput() override; @@ -123,10 +117,9 @@ class SortWindowBuild : public WindowBuild { vector_size_t currentPartition_ = -1; // Spiller for contents of the 'data_'. - std::unique_ptr spiller_; + std::unique_ptr spiller_; // Used to sort-merge spilled data. std::unique_ptr> merge_; }; - } // namespace facebook::velox::exec diff --git a/velox/exec/Spill.h b/velox/exec/Spill.h index a2266aa7315a..00dc3683f636 100644 --- a/velox/exec/Spill.h +++ b/velox/exec/Spill.h @@ -32,7 +32,7 @@ #include "velox/vector/VectorStream.h" namespace facebook::velox::exec { -// A source of sorted spilled RowVectors coming either from a file or memory. +/// A source of sorted spilled RowVectors coming either from a file or memory. class SpillMergeStream : public MergeStream { public: SpillMergeStream() = default; @@ -135,7 +135,7 @@ class SpillMergeStream : public MergeStream { SelectivityVector rows_; }; -// A source of spilled RowVectors coming from a file. +/// A source of spilled RowVectors coming from a file. class FileSpillMergeStream : public SpillMergeStream { public: static std::unique_ptr create( @@ -448,7 +448,7 @@ class SpillState { // the max spill bytes limit. common::UpdateAndCheckSpillLimitCB updateAndCheckSpillLimitCb_; - /// Prefix for spill files. + // Prefix for spill files. 
const std::string fileNamePrefix_; const int32_t maxPartitions_; const int32_t numSortKeys_; diff --git a/velox/exec/Spiller.cpp b/velox/exec/Spiller.cpp index 54ac93b5d3ae..4296e8afc96b 100644 --- a/velox/exec/Spiller.cpp +++ b/velox/exec/Spiller.cpp @@ -27,470 +27,137 @@ using facebook::velox::common::testutil::TestValue; namespace facebook::velox::exec { -namespace { -#define CHECK_NOT_FINALIZED() \ - VELOX_CHECK(!finalized_, "Spiller has been finalized") -#define CHECK_FINALIZED() \ - VELOX_CHECK(finalized_, "Spiller hasn't been finalized yet"); -} // namespace - -Spiller::Spiller( - Type type, +SpillerBase::SpillerBase( RowContainer* container, RowTypePtr rowType, + HashBitRange bits, int32_t numSortingKeys, const std::vector& sortCompareFlags, + uint64_t targetFileSize, + uint64_t maxSpillRunRows, const common::SpillConfig* spillConfig, folly::Synchronized* spillStats) - : Spiller( - type, - container, - std::move(rowType), - HashBitRange{}, - numSortingKeys, - sortCompareFlags, - false, + : container_(container), + executor_(spillConfig->executor), + bits_(bits), + rowType_(rowType), + maxSpillRunRows_(maxSpillRunRows), + spillStats_(spillStats), + state_( spillConfig->getSpillDirPathCb, spillConfig->updateAndCheckSpillLimitCb, spillConfig->fileNamePrefix, - std::numeric_limits::max(), - spillConfig->writeBufferSize, - spillConfig->compressionKind, - spillConfig->prefixSortConfig, - spillConfig->executor, - spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig, - spillStats) { - VELOX_CHECK_EQ( - type_, - Type::kOrderByInput, - "Unexpected spiller type: {}", - typeName(type_)); - VELOX_CHECK_EQ(state_.maxPartitions(), 1); -} - -Spiller::Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - const HashBitRange& hashBitRange, - int32_t numSortingKeys, - const std::vector& sortCompareFlags, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats) - : Spiller( - type, - container, - std::move(rowType), - hashBitRange, + bits.numPartitions(), numSortingKeys, sortCompareFlags, - false, - spillConfig->getSpillDirPathCb, - spillConfig->updateAndCheckSpillLimitCb, - spillConfig->fileNamePrefix, - std::numeric_limits::max(), + targetFileSize, spillConfig->writeBufferSize, spillConfig->compressionKind, spillConfig->prefixSortConfig, - spillConfig->executor, - spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig, - spillStats) { - VELOX_CHECK( - type_ == Type::kOrderByInput || type_ == Type::kAggregateInput, - "Unexpected spiller type: {}", - typeName(type_)); - VELOX_CHECK_EQ(state_.targetFileSize(), std::numeric_limits::max()); -} + memory::spillMemoryPool(), + spillStats, + spillConfig->fileCreateConfig) { + TestValue::adjust("facebook::velox::exec::SpillerBase", this); -Spiller::Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats) - : Spiller( - type, - container, - std::move(rowType), - HashBitRange{}, - 0, - {}, - false, - spillConfig->getSpillDirPathCb, - spillConfig->updateAndCheckSpillLimitCb, - spillConfig->fileNamePrefix, - std::numeric_limits::max(), - spillConfig->writeBufferSize, - spillConfig->compressionKind, - spillConfig->prefixSortConfig, - spillConfig->executor, - spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig, - spillStats) { - VELOX_CHECK( - type_ == Type::kAggregateOutput || type_ == Type::kOrderByOutput, - "Unexpected spiller type: {}", - typeName(type_)); - VELOX_CHECK_EQ(state_.maxPartitions(), 1); - 
VELOX_CHECK_EQ(state_.targetFileSize(), std::numeric_limits::max()); + spillRuns_.reserve(state_.maxPartitions()); + for (int i = 0; i < state_.maxPartitions(); ++i) { + spillRuns_.emplace_back(*memory::spillMemoryPool()); + } } -Spiller::Spiller( - Type type, +NoRowContainerSpiller::NoRowContainerSpiller( RowTypePtr rowType, HashBitRange bits, const common::SpillConfig* spillConfig, folly::Synchronized* spillStats) - : Spiller( - type, + : SpillerBase( nullptr, std::move(rowType), bits, 0, {}, - false, - spillConfig->getSpillDirPathCb, - spillConfig->updateAndCheckSpillLimitCb, - spillConfig->fileNamePrefix, spillConfig->maxFileSize, - spillConfig->writeBufferSize, - spillConfig->compressionKind, - spillConfig->prefixSortConfig, - spillConfig->executor, 0, - spillConfig->fileCreateConfig, - spillStats) { - VELOX_CHECK_EQ( - type_, - Type::kHashJoinProbe, - "Unexpected spiller type: {}", - typeName(type_)); -} + spillConfig, + spillStats) {} -Spiller::Spiller( - Type type, - core::JoinType joinType, - RowContainer* container, - RowTypePtr rowType, - HashBitRange bits, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats) - : Spiller( - type, - container, - std::move(rowType), - bits, - 0, - {}, - needRightSideJoin(joinType), - spillConfig->getSpillDirPathCb, - spillConfig->updateAndCheckSpillLimitCb, - spillConfig->fileNamePrefix, - spillConfig->maxFileSize, - spillConfig->writeBufferSize, - spillConfig->compressionKind, - spillConfig->prefixSortConfig, - spillConfig->executor, - spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig, - spillStats) { - VELOX_CHECK_EQ(type_, Type::kHashJoinBuild); - VELOX_CHECK(isHashJoinTableSpillType(rowType_, joinType)); -} - -Spiller::Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - HashBitRange bits, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats) - : Spiller( - type, - container, - std::move(rowType), - bits, - 0, - {}, - false, - spillConfig->getSpillDirPathCb, - spillConfig->updateAndCheckSpillLimitCb, - spillConfig->fileNamePrefix, - spillConfig->maxFileSize, - spillConfig->writeBufferSize, - spillConfig->compressionKind, - spillConfig->prefixSortConfig, - spillConfig->executor, - spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig, - spillStats) { - VELOX_CHECK_EQ(type_, Type::kRowNumber); -} - -Spiller::Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - HashBitRange bits, - int32_t numSortingKeys, - const std::vector& sortCompareFlags, - bool recordProbedFlag, - const common::GetSpillDirectoryPathCB& getSpillDirPathCb, - const common::UpdateAndCheckSpillLimitCB& updateAndCheckSpillLimitCb, - const std::string& fileNamePrefix, - uint64_t targetFileSize, - uint64_t writeBufferSize, - common::CompressionKind compressionKind, - const std::optional& prefixSortConfig, - folly::Executor* executor, - uint64_t maxSpillRunRows, - const std::string& fileCreateConfig, - folly::Synchronized* spillStats) - : type_(type), - container_(container), - executor_(executor), - bits_(bits), - rowType_(std::move(rowType)), - spillProbedFlag_(recordProbedFlag), - maxSpillRunRows_(maxSpillRunRows), - spillStats_(spillStats), - state_( - getSpillDirPathCb, - updateAndCheckSpillLimitCb, - fileNamePrefix, - bits.numPartitions(), - numSortingKeys, - sortCompareFlags, - targetFileSize, - writeBufferSize, - compressionKind, - prefixSortConfig, - memory::spillMemoryPool(), - spillStats, - fileCreateConfig) { - TestValue::adjust("facebook::velox::exec::Spiller", 
this); +void SpillerBase::spill(const RowContainerIterator* startRowIter) { + VELOX_CHECK(!finalized_); - VELOX_CHECK(!spillProbedFlag_ || type_ == Type::kHashJoinBuild); - VELOX_CHECK_EQ(container_ == nullptr, type_ == Type::kHashJoinProbe); - spillRuns_.reserve(state_.maxPartitions()); - for (int i = 0; i < state_.maxPartitions(); ++i) { - spillRuns_.emplace_back(*memory::spillMemoryPool()); - } -} - -void Spiller::extractSpill(folly::Range rows, RowVectorPtr& resultPtr) { - if (resultPtr == nullptr) { - resultPtr = BaseVector::create( - rowType_, rows.size(), memory::spillMemoryPool()); - } else { - resultPtr->prepareForReuse(); - resultPtr->resize(rows.size()); - } - - auto* result = resultPtr.get(); - const auto& types = container_->columnTypes(); - for (auto i = 0; i < types.size(); ++i) { - container_->extractColumn(rows.data(), rows.size(), i, result->childAt(i)); - } - const auto& accumulators = container_->accumulators(); - column_index_t accumulatorColumnOffset = types.size(); - if (spillProbedFlag_) { - container_->extractProbedFlags( - rows.data(), rows.size(), false, false, result->childAt(types.size())); - ++accumulatorColumnOffset; - } - for (auto i = 0; i < accumulators.size(); ++i) { - accumulators[i].extractForSpill( - rows, result->childAt(i + accumulatorColumnOffset)); - } -} - -int64_t Spiller::extractSpillVector( - SpillRows& rows, - int32_t maxRows, - int64_t maxBytes, - RowVectorPtr& spillVector, - size_t& nextBatchIndex) { - VELOX_CHECK_NE(type_, Type::kHashJoinProbe); - uint64_t extractNs{0}; - auto limit = std::min(rows.size() - nextBatchIndex, maxRows); - VELOX_CHECK(!rows.empty()); - int32_t numRows = 0; - int64_t bytes = 0; - { - NanosecondTimer timer(&extractNs); - for (; numRows < limit; ++numRows) { - bytes += container_->rowSize(rows[nextBatchIndex + numRows]); - if (bytes > maxBytes) { - // Increment because the row that went over the limit is part - // of the result. We must spill at least one row. - ++numRows; - break; - } - } - extractSpill(folly::Range(&rows[nextBatchIndex], numRows), spillVector); - nextBatchIndex += numRows; - } - updateSpillExtractVectorTime(extractNs); - return bytes; -} - -namespace { -// A stream of ordered rows being read from the in memory -// container. This is the part of a spillable range that is not yet -// spilled when starting to produce output. This is only used for -// sorted spills since for hash join spilling we just use the data in -// the RowContainer as is. -class RowContainerSpillMergeStream : public SpillMergeStream { - public: - RowContainerSpillMergeStream( - int32_t numSortKeys, - const std::vector& sortCompareFlags, - Spiller::SpillRows&& rows, - Spiller& spiller) - : numSortKeys_(numSortKeys), - sortCompareFlags_(sortCompareFlags), - rows_(std::move(rows)), - spiller_(spiller) { - if (!rows_.empty()) { - RowContainerSpillMergeStream::nextBatch(); - } - } - - uint32_t id() const override { - // Returns the max uint32_t as the special id for in-memory spill merge - // stream. - return std::numeric_limits::max(); - } - - private: - int32_t numSortKeys() const override { - return numSortKeys_; - } - - const std::vector& sortCompareFlags() const override { - return sortCompareFlags_; - } + markAllPartitionsSpilled(); - void nextBatch() override { - // Extracts up to 64 rows at a time. Small batch size because may - // have wide data and no advantage in large size for narrow data - // since this is all processed row by row. 
- static constexpr vector_size_t kMaxRows = 64; - constexpr uint64_t kMaxBytes = 1 << 18; - if (nextBatchIndex_ >= rows_.size()) { - index_ = 0; - size_ = 0; - return; - } - spiller_.extractSpillVector( - rows_, kMaxRows, kMaxBytes, rowVector_, nextBatchIndex_); - size_ = rowVector_->size(); - index_ = 0; + RowContainerIterator rowIter; + if (startRowIter != nullptr) { + rowIter = *startRowIter; } - const int32_t numSortKeys_; - const std::vector sortCompareFlags_; - - Spiller::SpillRows rows_; - Spiller& spiller_; - size_t nextBatchIndex_ = 0; -}; -} // namespace - -std::unique_ptr Spiller::spillMergeStreamOverRows( - int32_t partition) { - CHECK_FINALIZED(); - VELOX_CHECK_LT(partition, state_.maxPartitions()); + bool lastRun{false}; + do { + lastRun = fillSpillRuns(&rowIter); + runSpill(lastRun); + } while (!lastRun); - if (!state_.isPartitionSpilled(partition)) { - return nullptr; - } - // Skip the merge stream from row container if it is empty. - if (spillRuns_[partition].rows.empty()) { - return nullptr; - } - ensureSorted(spillRuns_[partition]); - return std::make_unique( - container_->keyTypes().size(), - state_.sortCompareFlags(), - std::move(spillRuns_[partition].rows), - *this); + checkEmptySpillRuns(); } -void Spiller::ensureSorted(SpillRun& run) { - // The spill data of a hash join doesn't need to be sorted. - if (run.sorted || !needSort()) { - return; - } +bool SpillerBase::fillSpillRuns(RowContainerIterator* iterator) { + checkEmptySpillRuns(); - uint64_t sortTimeNs{0}; + bool lastRun{false}; + uint64_t execTimeNs{0}; { - NanosecondTimer timer(&sortTimeNs); + NanosecondTimer timer(&execTimeNs); - if (!state_.prefixSortConfig().has_value()) { - gfx::timsort( - run.rows.begin(), - run.rows.end(), - [&](const char* left, const char* right) { - return container_->compareRows( - left, right, state_.sortCompareFlags()) < 0; - }); - } else { - PrefixSort::sort( - container_, - state_.sortCompareFlags(), - state_.prefixSortConfig().value(), - memory::spillMemoryPool(), - run.rows); - } + // Number of rows to hash and divide into spill partitions at a time. + constexpr int32_t kHashBatchSize = 4096; + std::vector hashes(kHashBatchSize); + std::vector rows(kHashBatchSize); + const bool isSinglePartition = bits_.numPartitions() == 1; - run.sorted = true; - } + uint64_t totalRows{0}; + for (;;) { + const auto numRows = container_->listRows( + iterator, rows.size(), RowContainer::kUnlimited, rows.data()); + if (numRows == 0) { + lastRun = true; + break; + } - // NOTE: Always set a non-zero sort time to avoid flakiness in tests which - // check sort time. - updateSpillSortTime(std::max(1, sortTimeNs)); -} + // Calculate hashes for this batch of spill candidates. + auto rowSet = folly::Range(rows.data(), numRows); -std::unique_ptr Spiller::writeSpill(int32_t partition) { - VELOX_CHECK_NE(type_, Type::kHashJoinProbe); - // Target size of a single vector of spilled content. One of - // these will be materialized at a time for each stream of the - // merge. 
- constexpr int32_t kTargetBatchBytes = 1 << 18; // 256K - constexpr int32_t kTargetBatchRows = 64; + if (!isSinglePartition) { + for (auto i = 0; i < container_->keyTypes().size(); ++i) { + container_->hash(i, rowSet, i > 0, hashes.data()); + } + } - RowVectorPtr spillVector; - auto& run = spillRuns_[partition]; - try { - ensureSorted(run); - int64_t totalBytes = 0; - size_t written = 0; - while (written < run.rows.size()) { - extractSpillVector( - run.rows, kTargetBatchRows, kTargetBatchBytes, spillVector, written); - totalBytes += state_.appendToPartition(partition, spillVector); - if (totalBytes > state_.targetFileSize()) { - VELOX_CHECK(!needSort()); - state_.finishFile(partition); + // Put each in its run. + for (auto i = 0; i < numRows; ++i) { + // TODO: consider to cache the hash bits in row container so we only + // need to calculate them once. + const auto partition = isSinglePartition + ? 0 + : bits_.partition(hashes[i], state_.maxPartitions()); + VELOX_DCHECK_GE(partition, 0); + spillRuns_[partition].rows.push_back(rows[i]); + spillRuns_[partition].numBytes += container_->rowSize(rows[i]); + } + + totalRows += numRows; + if (maxSpillRunRows_ > 0 && totalRows >= maxSpillRunRows_) { + break; } } - return std::make_unique(partition, written, nullptr); - } catch (const std::exception&) { - // The exception is passed to the caller thread which checks this in - // advanceSpill(). - return std::make_unique( - partition, 0, std::current_exception()); } + updateSpillFillTime(execTimeNs); + + return lastRun; } -void Spiller::runSpill(bool lastRun) { +void SpillerBase::runSpill(bool lastRun) { ++spillStats_->wlock()->spillRuns; - VELOX_CHECK(type_ != Spiller::Type::kOrderByOutput || lastRun); std::vector>> writes; for (auto partition = 0; partition < spillRuns_.size(); ++partition) { @@ -538,88 +205,184 @@ void Spiller::runSpill(bool lastRun) { state_.finishFile(partition); } } +} - // For aggregation output / orderby output spiller, we expect only one spill - // call to spill all the rows starting from the specified row offset. - if (lastRun && - (type_ == Spiller::Type::kAggregateOutput || - type_ == Spiller::Type::kOrderByOutput)) { - for (auto partition = 0; partition < spillRuns_.size(); ++partition) { - state_.finishFile(partition); +std::unique_ptr SpillerBase::writeSpill( + int32_t partition) { + // Target size of a single vector of spilled content. One of + // these will be materialized at a time for each stream of the + // merge. + constexpr int32_t kTargetBatchBytes = 1 << 18; // 256K + constexpr int32_t kTargetBatchRows = 64; + + RowVectorPtr spillVector; + auto& run = spillRuns_[partition]; + try { + ensureSorted(run); + int64_t totalBytes = 0; + size_t written = 0; + while (written < run.rows.size()) { + extractSpillVector( + run.rows, kTargetBatchRows, kTargetBatchBytes, spillVector, written); + totalBytes += state_.appendToPartition(partition, spillVector); + if (totalBytes > state_.targetFileSize()) { + VELOX_CHECK(!needSort()); + state_.finishFile(partition); + } } + return std::make_unique(partition, written, nullptr); + } catch (const std::exception&) { + // The exception is passed to the caller thread which checks this in + // advanceSpill(). 
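    // Illustrative propagation path (condensed): the SpillStatus below
    // carries the partition, the number of rows written, and the captured
    // exception; the coordinating thread inspects each returned status and
    // rethrows the first error it finds when collecting the parallel writes.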
+ return std::make_unique( + partition, 0, std::current_exception()); } } -void Spiller::updateSpillFillTime(uint64_t timeNs) { - spillStats_->wlock()->spillFillTimeNanos += timeNs; - common::updateGlobalSpillFillTime(timeNs); +void SpillerBase::ensureSorted(SpillRun& run) { + // The spill data of a hash join doesn't need to be sorted. + if (run.sorted || !needSort()) { + return; + } + + uint64_t sortTimeNs{0}; + { + NanosecondTimer timer(&sortTimeNs); + + if (!state_.prefixSortConfig().has_value()) { + gfx::timsort( + run.rows.begin(), + run.rows.end(), + [&](const char* left, const char* right) { + return container_->compareRows( + left, right, state_.sortCompareFlags()) < 0; + }); + } else { + PrefixSort::sort( + container_, + state_.sortCompareFlags(), + state_.prefixSortConfig().value(), + memory::spillMemoryPool(), + run.rows); + } + + run.sorted = true; + } + + // NOTE: Always set a non-zero sort time to avoid flakiness in tests which + // check sort time. + updateSpillSortTime(std::max(1, sortTimeNs)); } -void Spiller::updateSpillSortTime(uint64_t timeNs) { - spillStats_->wlock()->spillSortTimeNanos += timeNs; - common::updateGlobalSpillSortTime(timeNs); +int64_t SpillerBase::extractSpillVector( + SpillRows& rows, + int32_t maxRows, + int64_t maxBytes, + RowVectorPtr& spillVector, + size_t& nextBatchIndex) { + uint64_t extractNs{0}; + auto limit = std::min(rows.size() - nextBatchIndex, maxRows); + VELOX_CHECK(!rows.empty()); + int32_t numRows = 0; + int64_t bytes = 0; + { + NanosecondTimer timer(&extractNs); + for (; numRows < limit; ++numRows) { + bytes += container_->rowSize(rows[nextBatchIndex + numRows]); + if (bytes > maxBytes) { + // Increment because the row that went over the limit is part + // of the result. We must spill at least one row. 
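        // Editorial sketch (toy code, not the Velox API): the same boundary
        // rule in isolation, with plain integer row sizes. The row that
        // crosses 'maxBytes' is still taken, so a batch always makes progress
        // even when a single row exceeds the byte budget:
        //
        //   size_t takeBatch(const std::vector<int64_t>& rowBytes, size_t& next,
        //                    size_t maxRows, int64_t maxBytes) {
        //     const size_t limit = std::min(rowBytes.size() - next, maxRows);
        //     int64_t bytes = 0;
        //     size_t n = 0;
        //     for (; n < limit; ++n) {
        //       bytes += rowBytes[next + n];
        //       if (bytes > maxBytes) {
        //         ++n; // Keep the overflowing row: at least one per batch.
        //         break;
        //       }
        //     }
        //     next += n;
        //     return n;
        //   }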
+ ++numRows; + break; + } + } + extractSpill(folly::Range(&rows[nextBatchIndex], numRows), spillVector); + nextBatchIndex += numRows; + } + updateSpillExtractVectorTime(extractNs); + return bytes; +} + +void SpillerBase::extractSpill( + folly::Range rows, + RowVectorPtr& resultPtr) { + if (resultPtr == nullptr) { + resultPtr = BaseVector::create( + rowType_, rows.size(), memory::spillMemoryPool()); + } else { + resultPtr->prepareForReuse(); + resultPtr->resize(rows.size()); + } + + auto* result = resultPtr.get(); + const auto& types = container_->columnTypes(); + for (auto i = 0; i < types.size(); ++i) { + container_->extractColumn(rows.data(), rows.size(), i, result->childAt(i)); + } + const auto& accumulators = container_->accumulators(); + column_index_t accumulatorColumnOffset = types.size(); + for (auto i = 0; i < accumulators.size(); ++i) { + accumulators[i].extractForSpill( + rows, result->childAt(i + accumulatorColumnOffset)); + } } -void Spiller::updateSpillExtractVectorTime(uint64_t timeNs) { +void SpillerBase::updateSpillExtractVectorTime(uint64_t timeNs) { spillStats_->wlock()->spillExtractVectorTimeNanos += timeNs; common::updateGlobalSpillExtractVectorTime(timeNs); } -bool Spiller::needSort() const { - return type_ != Type::kHashJoinProbe && type_ != Type::kHashJoinBuild && - type_ != Type::kRowNumber && type_ != Type::kAggregateOutput && - type_ != Type::kOrderByOutput; +void SpillerBase::updateSpillSortTime(uint64_t timeNs) { + spillStats_->wlock()->spillSortTimeNanos += timeNs; + common::updateGlobalSpillSortTime(timeNs); } -void Spiller::spill() { - return spill(nullptr); +void SpillerBase::checkEmptySpillRuns() const { + for (const auto& spillRun : spillRuns_) { + VELOX_CHECK(spillRun.rows.empty()); + } } -void Spiller::spill(const RowContainerIterator& startRowIter) { - VELOX_CHECK_EQ(type_, Type::kAggregateOutput); - return spill(&startRowIter); +void SpillerBase::updateSpillFillTime(uint64_t timeNs) { + spillStats_->wlock()->spillFillTimeNanos += timeNs; + common::updateGlobalSpillFillTime(timeNs); } -void Spiller::spill(const RowContainerIterator* startRowIter) { - CHECK_NOT_FINALIZED(); - VELOX_CHECK_NE(type_, Type::kHashJoinProbe); - VELOX_CHECK_NE(type_, Type::kOrderByOutput); - - markAllPartitionsSpilled(); +void SpillerBase::finishSpill(SpillPartitionSet& partitionSet) { + finalizeSpill(); - RowContainerIterator rowIter; - if (startRowIter != nullptr) { - rowIter = *startRowIter; + for (auto& partition : state_.spilledPartitionSet()) { + const SpillPartitionId partitionId(bits_.begin(), partition); + if (partitionSet.count(partitionId) == 0) { + partitionSet.emplace( + partitionId, + std::make_unique( + partitionId, state_.finish(partition))); + } else { + partitionSet[partitionId]->addFiles(state_.finish(partition)); + } } - - bool lastRun{false}; - do { - lastRun = fillSpillRuns(&rowIter); - runSpill(lastRun); - } while (!lastRun); - - checkEmptySpillRuns(); } -void Spiller::spill(SpillRows& rows) { - CHECK_NOT_FINALIZED(); - VELOX_CHECK_EQ(type_, Type::kOrderByOutput); - VELOX_CHECK(!rows.empty()); - - markAllPartitionsSpilled(); +common::SpillStats SpillerBase::stats() const { + return spillStats_->copy(); +} - fillSpillRun(rows); - runSpill(true); - checkEmptySpillRuns(); +std::string SpillerBase::toString() const { + return fmt::format( + "{}\t{}\tMAX_PARTITIONS:{}\tFINALIZED:{}", + type(), + rowType_->toString(), + state_.maxPartitions(), + finalized_); } -void Spiller::checkEmptySpillRuns() const { - for (const auto& spillRun : spillRuns_) { - 
VELOX_CHECK(spillRun.rows.empty()); - } +void SpillerBase::finalizeSpill() { + VELOX_CHECK(!finalized_); + finalized_ = true; } -void Spiller::markAllPartitionsSpilled() { +void SpillerBase::markAllPartitionsSpilled() { for (auto partition = 0; partition < state_.maxPartitions(); ++partition) { if (!state_.isPartitionSpilled(partition)) { state_.setPartitionSpilled(partition); @@ -627,13 +390,10 @@ void Spiller::markAllPartitionsSpilled() { } } -void Spiller::spill(uint32_t partition, const RowVectorPtr& spillVector) { - CHECK_NOT_FINALIZED(); - VELOX_CHECK( - type_ == Type::kHashJoinProbe || type_ == Type::kHashJoinBuild || - type_ == Type::kRowNumber, - "Unexpected spiller type: {}", - typeName(type_)); +void NoRowContainerSpiller::spill( + uint32_t partition, + const RowVectorPtr& spillVector) { + VELOX_CHECK(!finalized_); if (FOLLY_UNLIKELY(!state_.isPartitionSpilled(partition))) { VELOX_FAIL( "Can't spill vector to a non-spilling partition: {}, {}", @@ -649,83 +409,32 @@ void Spiller::spill(uint32_t partition, const RowVectorPtr& spillVector) { state_.appendToPartition(partition, spillVector); } -void Spiller::finishSpill(SpillPartitionSet& partitionSet) { - finalizeSpill(); - - for (auto& partition : state_.spilledPartitionSet()) { - const SpillPartitionId partitionId(bits_.begin(), partition); - if (partitionSet.count(partitionId) == 0) { - partitionSet.emplace( - partitionId, - std::make_unique( - partitionId, state_.finish(partition))); - } else { - partitionSet[partitionId]->addFiles(state_.finish(partition)); - } - } -} - -void Spiller::finalizeSpill() { - CHECK_NOT_FINALIZED(); - finalized_ = true; +void SortInputSpiller::spill() { + SpillerBase::spill(nullptr); } -bool Spiller::fillSpillRuns(RowContainerIterator* iterator) { - checkEmptySpillRuns(); - - bool lastRun{false}; - uint64_t execTimeNs{0}; - { - NanosecondTimer timer(&execTimeNs); - - // Number of rows to hash and divide into spill partitions at a time. - constexpr int32_t kHashBatchSize = 4096; - std::vector hashes(kHashBatchSize); - std::vector rows(kHashBatchSize); - const bool isSinglePartition = bits_.numPartitions() == 1; - - uint64_t totalRows{0}; - for (;;) { - const auto numRows = container_->listRows( - iterator, rows.size(), RowContainer::kUnlimited, rows.data()); - if (numRows == 0) { - lastRun = true; - break; - } - - // Calculate hashes for this batch of spill candidates. - auto rowSet = folly::Range(rows.data(), numRows); - - if (!isSinglePartition) { - for (auto i = 0; i < container_->keyTypes().size(); ++i) { - container_->hash(i, rowSet, i > 0, hashes.data()); - } - } - - // Put each in its run. - for (auto i = 0; i < numRows; ++i) { - // TODO: consider to cache the hash bits in row container so we only - // need to calculate them once. - const auto partition = isSinglePartition - ? 
0 - : bits_.partition(hashes[i], state_.maxPartitions()); - VELOX_DCHECK_GE(partition, 0); - spillRuns_[partition].rows.push_back(rows[i]); - spillRuns_[partition].numBytes += container_->rowSize(rows[i]); - } +SortOutputSpiller::SortOutputSpiller( + RowContainer* container, + RowTypePtr rowType, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) + : SpillerBase( + container, + std::move(rowType), + HashBitRange{}, + 0, + {}, + std::numeric_limits::max(), + spillConfig->maxSpillRunRows, + spillConfig, + spillStats) {} - totalRows += numRows; - if (maxSpillRunRows_ > 0 && totalRows >= maxSpillRunRows_) { - break; - } - } - } - updateSpillFillTime(execTimeNs); +void SortOutputSpiller::spill(SpillRows& rows) { + VELOX_CHECK(!finalized_); + VELOX_CHECK(!rows.empty()); - return lastRun; -} + markAllPartitionsSpilled(); -void Spiller::fillSpillRun(SpillRows& rows) { VELOX_CHECK_EQ(bits_.numPartitions(), 1); checkEmptySpillRuns(); uint64_t execTimeNs{0}; @@ -738,40 +447,16 @@ void Spiller::fillSpillRun(SpillRows& rows) { } } updateSpillFillTime(execTimeNs); + runSpill(true); + checkEmptySpillRuns(); } -std::string Spiller::toString() const { - return fmt::format( - "{}\t{}\tMAX_PARTITIONS:{}\tFINALIZED:{}", - typeName(type_), - rowType_->toString(), - state_.maxPartitions(), - finalized_); -} - -// static -std::string Spiller::typeName(Type type) { - switch (type) { - case Type::kOrderByInput: - return "ORDER_BY_INPUT"; - case Type::kOrderByOutput: - return "ORDER_BY_OUTPUT"; - case Type::kHashJoinBuild: - return "HASH_JOIN_BUILD"; - case Type::kHashJoinProbe: - return "HASH_JOIN_PROBE"; - case Type::kAggregateInput: - return "AGGREGATE_INPUT"; - case Type::kAggregateOutput: - return "AGGREGATE_OUTPUT"; - case Type::kRowNumber: - return "ROW_NUMBER"; - default: - VELOX_UNREACHABLE("Unknown type: {}", static_cast(type)); +void SortOutputSpiller::runSpill(bool lastRun) { + SpillerBase::runSpill(lastRun); + if (lastRun) { + for (auto partition = 0; partition < spillRuns_.size(); ++partition) { + state_.finishFile(partition); + } } } - -common::SpillStats Spiller::stats() const { - return spillStats_->copy(); -} } // namespace facebook::velox::exec diff --git a/velox/exec/Spiller.h b/velox/exec/Spiller.h index 91a1352a205b..02965bd3e461 100644 --- a/velox/exec/Spiller.h +++ b/velox/exec/Spiller.h @@ -21,177 +21,26 @@ #include "velox/exec/RowContainer.h" namespace facebook::velox::exec { +namespace test { +class SpillerTest; +} -/// Manages spilling data from a RowContainer. -class Spiller { +class SpillerBase { public: - // Define the spiller types. - enum class Type : int8_t { - // Used for aggregation input processing stage. - kAggregateInput = 0, - // Used for aggregation output processing stage. - kAggregateOutput = 1, - // Used for hash join build. - kHashJoinBuild = 2, - // Used for hash join probe. - kHashJoinProbe = 3, - // Used for order by input processing stage. - kOrderByInput = 4, - // Used for order by output processing stage. - kOrderByOutput = 5, - // Used for row number. - kRowNumber = 6, - // Number of spiller types. - kNumTypes = 7, - }; - - static std::string typeName(Type); - using SpillRows = std::vector>; - /// The constructor without specifying hash bits which will only use one - /// partition by default. 
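Editorial aside: the constructor block deleted just below is the crux of the refactor. One private constructor took a Type tag and seven public overloads dispatched on it; the new design, at the end of this header, is a protected SpillerBase constructor plus one subclass per use case that pins its own configuration. A stripped-down, self-contained sketch of that pattern; only the SpillerBase and SortInputSpiller names and the unlimited target file size come from the patch, everything else is invented for illustration:

#include <cstdint>
#include <limits>

class SpillerBase {
 public:
  virtual ~SpillerBase() = default;
  virtual bool needSort() const = 0;

 protected:
  // The shared constructor: subclasses decide the arguments, not a Type tag.
  SpillerBase(int32_t numSortingKeys, uint64_t targetFileSize)
      : numSortingKeys_(numSortingKeys), targetFileSize_(targetFileSize) {}

  const int32_t numSortingKeys_;
  const uint64_t targetFileSize_;
};

class SortInputSpiller : public SpillerBase {
 public:
  // Sort-input spilling always sorts and never splits files by size, so the
  // subclass pins an unlimited target file size instead of taking a tag.
  explicit SortInputSpiller(int32_t numSortingKeys)
      : SpillerBase(numSortingKeys, std::numeric_limits<uint64_t>::max()) {}

  bool needSort() const override {
    return true;
  }
};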
- - /// type == Type::kAggregateInput - Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - const HashBitRange& hashBitRange, - int32_t numSortingKeys, - const std::vector& sortCompareFlags, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats); - - /// type == Type::kOrderByInput - Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - int32_t numSortingKeys, - const std::vector& sortCompareFlags, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats); - - /// type == Type::kAggregateOutput || type == Type::kOrderByOutput - Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats); + virtual ~SpillerBase() = default; - /// type == Type::kHashJoinProbe - Spiller( - Type type, - RowTypePtr rowType, - HashBitRange bits, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats); - - /// type == Type::kHashJoinBuild - Spiller( - Type type, - core::JoinType joinType, - RowContainer* container, - RowTypePtr rowType, - HashBitRange bits, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats); - - /// type == Type::kRowNumber - Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - HashBitRange bits, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats); - - Type type() const { - return type_; - } - - /// Spills all the rows from 'this' to disk. The spilled rows stays in the - /// row container. The caller needs to erase the spilled rows from the row - /// container. - void spill(); - - /// Spill all rows starting from 'startRowIter'. This is only used by - /// 'kAggregateOutput' spiller type to spill during the aggregation output - /// processing. Similarly, the spilled rows still stays in the row container. - /// The caller needs to erase them from the row container. - void spill(const RowContainerIterator& startRowIter); - - /// Invoked to spill all the rows pointed by rows. This is used by - /// 'kOrderByOutput' spiller type to spill during the order by - /// output processing. Similarly, the spilled rows still stays in the row - /// container. The caller needs to erase them from the row container. - void spill(SpillRows& rows); - - /// Append 'spillVector' into the spill file of given 'partition'. It is now - /// only used by the spilling operator which doesn't need data sort, such as - /// hash join build and hash join probe. - /// - /// NOTE: the spilling operator should first mark 'partition' as spilling and - /// spill any data buffered in row container before call this. - void spill(uint32_t partition, const RowVectorPtr& spillVector); - - /// Extracts up to 'maxRows' or 'maxBytes' from 'rows' into 'spillVector'. The - /// extract starts at nextBatchIndex and updates nextBatchIndex to be the - /// index of the first non-extracted element of 'rows'. Returns the byte size - /// of the extracted rows. - int64_t extractSpillVector( - SpillRows& rows, - int32_t maxRows, - int64_t maxBytes, - RowVectorPtr& spillVector, - size_t& nextBatchIndex); - - /// Finishes spilling and accumulate the spilled partition metadata in - /// 'partitionSet' indexed by spill partition id. 
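  /// Repeated calls may pass the same 'partitionSet': the first spiller to
  /// finish a given partition id creates its SpillPartition entry, and later
  /// finishes append their files to it (see SpillerBase::finishSpill above).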
  void finishSpill(SpillPartitionSet& partitionSet);
 
-  const SpillState& state() const {
-    return state_;
-  }
-
   const HashBitRange& hashBits() const {
     return bits_;
   }
 
-  bool isSpilled(int32_t partition) const {
-    return state_.isPartitionSpilled(partition);
-  }
-
-  /// Indicates if all the partitions have spilled.
-  bool isAllSpilled() const {
-    return state_.isAllPartitionSpilled();
-  }
-
-  /// Indicates if any one of the partitions has spilled.
-  bool isAnySpilled() const {
-    return state_.isAnyPartitionSpilled();
-  }
-
-  /// Returns the spilled partition number set.
-  SpillPartitionNumSet spilledPartitionSet() const {
-    return state_.spilledPartitionSet();
-  }
-
-  /// Invoked to set a set of 'partitions' as spilling.
-  void setPartitionsSpilled(const SpillPartitionNumSet& partitions) {
-    VELOX_CHECK_EQ(
-        type_,
-        Spiller::Type::kHashJoinProbe,
-        "Unexpected spiller type: ",
-        typeName(type_));
-    for (const auto& partition : partitions) {
-      state_.setPartitionSpilled(partition);
-    }
+  const SpillState& state() const {
+    return state_;
   }
 
-  /// Indicates if this spiller has finalized or not.
   bool finalized() const {
     return finalized_;
   }
@@ -200,43 +49,41 @@ class Spiller {
 
   std::string toString() const;
 
- private:
-  Spiller(
-      Type type,
+ protected:
+  SpillerBase(
       RowContainer* container,
       RowTypePtr rowType,
       HashBitRange bits,
       int32_t numSortingKeys,
       const std::vector<CompareFlags>& sortCompareFlags,
-      bool spillProbedFlag,
-      const common::GetSpillDirectoryPathCB& getSpillDirPathCb,
-      const common::UpdateAndCheckSpillLimitCB& updateAndCheckSpillLimitCb,
-      const std::string& fileNamePrefix,
       uint64_t targetFileSize,
-      uint64_t writeBufferSize,
-      common::CompressionKind compressionKind,
-      const std::optional<common::PrefixSortConfig>& prefixSortConfig,
-      folly::Executor* executor,
       uint64_t maxSpillRunRows,
-      const std::string& fileCreateConfig,
+      const common::SpillConfig* spillConfig,
      folly::Synchronized<common::SpillStats>* spillStats);
 
   // Invoked to spill. If 'startRowIter' is not null, then we only spill rows
   // from the row container starting at the offset pointed by 'startRowIter'.
   void spill(const RowContainerIterator* startRowIter);
 
+  // Writes out all the rows collected in spillRuns_.
+  virtual void runSpill(bool lastRun);
+
   // Extracts the keys, dependents or accumulators for 'rows' into 'resultPtr',
   // creating it in the spill memory pool when nullptr.
-  void extractSpill(folly::Range<char**> rows, RowVectorPtr& result);
+  virtual void extractSpill(folly::Range<char**> rows, RowVectorPtr& resultPtr);
 
-  // Returns a mergeable stream that goes over unspilled in-memory
-  // rows for the spill partition 'partition'. Call finishSpill()
-  // first, and 'partition' must specify a partition that has started spilling.
-  std::unique_ptr<SpillMergeStream> spillMergeStreamOverRows(int32_t partition);
+  virtual bool needSort() const = 0;
 
-  // Invoked to finalize the spiller and flush any buffered spill to disk.
-  void finalizeSpill();
+  virtual std::string type() const = 0;
+
+  // Marks all the partitions as spilled, since we don't support
+  // fine-grained spilling for now.
+  void markAllPartitionsSpilled();
+
+  void updateSpillFillTime(uint64_t timeNs);
+
+  void checkEmptySpillRuns() const;
 
   // Represents a run of rows from a spillable partition of
   // a RowContainer.
Rows that hash to the same partition are accumulated here @@ -284,11 +131,33 @@ class Spiller { : partition(_partition), rowsWritten(_numWritten), error(_error) {} }; - void checkEmptySpillRuns() const; + RowContainer* const container_{nullptr}; - // Marks all the partitions have been spilled as we don't support - // fine-grained spilling as for now. - void markAllPartitionsSpilled(); + folly::Executor* const executor_; + + const HashBitRange bits_; + + const RowTypePtr rowType_; + + const uint64_t maxSpillRunRows_; + + folly::Synchronized* const spillStats_; + + // True if all rows of spilling partitions are in 'spillRuns_', so + // that one can start reading these back. + bool finalized_{false}; + + SpillState state_; + + // Collects the rows to spill for each partition. + std::vector spillRuns_; + + private: + // Function for writing a spill partition on an executor. Writes to + // 'partition' until all rows in spillRuns_[partition] are written + // or spill file size limit is exceeded. Returns the number of rows + // written. + std::unique_ptr writeSpill(int32_t partition); // Prepares spill runs for the spillable data from all the hash partitions. // If 'startRowIter' is not null, we prepare runs starting from the offset @@ -296,62 +165,113 @@ class Spiller { // The function returns true if it is the last spill run. bool fillSpillRuns(RowContainerIterator* startRowIter = nullptr); - // Prepares spill run of a single partition for the spillable data from the - // rows. - void fillSpillRun(SpillRows& rows); + void updateSpillExtractVectorTime(uint64_t timeNs); - // Writes out all the rows collected in spillRuns_. - void runSpill(bool lastRun); + void updateSpillSortTime(uint64_t timeNs); // Sorts 'run' if not already sorted. void ensureSorted(SpillRun& run); - // Function for writing a spill partition on an executor. Writes to - // 'partition' until all rows in spillRuns_[partition] are written - // or spill file size limit is exceeded. Returns the number of rows - // written. - std::unique_ptr writeSpill(int32_t partition); + // Extracts up to 'maxRows' or 'maxBytes' from 'rows' into 'spillVector'. The + // extract starts at nextBatchIndex and updates nextBatchIndex to be the + // index of the first non-extracted element of 'rows'. Returns the byte size + // of the extracted rows. + int64_t extractSpillVector( + SpillRows& rows, + int32_t maxRows, + int64_t maxBytes, + RowVectorPtr& spillVector, + size_t& nextBatchIndex); - // Indicates if the spill data needs to be sorted before write to file. It is - // based on the spiller type. As for now, we need to sort spill data for any - // non hash join types of spilling. - bool needSort() const; + // Invoked to finalize the spiller and flush any buffered spill to disk. + void finalizeSpill(); - void updateSpillFillTime(uint64_t timeUs); + friend class test::SpillerTest; +}; - void updateSpillSortTime(uint64_t timeUs); +class NoRowContainerSpiller : public SpillerBase { + public: + static constexpr std::string_view kType = "NoRowContainerSpiller"; - void updateSpillExtractVectorTime(uint64_t timeUs); + NoRowContainerSpiller( + RowTypePtr rowType, + HashBitRange bits, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); - const Type type_; - // NOTE: for hash join probe type, there is no associated row container for - // the spiller. 
- RowContainer* const container_{nullptr}; - folly::Executor* const executor_; - const HashBitRange bits_; - const RowTypePtr rowType_; - const bool spillProbedFlag_; - const uint64_t maxSpillRunRows_; + void spill(uint32_t partition, const RowVectorPtr& spillVector); - folly::Synchronized* const spillStats_; + void setPartitionsSpilled(const SpillPartitionNumSet& partitions) { + for (const auto& partition : partitions) { + state_.setPartitionSpilled(partition); + } + } - // True if all rows of spilling partitions are in 'spillRuns_', so - // that one can start reading these back. This means that the rows - // that are not written out and deleted will be captured by - // spillMergeStreamOverRows(). - bool finalized_{false}; + private: + std::string type() const override { + return std::string(kType); + } - SpillState state_; + bool needSort() const override { + return false; + } +}; - // Collects the rows to spill for each partition. - std::vector spillRuns_; +class SortInputSpiller : public SpillerBase { + public: + static constexpr std::string_view kType = "SortInputSpiller"; + + SortInputSpiller( + RowContainer* container, + RowTypePtr rowType, + int32_t numSortingKeys, + const std::vector& sortCompareFlags, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) + : SpillerBase( + container, + std::move(rowType), + HashBitRange{}, + numSortingKeys, + sortCompareFlags, + std::numeric_limits::max(), + spillConfig->maxSpillRunRows, + spillConfig, + spillStats) {} + + void spill(); + + private: + std::string type() const override { + return std::string(kType); + } + + bool needSort() const override { + return true; + } }; -} // namespace facebook::velox::exec -template <> -struct fmt::formatter : formatter { - auto format(facebook::velox::exec::Spiller::Type s, format_context& ctx) - const { - return formatter::format(static_cast(s), ctx); +class SortOutputSpiller : public SpillerBase { + public: + static constexpr std::string_view kType = "SortOutputSpiller"; + + SortOutputSpiller( + RowContainer* container, + RowTypePtr rowType, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); + + void spill(SpillRows& rows); + + private: + void runSpill(bool lastRun) override; + + bool needSort() const override { + return false; + } + + std::string type() const override { + return std::string(kType); } }; +} // namespace facebook::velox::exec diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index f9766007c3ae..8d22ac3bb0b7 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -758,9 +758,7 @@ void TopNRowNumber::setupSpiller() { VELOX_CHECK_NULL(spiller_); VELOX_CHECK(spillConfig_.has_value()); - spiller_ = std::make_unique( - // TODO Replace Spiller::Type::kOrderBy. 
- Spiller::Type::kOrderByInput, + spiller_ = std::make_unique( data_.get(), inputType_, spillCompareFlags_.size(), diff --git a/velox/exec/TopNRowNumber.h b/velox/exec/TopNRowNumber.h index 099122e8b56d..a9163ab0086d 100644 --- a/velox/exec/TopNRowNumber.h +++ b/velox/exec/TopNRowNumber.h @@ -19,6 +19,7 @@ #include "velox/exec/Operator.h" namespace facebook::velox::exec { +class TopNRowNumberSpiller; /// Partitions the input using specified partitioning keys, sorts rows within /// partitions using specified sorting keys, assigns row numbers and returns up @@ -154,7 +155,9 @@ class TopNRowNumber : public Operator { bool abandonPartialEarly() const; const int32_t limit_; + const bool generateRowNumber_; + const size_t numPartitionKeys_; // Input columns in the order of: partition keys, sorting keys, the rest. @@ -171,6 +174,7 @@ class TopNRowNumber : public Operator { const std::vector spillCompareFlags_; const vector_size_t abandonPartialMinRows_; + const int32_t abandonPartialMinPct_; // True if this operator runs a 'partial' stage without sufficient reduction @@ -181,12 +185,15 @@ class TopNRowNumber : public Operator { // partitioning keys. For each partition, stores an instance of TopRows // struct. std::unique_ptr table_; + std::unique_ptr lookup_; + int32_t partitionOffset_; // TopRows struct to keep track of top rows for a single partition, when // there are no partitioning keys. std::unique_ptr allocator_; + std::unique_ptr singlePartition_; // Stores input data. For each partition, only up to 'limit_' rows are stored. @@ -211,6 +218,7 @@ class TopNRowNumber : public Operator { // Maximum number of rows in the output batch. vector_size_t outputBatchSize_; + std::vector outputRows_; // Number of partitions to fetch from a HashTable in a single listAllRows @@ -218,13 +226,17 @@ class TopNRowNumber : public Operator { static const size_t kPartitionBatchSize = 100; BaseHashTable::RowsIterator partitionIt_; + std::vector partitions_{kPartitionBatchSize}; + size_t numPartitions_{0}; + std::optional currentPartition_; + vector_size_t remainingRowsInPartition_{0}; // Spiller for contents of the 'data_'. - std::unique_ptr spiller_; + std::unique_ptr spiller_; // Used to sort-merge spilled data. 
std::unique_ptr> merge_; diff --git a/velox/exec/tests/AggregateSpillBenchmarkBase.cpp b/velox/exec/tests/AggregateSpillBenchmarkBase.cpp index 8464157a7e09..d5aa2ae94021 100644 --- a/velox/exec/tests/AggregateSpillBenchmarkBase.cpp +++ b/velox/exec/tests/AggregateSpillBenchmarkBase.cpp @@ -15,9 +15,9 @@ */ #include "velox/exec/tests/AggregateSpillBenchmarkBase.h" +#include "velox/exec/GroupingSet.h" #include -#include using namespace facebook::velox; using namespace facebook::velox::common; @@ -69,16 +69,22 @@ void AggregateSpillBenchmarkBase::setUp() { void AggregateSpillBenchmarkBase::run() { MicrosecondTimer timer(&executionTimeUs_); - if (spillerType_ == Spiller::Type::kAggregateInput) { - spiller_->spill(); + if (spillerType_ == std::string(AggregationInputSpiller::kType)) { + auto aggregationInputSpiller = + dynamic_cast(spiller_.get()); + VELOX_CHECK_NOT_NULL(aggregationInputSpiller); + aggregationInputSpiller->spill(); } else { - spiller_->spill(RowContainerIterator{}); + auto aggregationOutputSpiller = + dynamic_cast(spiller_.get()); + VELOX_CHECK_NOT_NULL(aggregationOutputSpiller); + aggregationOutputSpiller->spill(RowContainerIterator{}); } rowContainer_->clear(); } void AggregateSpillBenchmarkBase::printStats() const { - LOG(INFO) << "======Aggregate " << Spiller::typeName(spillerType_) + LOG(INFO) << "======Aggregate " << spillerType_ << " spilling statistics======"; LOG(INFO) << "total execution time: " << succinctMicros(executionTimeUs_); LOG(INFO) << numInputVectors_ << " vectors each with " << inputVectorSize_ @@ -123,7 +129,7 @@ void AggregateSpillBenchmarkBase::writeSpillData() { } } -std::unique_ptr AggregateSpillBenchmarkBase::makeSpiller() { +std::unique_ptr AggregateSpillBenchmarkBase::makeSpiller() { common::SpillConfig spillConfig; spillConfig.getSpillDirPathCb = [&]() -> std::string_view { return spillDir_; @@ -141,9 +147,8 @@ std::unique_ptr AggregateSpillBenchmarkBase::makeSpiller() { // The default value of QueryConfig spillNumPartitionBits. spillConfig.numPartitionBits = 3; - if (spillerType_ == Spiller::Type::kAggregateInput) { - return std::make_unique( - spillerType_, + if (spillerType_ == AggregationInputSpiller::kType) { + return std::make_unique( rowContainer_.get(), rowType_, HashBitRange{ @@ -155,13 +160,10 @@ std::unique_ptr AggregateSpillBenchmarkBase::makeSpiller() { &spillConfig, &spillStats_); } else { + VELOX_CHECK_EQ(spillerType_, AggregationOutputSpiller::kType); // TODO: Add config flag to control the max spill rows. - return std::make_unique( - spillerType_, - rowContainer_.get(), - rowType_, - &spillConfig, - &spillStats_); + return std::make_unique( + rowContainer_.get(), rowType_, &spillConfig, &spillStats_); } } } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/AggregateSpillBenchmarkBase.h b/velox/exec/tests/AggregateSpillBenchmarkBase.h index 15b3bb853d66..aafb67d009ec 100644 --- a/velox/exec/tests/AggregateSpillBenchmarkBase.h +++ b/velox/exec/tests/AggregateSpillBenchmarkBase.h @@ -19,7 +19,7 @@ namespace facebook::velox::exec::test { class AggregateSpillBenchmarkBase : public SpillerBenchmarkBase { public: - explicit AggregateSpillBenchmarkBase(Spiller::Type spillerType) + explicit AggregateSpillBenchmarkBase(std::string spillerType) : spillerType_(spillerType){}; /// Sets up the test. 
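Editorial example: with the Type enum gone, call sites that still need to choose an implementation at runtime (the benchmark factory above, the test fixture later in this patch) compare against each class's kType string. A self-contained toy of that dispatch; only the class and kType names come from the patch, and the empty constructors and error handling are invented:

#include <memory>
#include <stdexcept>
#include <string>
#include <string_view>

// Toy stand-ins for the refactored classes: each concrete spiller publishes a
// kType tag that call sites compare against instead of a Spiller::Type enum.
struct SpillerBase {
  virtual ~SpillerBase() = default;
  virtual std::string type() const = 0;
};

struct AggregationInputSpiller : SpillerBase {
  static constexpr std::string_view kType = "AggregationInputSpiller";
  std::string type() const override { return std::string(kType); }
};

struct AggregationOutputSpiller : SpillerBase {
  static constexpr std::string_view kType = "AggregationOutputSpiller";
  std::string type() const override { return std::string(kType); }
};

// Mirrors the shape of AggregateSpillBenchmarkBase::makeSpiller() above.
std::unique_ptr<SpillerBase> makeSpiller(const std::string& spillerType) {
  if (spillerType == AggregationInputSpiller::kType) {
    return std::make_unique<AggregationInputSpiller>();
  }
  if (spillerType == AggregationOutputSpiller::kType) {
    return std::make_unique<AggregationOutputSpiller>();
  }
  throw std::invalid_argument("Unsupported spiller type: " + spillerType);
}

int main() {
  return makeSpiller("AggregationInputSpiller")->type() ==
          AggregationInputSpiller::kType
      ? 0
      : 1;
}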
@@ -32,9 +32,9 @@ class AggregateSpillBenchmarkBase : public SpillerBenchmarkBase { private: void writeSpillData(); - std::unique_ptr makeSpiller(); + std::unique_ptr makeSpiller(); - const Spiller::Type spillerType_; + const std::string spillerType_; std::unique_ptr rowContainer_; std::shared_ptr spillerPool_; }; diff --git a/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp b/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp index 46d9d9582cb7..949532bc4a21 100644 --- a/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp +++ b/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp @@ -45,19 +45,19 @@ void JoinSpillInputBenchmarkBase::setUp() { spillConfig.maxSpillRunRows = 0; spillConfig.fileCreateConfig = {}; - spiller_ = std::make_unique( - exec::Spiller::Type::kHashJoinProbe, - rowType_, - HashBitRange{29, 29}, - &spillConfig, - &spillStats_); - spiller_->setPartitionsSpilled({0}); + spiller_ = std::make_unique( + rowType_, HashBitRange{29, 29}, &spillConfig, &spillStats_); + dynamic_cast(spiller_.get()) + ->setPartitionsSpilled({0}); } void JoinSpillInputBenchmarkBase::run() { MicrosecondTimer timer(&executionTimeUs_); + auto noRowContainerSpiller = + dynamic_cast(spiller_.get()); + VELOX_CHECK_NOT_NULL(noRowContainerSpiller); for (auto i = 0; i < numInputVectors_; ++i) { - spiller_->spill(0, rowVectors_[i % numSampleVectors]); + noRowContainerSpiller->spill(0, rowVectors_[i % numSampleVectors]); } } diff --git a/velox/exec/tests/JoinSpillInputBenchmarkBase.h b/velox/exec/tests/JoinSpillInputBenchmarkBase.h index 313080802672..e82979d0774e 100644 --- a/velox/exec/tests/JoinSpillInputBenchmarkBase.h +++ b/velox/exec/tests/JoinSpillInputBenchmarkBase.h @@ -17,7 +17,7 @@ #include "velox/exec/tests/SpillerBenchmarkBase.h" namespace facebook::velox::exec::test { -// This test measures the spill input overhead in spill join & probe. +/// This test measures the spill input overhead in spill join & probe. class JoinSpillInputBenchmarkBase : public SpillerBenchmarkBase { public: JoinSpillInputBenchmarkBase() = default; diff --git a/velox/exec/tests/SpillerAggregateBenchmarkTest.cpp b/velox/exec/tests/SpillerAggregateBenchmarkTest.cpp index 7a8fc77a6c45..aa676554e2df 100644 --- a/velox/exec/tests/SpillerAggregateBenchmarkTest.cpp +++ b/velox/exec/tests/SpillerAggregateBenchmarkTest.cpp @@ -14,6 +14,7 @@ * limitations under the License. 
 */
 
+#include "velox/exec/GroupingSet.h"
 #include "velox/exec/tests/AggregateSpillBenchmarkBase.h"
 #include "velox/serializers/PrestoSerializer.h"
 
@@ -28,22 +29,13 @@ int main(int argc, char* argv[]) {
   serializer::presto::PrestoVectorSerde::registerVectorSerde();
   filesystems::registerLocalFileSystem();
 
-  auto spillerTypeName = FLAGS_spiller_benchmark_spiller_type;
-  std::transform(
-      spillerTypeName.begin(),
-      spillerTypeName.end(),
-      spillerTypeName.begin(),
-      [](unsigned char c) { return std::toupper(c); });
-  Spiller::Type spillerType;
-  if (spillerTypeName == Spiller::typeName(Spiller::Type::kAggregateInput)) {
-    spillerType = Spiller::Type::kAggregateInput;
-  } else if (
-      spillerTypeName == Spiller::typeName(Spiller::Type::kAggregateOutput)) {
-    spillerType = Spiller::Type::kAggregateOutput;
-  } else {
+  auto spillerType = FLAGS_spiller_benchmark_spiller_type;
+  if (spillerType != AggregationInputSpiller::kType &&
+      spillerType != AggregationOutputSpiller::kType) {
     VELOX_UNSUPPORTED(
-        "The spiller type {} is not one of [AGGREGATE_INPUT, AGGREGATE_OUTPUT], the aggregate spiller does not support it.",
-        spillerTypeName);
+        "The spiller type {} is not one of [AggregationInputSpiller, "
+        "AggregationOutputSpiller]; the aggregate spiller does not support it.",
+        spillerType);
   }
   auto test = std::make_unique<AggregateSpillBenchmarkBase>(spillerType);
   test->setUp();
diff --git a/velox/exec/tests/SpillerBenchmarkBase.cpp b/velox/exec/tests/SpillerBenchmarkBase.cpp
index ba3617517a4f..ab94f79c62e3 100644
--- a/velox/exec/tests/SpillerBenchmarkBase.cpp
+++ b/velox/exec/tests/SpillerBenchmarkBase.cpp
@@ -44,7 +44,7 @@ DEFINE_string(
     "The compression kind to compress spill rows before write to disk");
 DEFINE_string(
     spiller_benchmark_spiller_type,
-    "AGGREGATE_INPUT",
+    "AggregationInputSpiller",
     "The spiller type name.");
 DEFINE_uint32(
     spiller_benchmark_num_spill_vectors,
diff --git a/velox/exec/tests/SpillerBenchmarkBase.h b/velox/exec/tests/SpillerBenchmarkBase.h
index 2fd181cc015a..a4a4fcc228ea 100644
--- a/velox/exec/tests/SpillerBenchmarkBase.h
+++ b/velox/exec/tests/SpillerBenchmarkBase.h
@@ -37,7 +37,7 @@ DECLARE_uint64(spiller_benchmark_min_spill_run_size);
 DECLARE_uint64(spiller_benchmark_write_buffer_size);
 
 namespace facebook::velox::exec::test {
-// This test measures the spill input overhead in spill join & probe.
+/// Base class for the spiller benchmarks.
 class SpillerBenchmarkBase {
  public:
   SpillerBenchmarkBase() = default;
@@ -68,7 +68,7 @@ class SpillerBenchmarkBase {
   std::shared_ptr<TempDirectoryPath> tempDir_;
   std::string spillDir_;
   std::shared_ptr<FileSystem> fs_;
-  std::unique_ptr<Spiller> spiller_;
+  std::unique_ptr<SpillerBase> spiller_;
 
   // Stats.
uint64_t executionTimeUs_{0}; folly::Synchronized spillStats_; diff --git a/velox/exec/tests/SpillerTest.cpp b/velox/exec/tests/SpillerTest.cpp index 83d6d7983e86..00ebb9e952bd 100644 --- a/velox/exec/tests/SpillerTest.cpp +++ b/velox/exec/tests/SpillerTest.cpp @@ -21,10 +21,13 @@ #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/tests/FaultyFileSystem.h" #include "velox/common/testutil/TestValue.h" +#include "velox/exec/GroupingSet.h" +#include "velox/exec/HashBuild.h" #include "velox/exec/HashJoinBridge.h" #include "velox/exec/HashPartitionFunction.h" #include "velox/exec/OperatorUtils.h" #include "velox/exec/RowContainer.h" +#include "velox/exec/RowNumber.h" #include "velox/exec/tests/utils/RowContainerTestBase.h" #include "velox/vector/fuzzer/VectorFuzzer.h" @@ -34,7 +37,19 @@ using namespace facebook::velox::exec::test; using namespace facebook::velox::common::testutil; using facebook::velox::filesystems::FileSystem; +namespace facebook::velox::exec::test { namespace { +enum class SpillerType { + NO_ROW_CONTAINER = 0, + SORT_INPUT = 1, + SORT_OUTPUT = 2, + HASH_BUILD = 3, + AGGREGATION_INPUT = 4, + AGGREGATION_OUTPUT = 5, + ROW_NUMBER_HASH_TABLE = 6, + NUM_TYPES = 7, +}; + // Class to write runtime stats in the tests to the stats container. class TestRuntimeStatWriter : public BaseRuntimeStatWriter { public: @@ -51,8 +66,29 @@ class TestRuntimeStatWriter : public BaseRuntimeStatWriter { std::unordered_map& stats_; }; +std::string typeName(SpillerType type) { + switch (type) { + case SpillerType::NO_ROW_CONTAINER: + return std::string(NoRowContainerSpiller::kType); + case SpillerType::SORT_INPUT: + return std::string(SortInputSpiller::kType); + case SpillerType::SORT_OUTPUT: + return std::string(SortOutputSpiller::kType); + case SpillerType::HASH_BUILD: + return std::string(HashBuildSpiller::kType); + case SpillerType::AGGREGATION_INPUT: + return std::string(AggregationInputSpiller::kType); + case SpillerType::AGGREGATION_OUTPUT: + return std::string(AggregationOutputSpiller::kType); + case SpillerType::ROW_NUMBER_HASH_TABLE: + return std::string(RowNumberHashTableSpiller::kType); + default: + VELOX_FAIL("UNKNOWN SpillerType"); + } +} + struct TestParam { - Spiller::Type type; + SpillerType type; // Specifies the spill executor pool size. If the size is zero, then spill // write path is executed inline with spiller control code path. 
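  // (A pool size of zero keeps every partition write inline on the spiller's
  // control thread; a non-zero size lets SpillerBase::runSpill() offload the
  // per-partition writes to the spill executor.)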
   int poolSize;
@@ -61,7 +97,7 @@ struct TestParam {
   core::JoinType joinType;
 
   TestParam(
-      Spiller::Type _type,
+      SpillerType _type,
       int _poolSize,
       common::CompressionKind _compressionKind,
       bool _enablePrefixSort,
@@ -75,7 +111,7 @@ struct TestParam {
   std::string toString() const {
     return fmt::format(
         "{}|{}|{}|{}",
-        Spiller::typeName(type),
+        typeName(type),
         poolSize,
         compressionKindToString(compressionKind),
         std::to_string(enablePrefixSort),
@@ -86,9 +122,9 @@ struct TestParamsBuilder {
   std::vector<TestParam> getTestParams() {
     std::vector<TestParam> params;
-    const auto numSpillerTypes = static_cast<int>(Spiller::Type::kNumTypes);
+    const auto numSpillerTypes = static_cast<int>(SpillerType::NUM_TYPES);
     for (int i = 0; i < numSpillerTypes; ++i) {
-      const auto type = static_cast<Spiller::Type>(i);
+      const auto type = static_cast<SpillerType>(i);
       if (typesToExclude.find(type) == typesToExclude.end()) {
         common::CompressionKind compressionKind =
             static_cast<common::CompressionKind>(numSpillerTypes % 6);
@@ -99,7 +135,7 @@ struct TestParamsBuilder {
             compressionKind,
             poolSize % 2,
             core::JoinType::kRight);
-        if (type == Spiller::Type::kHashJoinBuild) {
+        if (type == SpillerType::HASH_BUILD) {
           params.emplace_back(
               type,
               poolSize,
@@ -113,7 +149,7 @@ struct TestParamsBuilder {
     return params;
   }
 
-  std::unordered_set<Spiller::Type> typesToExclude{};
+  std::unordered_set<SpillerType> typesToExclude{};
 };
 
 // Set sequential value in a given child vector. 'value' is the starting value.
@@ -138,6 +174,64 @@ void resizeVector(RowVector& vector, vector_size_t size) {
 
 class SpillerTest : public exec::test::RowContainerTestBase {
  public:
+  template <typename T>
+  T castOrThrow(SpillerBase* spiller) const {
+    if (auto* casted = dynamic_cast<T>(spiller)) {
+      return casted;
+    }
+    VELOX_UNREACHABLE("Unsuccessful cast of spiller: {}", spiller->toString());
+  }
+
+  // Delegates to the protected SpillerBase spill method (SpillerTest is a
+  // friend of SpillerBase).
+  void spill(SpillerBase* spiller) const {
+    spiller->spill(nullptr);
+  }
+
+  struct SpillParams {
+    std::optional<uint32_t> partition;
+    std::optional<RowVectorPtr> spillVector;
+    std::optional<SpillerBase::SpillRows*> spillRows;
+    std::optional<RowContainerIterator*> rowIter;
+  };
+
+  // Delegates to the concrete spiller's public spill method.
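  // For example, the sorted-spill path below exercises an aggregation-output
  // spiller by populating only the row-iterator slot:
  //   spill(spiller_.get(), {std::nullopt, std::nullopt, std::nullopt,
  //                          std::optional<RowContainerIterator*>(nullptr)});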
+ void spill(SpillerBase* spiller, SpillParams params) const { + const auto type = spiller->type(); + if (type == std::string(NoRowContainerSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + ASSERT_TRUE(params.partition.has_value()); + ASSERT_TRUE(params.spillVector.has_value()); + spillerImpl->spill(params.partition.value(), params.spillVector.value()); + } else if (type == std::string(SortInputSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + spillerImpl->spill(); + } else if (type == std::string(SortOutputSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + ASSERT_TRUE(params.spillRows.has_value()); + spillerImpl->spill(*params.spillRows.value()); + } else if (type == std::string(HashBuildSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + if (params.partition.has_value() && params.spillVector.has_value()) { + spillerImpl->spill( + params.partition.value(), params.spillVector.value()); + } else { + spillerImpl->spill(); + } + } else if (type == std::string(AggregationInputSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + spillerImpl->spill(); + } else if (type == std::string(AggregationOutputSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + ASSERT_TRUE(params.rowIter.has_value()); + spillerImpl->spill(*params.rowIter.value()); + } else if (type == std::string(RowNumberHashTableSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + spillerImpl->spill(); + } else { + VELOX_UNREACHABLE("Unknown spiller: {}", spiller->toString()); + } + } + static void SetUpTestCase() { TestValue::enable(); memory::MemoryManager::testingSetInstance({}); @@ -152,13 +246,12 @@ class SpillerTest : public exec::test::RowContainerTestBase { enablePrefixSort_(param.enablePrefixSort), joinType_(param.joinType), spillProbedFlag_( - type_ == Spiller::Type::kHashJoinBuild && - needRightSideJoin(joinType_)), + type_ == SpillerType::HASH_BUILD && needRightSideJoin(joinType_)), hashBits_( 0, - (type_ == Spiller::Type::kOrderByInput || - type_ == Spiller::Type::kOrderByOutput || - type_ == Spiller::Type::kAggregateOutput) + (type_ == SpillerType::SORT_INPUT || + type_ == SpillerType::SORT_OUTPUT || + type_ == SpillerType::AGGREGATION_OUTPUT) ? 0 : 2), numPartitions_(hashBits_.numPartitions()), @@ -197,7 +290,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { MAP(BIGINT(), ROW({{"s2_int", INTEGER()}, {"s2_string", VARCHAR()}})))}, }); - if (type_ == Spiller::Type::kHashJoinBuild) { + if (type_ == SpillerType::HASH_BUILD) { rowType_ = hashJoinTableSpillType(containerType_, joinType_); } else { rowType_ = containerType_; @@ -226,6 +319,11 @@ class SpillerTest : public exec::test::RowContainerTestBase { spillPartitionNums.begin(), spillPartitionNums.end()); } + bool needSort(SpillerType type) { + return type == SpillerType::AGGREGATION_INPUT || + type == SpillerType::SORT_INPUT; + } + void testSortedSpill( int numDuplicates, int32_t outputBatchSize = 0, @@ -234,7 +332,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { uint64_t readBufferSize = 1 << 20) { SCOPED_TRACE(fmt::format( "spillType: {} numDuplicates: {} outputBatchSize: {} ascending: {} makeError: {}", - Spiller::typeName(type_), + typeName(type_), numDuplicates, outputBatchSize, ascending, @@ -251,14 +349,14 @@ class SpillerTest : public exec::test::RowContainerTestBase { setupSpiller(2'000'000, 0, makeError, 0, readBufferSize); // We spill spillPct% of the data in 10% increments. 
- runSpill(makeError); + runSortedSpill(makeError); if (makeError) { return; } // Verify the spilled file exist on file system. auto stats = spiller_->stats(); const auto numSpilledFiles = stats.spilledFiles; - if (type_ == Spiller::Type::kAggregateOutput) { + if (type_ == SpillerType::AGGREGATION_OUTPUT) { ASSERT_EQ(numSpilledFiles, 1); } else { ASSERT_GT(numSpilledFiles, 0); @@ -272,12 +370,9 @@ class SpillerTest : public exec::test::RowContainerTestBase { ASSERT_NE(readFile.get(), nullptr); totalSpilledBytes += readFile->size(); } - ASSERT_TRUE(spiller_->isAnySpilled()); - ASSERT_TRUE(spiller_->isAllSpilled()); + ASSERT_TRUE(spiller_->state().isAnyPartitionSpilled()); + ASSERT_TRUE(spiller_->state().isAllPartitionSpilled()); ASSERT_FALSE(spiller_->finalized()); - VELOX_ASSERT_THROW(spiller_->spill(0, nullptr), "Unexpected spiller type"); - VELOX_ASSERT_THROW( - spiller_->setPartitionsSpilled({}), "Unexpected spiller type"); SpillPartitionSet spillPartitionSet; spiller_->finishSpill(spillPartitionSet); ASSERT_EQ(spillPartitionSet.size(), numPartitions_); @@ -289,10 +384,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { // Assert we can't call any spill function after the spiller has been // finalized. - VELOX_ASSERT_THROW(spiller_->spill(), "Spiller has been finalize"); - VELOX_ASSERT_THROW( - spiller_->spill(0, nullptr), "Spiller has been finalize"); - VELOX_ASSERT_THROW(spiller_->spill(RowContainerIterator{}), ""); + VELOX_ASSERT_THROW(spiller_->spill(nullptr), ""); verifySortedSpillData(spillPartitionSet, outputBatchSize); @@ -304,7 +396,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { ASSERT_EQ(stats.spilledBytes, totalSpilledBytes); ASSERT_EQ(stats.spillReadBytes, totalSpilledBytes); ASSERT_GT(stats.spillWriteTimeNanos, 0); - if (type_ == Spiller::Type::kAggregateOutput) { + if (type_ == SpillerType::AGGREGATION_OUTPUT) { ASSERT_EQ(stats.spillSortTimeNanos, 0); } else { ASSERT_GT(stats.spillSortTimeNanos, 0); @@ -401,7 +493,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { // spilling will be tested separately. 
    rowContainer_ = makeRowContainer(keys, dependents, false);
 
-    if (numRows == 0 || type_ == Spiller::Type::kHashJoinProbe) {
+    if (numRows == 0 || type_ == SpillerType::NO_ROW_CONTAINER) {
       return;
     }
     const SelectivityVector allRows(numRows);
@@ -466,45 +558,6 @@ class SpillerTest : public exec::test::RowContainerTestBase {
     }
   }
 
-  void setupSpillContainer(const RowTypePtr& rowType, int32_t numKeys) {
-    const auto& childTypes = rowType->children();
-    std::vector<TypePtr> keys(childTypes.begin(), childTypes.begin() + numKeys);
-    std::vector<TypePtr> dependents;
-    if (numKeys < childTypes.size()) {
-      dependents.insert(
-          dependents.end(), childTypes.begin() + numKeys, childTypes.end());
-    }
-    rowContainer_ = makeRowContainer(keys, dependents, false);
-    rowType_ = rowType;
-  }
-
-  void writeSpillData(const std::vector<RowVectorPtr>& batches) {
-    vector_size_t numRows = 0;
-    for (const auto& batch : batches) {
-      numRows += batch->size();
-    }
-    if (rowVector_ == nullptr) {
-      rowVector_ =
-          BaseVector::create<RowVector>(rowType_, numRows, pool_.get());
-    }
-    rows_.resize(numRows);
-    for (int i = 0; i < numRows; ++i) {
-      rows_[i] = rowContainer_->newRow();
-    }
-
-    vector_size_t nextRow = 0;
-    for (const auto& batch : batches) {
-      rowVector_->append(batch.get());
-      const SelectivityVector allRows(batch->size());
-      for (int index = 0; index < batch->size(); ++index, ++nextRow) {
-        for (int i = 0; i < rowType_->size(); ++i) {
-          DecodedVector decodedVector(*batch->childAt(i), allRows);
-          rowContainer_->store(decodedVector, index, rows_[nextRow], i);
-        }
-      }
-    }
-  }
-
   void sortSpillData(bool ascending = true) {
     partitions_.clear();
     const auto numRows = rows_.size();
@@ -532,8 +585,8 @@ class SpillerTest : public exec::test::RowContainerTestBase {
     // NOTE: for aggregation output type, we expect the merge read to produce
     // the output rows in the same order as the row insertion, so there is no
     // need to sort for testing.
-    if (type_ == Spiller::Type::kAggregateOutput ||
-        type_ == Spiller::Type::kOrderByOutput) {
+    if (type_ == SpillerType::AGGREGATION_OUTPUT ||
+        type_ == SpillerType::SORT_OUTPUT) {
       return;
     }
     for (auto& partition : partitions_) {
@@ -578,68 +631,63 @@ class SpillerTest : public exec::test::RowContainerTestBase {
     spillConfig_.maxFileSize = targetFileSize;
     spillConfig_.fileCreateConfig = {};
 
-    if (type_ == Spiller::Type::kHashJoinProbe) {
-      // kHashJoinProbe doesn't have an associated row container.
-      spiller_ = std::make_unique<Spiller>(
-          type_, rowType_, hashBits_, &spillConfig_, &spillStats_);
-    } else if (type_ == Spiller::Type::kAggregateInput) {
-      spiller_ = std::make_unique<Spiller>(
-          type_,
+    if (type_ == SpillerType::NO_ROW_CONTAINER) {
+      spiller_ = std::make_unique<NoRowContainerSpiller>(
+          rowType_, hashBits_, &spillConfig_, &spillStats_);
+    } else if (type_ == SpillerType::SORT_INPUT) {
+      spiller_ = std::make_unique<SortInputSpiller>(
           rowContainer_.get(),
           rowType_,
-          hashBits_,
           rowContainer_->keyTypes().size(),
           compareFlags_,
           &spillConfig_,
           &spillStats_);
-    } else if (type_ == Spiller::Type::kOrderByInput) {
-      // We spill 'data' in one partition in type of kOrderBy.
- spiller_ = std::make_unique( - type_, + } else if (type_ == SpillerType::SORT_OUTPUT) { + spiller_ = std::make_unique( + rowContainer_.get(), rowType_, &spillConfig_, &spillStats_); + } else if (type_ == SpillerType::HASH_BUILD) { + spiller_ = std::make_unique( + joinType_, rowContainer_.get(), rowType_, - rowContainer_->keyTypes().size(), - compareFlags_, + hashBits_, &spillConfig_, &spillStats_); - } else if ( - type_ == Spiller::Type::kAggregateOutput || - type_ == Spiller::Type::kOrderByOutput) { - spiller_ = std::make_unique( - type_, rowContainer_.get(), rowType_, &spillConfig_, &spillStats_); - } else if (type_ == Spiller::Type::kRowNumber) { - spiller_ = std::make_unique( - type_, + } else if (type_ == SpillerType::AGGREGATION_INPUT) { + spiller_ = std::make_unique( rowContainer_.get(), rowType_, hashBits_, + rowContainer_->keyTypes().size(), + compareFlags_, &spillConfig_, &spillStats_); - } else { - VELOX_CHECK_EQ(type_, Spiller::Type::kHashJoinBuild); - spiller_ = std::make_unique( - type_, - joinType_, + } else if (type_ == SpillerType::AGGREGATION_OUTPUT) { + spiller_ = std::make_unique( + rowContainer_.get(), rowType_, &spillConfig_, &spillStats_); + } else if (type_ == SpillerType::ROW_NUMBER_HASH_TABLE) { + spiller_ = std::make_unique( rowContainer_.get(), rowType_, hashBits_, &spillConfig_, &spillStats_); + } else { + VELOX_UNREACHABLE("Unknown spiller type"); } + ASSERT_EQ(spiller_->state().maxPartitions(), numPartitions_); - ASSERT_FALSE(spiller_->isAllSpilled()); - ASSERT_FALSE(spiller_->isAnySpilled()); + ASSERT_FALSE(spiller_->state().isAllPartitionSpilled()); + ASSERT_FALSE(spiller_->state().isAnyPartitionSpilled()); ASSERT_EQ(spiller_->hashBits(), hashBits_); } - void runSpill(bool expectedError) { + void runSortedSpill(bool expectedError) { + ASSERT_TRUE(spiller_->needSort()); try { - if (type_ != Spiller::Type::kAggregateOutput) { - spiller_->spill(); - } else { - RowContainerIterator iter; - spiller_->spill(iter); - } + spill( + spiller_.get(), + {std::nullopt, std::nullopt, std::nullopt, std::optional(nullptr)}); rowContainer_->clear(); ASSERT_FALSE(expectedError); } catch (const std::exception&) { @@ -651,8 +699,8 @@ class SpillerTest : public exec::test::RowContainerTestBase { SpillPartitionSet& spillPartitionSet, int32_t outputBatchSize = 0) { for (auto& spillPartitionEntry : spillPartitionSet) { - ASSERT_TRUE( - spiller_->isSpilled(spillPartitionEntry.first.partitionNumber())); + ASSERT_TRUE(spiller_->state().isPartitionSpilled( + spillPartitionEntry.first.partitionNumber())); const auto partition = spillPartitionEntry.first.partitionNumber(); auto* spillPartition = spillPartitionEntry.second.get(); // We make a merge reader that merges the spill files and the rows that @@ -792,12 +840,9 @@ class SpillerTest : public exec::test::RowContainerTestBase { int targetFileSize, uint64_t maxSpillRunRows, uint64_t readBufferSize = 1 << 20) { - ASSERT_TRUE( - type_ == Spiller::Type::kHashJoinBuild || - type_ == Spiller::Type::kHashJoinProbe || - type_ == Spiller::Type::kRowNumber); + ASSERT_FALSE(needSort(type_)); - const int numSpillPartitions = type_ != Spiller::Type::kHashJoinProbe + const int numSpillPartitions = (type_ != SpillerType::NO_ROW_CONTAINER) ? 
numPartitions_ : 1 + folly::Random().rand32() % numPartitions_; SpillPartitionNumSet spillPartitionNumSet; @@ -819,53 +864,61 @@ class SpillerTest : public exec::test::RowContainerTestBase { HashPartitionFunction spillHashFunction(hashBits_, rowType_, keyChannels_); // Setup a number of spillers to spill data and then accumulate results from // them by partition. - std::vector> spillers; + std::vector> spillers; for (int iter = 0; iter < numSpillers; ++iter) { const auto prevGStats = common::globalSpillStats(); setupSpillData( numKeys_, - type_ != Spiller::Type::kHashJoinProbe ? numBatchRows * 10 : 0, + (type_ != SpillerType::NO_ROW_CONTAINER) ? numBatchRows * 10 : 0, 1, nullptr, {}); setupSpiller(targetFileSize, 0, false, maxSpillRunRows, readBufferSize); // Can't append without marking a partition as spilling. - VELOX_ASSERT_THROW(spiller_->spill(0, rowVector_), ""); + if (auto* noRowContainerSpiller = + dynamic_cast(spiller_.get())) { + VELOX_ASSERT_THROW(noRowContainerSpiller->spill(0, rowVector_), ""); + } else if ( + auto* hashBuildSpiller = + dynamic_cast(spiller_.get())) { + VELOX_ASSERT_THROW(hashBuildSpiller->spill(0, rowVector_), ""); + } splitByPartition(rowVector_, spillHashFunction, inputsByPartition); - if (type_ == Spiller::Type::kHashJoinProbe) { - spiller_->setPartitionsSpilled(spillPartitionNumSet); + if (auto* spiller = + dynamic_cast(spiller_.get())) { + spiller->setPartitionsSpilled(spillPartitionNumSet); #ifndef NDEBUG VELOX_ASSERT_THROW( - spiller_->setPartitionsSpilled(spillPartitionNumSet), ""); + spiller->setPartitionsSpilled(spillPartitionNumSet), ""); #endif } else { - VELOX_ASSERT_THROW( - spiller_->setPartitionsSpilled(spillPartitionNumSet), ""); - spiller_->spill(); + spiller_->spill(nullptr); rowContainer_->clear(); - ASSERT_TRUE(spiller_->isAllSpilled()); + ASSERT_TRUE(spiller_->state().isAllPartitionSpilled()); } // Spill data. - for (int i = 0; i < numAppendBatches; ++i) { - RowVectorPtr batch = makeDataset(rowType_, numBatchRows, nullptr); - splitByPartition(batch, spillHashFunction, inputsByPartition); - for (const auto& partition : spillPartitionNumSet) { - spiller_->spill(partition, inputsByPartition[partition].back()); + if (type_ == SpillerType::NO_ROW_CONTAINER) { + auto* spiller = dynamic_cast(spiller_.get()); + ASSERT_NE(spiller, nullptr); + for (int i = 0; i < numAppendBatches; ++i) { + RowVectorPtr batch = makeDataset(rowType_, numBatchRows, nullptr); + splitByPartition(batch, spillHashFunction, inputsByPartition); + for (const auto& partition : spillPartitionNumSet) { + spiller->spill(partition, inputsByPartition[partition].back()); + } } } // Assert that hash probe type of spiller type doesn't support incremental // spilling. 
-    if (type_ == Spiller::Type::kHashJoinProbe) {
-      VELOX_ASSERT_THROW(spiller_->spill(), "");
-    } else {
-      spiller_->spill();
-      ASSERT_TRUE(spiller_->isAllSpilled());
+    if (type_ != SpillerType::NO_ROW_CONTAINER) {
+      spiller_->spill(nullptr);
+      ASSERT_TRUE(spiller_->state().isAllPartitionSpilled());
     }
 
     const auto stats = spiller_->stats();
     ASSERT_GE(stats.spilledFiles, 0);
-    if (type_ == Spiller::Type::kHashJoinProbe) {
+    if (type_ == SpillerType::NO_ROW_CONTAINER) {
       if (numAppendBatches == 0) {
         ASSERT_EQ(stats.spilledRows, 0);
         ASSERT_EQ(stats.spilledBytes, 0);
@@ -894,8 +947,8 @@ class SpillerTest : public exec::test::RowContainerTestBase {
       }
       ASSERT_GT(stats.spilledPartitions, 0);
       ASSERT_EQ(stats.spillSortTimeNanos, 0);
-      if (type_ == Spiller::Type::kHashJoinBuild ||
-          type_ == Spiller::Type::kRowNumber) {
+      if (type_ == SpillerType::HASH_BUILD ||
+          type_ == SpillerType::ROW_NUMBER_HASH_TABLE) {
         ASSERT_GT(stats.spillFillTimeNanos, 0);
       } else {
         ASSERT_EQ(stats.spillFillTimeNanos, 0);
@@ -950,20 +1003,27 @@ class SpillerTest : public exec::test::RowContainerTestBase {
   }
 
   void verifyNonSortedSpillData(
-      std::vector<std::unique_ptr<Spiller>> spillers,
+      std::vector<std::unique_ptr<SpillerBase>> spillers,
       const SpillPartitionNumSet& spillPartitionNumSet,
       const std::vector<std::vector<RowVectorPtr>>& inputsByPartition) {
-    ASSERT_TRUE(
-        type_ == Spiller::Type::kHashJoinBuild ||
-        type_ == Spiller::Type::kRowNumber ||
-        type_ == Spiller::Type::kHashJoinProbe);
+    ASSERT_FALSE(needSort(type_));
 
     SpillPartitionSet spillPartitionSet;
     for (auto& spiller : spillers) {
       spiller->finishSpill(spillPartitionSet);
-      VELOX_ASSERT_THROW(
-          spiller->spill(0, nullptr), "Spiller has been finalized");
-      VELOX_ASSERT_THROW(spiller->spill(), "Spiller has been finalized");
+      if (type_ == SpillerType::NO_ROW_CONTAINER) {
+        auto* spillerImpl = dynamic_cast<NoRowContainerSpiller*>(spiller.get());
+        ASSERT_NE(spillerImpl, nullptr);
+        // Spilling after finalization must throw.
+        VELOX_ASSERT_THROW(spillerImpl->spill(0, nullptr), "");
+      } else if (type_ == SpillerType::HASH_BUILD) {
+        auto* spillerImpl = dynamic_cast<HashBuildSpiller*>(spiller.get());
+        ASSERT_NE(spillerImpl, nullptr);
+        // Spilling after finalization must throw.
+        VELOX_ASSERT_THROW(spillerImpl->spill(0, nullptr), "");
+      }
+      // Spilling after finalization must throw.
+      VELOX_ASSERT_THROW(spiller->spill(nullptr), "");
     }
 
     ASSERT_EQ(spillPartitionSet.size(), spillPartitionNumSet.size());
@@ -973,7 +1033,7 @@ class SpillerTest : public exec::test::RowContainerTestBase {
           hashBits_.begin(), spillPartitionEntry.first.partitionBitOffset());
       auto reader = spillPartitionEntry.second->createUnorderedReader(
           spillConfig_.readBufferSize, pool(), &spillStats_);
-      if (type_ == Spiller::Type::kHashJoinProbe) {
+      if (type_ == SpillerType::NO_ROW_CONTAINER) {
        // For the no-row-container (hash probe) case, we append each input
        // vector as one batch in the spill file so that we can do a
        // one-to-one comparison.
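        // Each spill(partition, vector) call during the spill phase appended
        // exactly one batch, so the reader's batches line up 1:1 with the
        // inputs split by partition.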
for (int i = 0; i < inputsByPartition[partition].size(); ++i) { @@ -1026,16 +1086,23 @@ class SpillerTest : public exec::test::RowContainerTestBase { const SpillPartitionNumSet& spillPartitionNumSet, const std::vector>& inputsByPartition) { ASSERT_TRUE( - type_ == Spiller::Type::kHashJoinBuild || - type_ == Spiller::Type::kRowNumber || - type_ == Spiller::Type::kHashJoinProbe); + type_ == SpillerType::HASH_BUILD || + type_ == SpillerType::ROW_NUMBER_HASH_TABLE || + type_ == SpillerType::NO_ROW_CONTAINER); SpillPartitionSet spillPartitionSet; spiller_->finishSpill(spillPartitionSet); - VELOX_ASSERT_THROW( - spiller_->spill(0, nullptr), "Spiller has been finalized"); - VELOX_ASSERT_THROW(spiller_->spill(), "Spiller has been finalized"); - VELOX_ASSERT_THROW(spiller_->spill(RowContainerIterator{}), ""); + + if (type_ == SpillerType::NO_ROW_CONTAINER) { + auto* spillerImpl = dynamic_cast(spiller_.get()); + ASSERT_NE(spillerImpl, nullptr); + VELOX_ASSERT_THROW(spillerImpl->spill(0, nullptr), ""); + } else if (type_ == SpillerType::HASH_BUILD) { + auto* spillerImpl = dynamic_cast(spiller_.get()); + ASSERT_NE(spillerImpl, nullptr); + VELOX_ASSERT_THROW(spillerImpl->spill(0, nullptr), ""); + } + VELOX_ASSERT_THROW(spiller_->spill(nullptr), ""); ASSERT_EQ(spillPartitionSet.size(), spillPartitionNumSet.size()); for (auto& spillPartitionEntry : spillPartitionSet) { @@ -1044,7 +1111,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { hashBits_.begin(), spillPartitionEntry.first.partitionBitOffset()); auto reader = spillPartitionEntry.second->createUnorderedReader( spillConfig_.readBufferSize, pool(), &spillStats_); - if (type_ == Spiller::Type::kHashJoinProbe) { + if (type_ == SpillerType::NO_ROW_CONTAINER) { // For hash probe type, we append each input vector as one batch in // spill file so that we can do one-to-one comparison. 
        for (int i = 0; i < inputsByPartition[partition].size(); ++i) {
@@ -1115,7 +1182,7 @@ class SpillerTest : public exec::test::RowContainerTestBase {
   }
 
   const TestParam param_;
-  const Spiller::Type type_;
+  const SpillerType type_;
   const int32_t executorPoolSize_;
   const common::CompressionKind compressionKind_;
   const bool enablePrefixSort_;
@@ -1143,7 +1210,7 @@ class SpillerTest : public exec::test::RowContainerTestBase {
   std::vector<char*> rows_;
   std::vector<std::vector<int32_t>> partitions_;
   std::vector<CompareFlags> compareFlags_;
-  std::unique_ptr<Spiller> spiller_;
+  std::unique_ptr<SpillerBase> spiller_;
   common::SpillConfig spillConfig_;
   folly::Synchronized<common::SpillStats> spillStats_;
 };
@@ -1153,10 +1220,11 @@ struct AllTypesTestParam {
   uint64_t maxSpillRunRows;
 };
 
-class AllTypes : public SpillerTest,
-                 public testing::WithParamInterface<AllTypesTestParam> {
+class AllTypesSpillerTest
+    : public SpillerTest,
+      public testing::WithParamInterface<AllTypesTestParam> {
  public:
-  AllTypes()
+  AllTypesSpillerTest()
       : SpillerTest(GetParam().param),
         maxSpillRunRows_(GetParam().maxSpillRunRows) {}
 
@@ -1177,30 +1245,27 @@ class AllTypes : public SpillerTest,
  protected:
   uint64_t maxSpillRunRows_;
 };
+} // namespace facebook::velox::exec::test
 
-TEST_P(AllTypes, nonSortedSpillFunctions) {
-  if (type_ == Spiller::Type::kOrderByInput ||
-      type_ == Spiller::Type::kOrderByOutput ||
-      type_ == Spiller::Type::kAggregateInput ||
-      type_ == Spiller::Type::kAggregateOutput) {
+TEST_P(AllTypesSpillerTest, nonSortedSpillFunctions) {
+  if (type_ == SpillerType::SORT_INPUT || type_ == SpillerType::SORT_OUTPUT ||
+      type_ == SpillerType::AGGREGATION_INPUT ||
+      type_ == SpillerType::AGGREGATION_OUTPUT) {
     setupSpillData(numKeys_, 5'000, 1, nullptr, {});
     sortSpillData();
     setupSpiller(100'000, 0, false, maxSpillRunRows_);
-    {
-      RowVectorPtr dummyVector;
-      VELOX_ASSERT_THROW(
-          spiller_->spill(0, dummyVector), "Unexpected spiller type");
-    }
-    if (type_ == Spiller::Type::kOrderByOutput) {
+    if (type_ == SpillerType::SORT_OUTPUT) {
       RowContainerIterator rowIter;
       std::vector<char*, memory::StlAllocator<char*>> rows(5'000, *pool_);
       int numListedRows{0};
       numListedRows = rowContainer_->listRows(&rowIter, 5000, rows.data());
       ASSERT_EQ(numListedRows, 5000);
-      spiller_->spill(rows);
+      spill(
+          spiller_.get(),
+          {std::nullopt, std::nullopt, std::optional(&rows), std::nullopt});
     } else {
-      spiller_->spill();
+      spill(spiller_.get());
     }
 
     ASSERT_FALSE(spiller_->finalized());
@@ -1217,36 +1282,37 @@ class AllTypes : public SpillerTest,
   testNonSortedSpill(1, 5'000, 0, 1, maxSpillRunRows_);
 }
 
-TEST_P(AllTypes, readaheadTest) {
-  if (type_ == Spiller::Type::kOrderByOutput ||
-      type_ == Spiller::Type::kAggregateOutput) {
+TEST_P(AllTypesSpillerTest, readaheadTest) {
+  if (type_ == SpillerType::SORT_OUTPUT ||
+      type_ == SpillerType::AGGREGATION_OUTPUT) {
     return;
   }
-  if (type_ == Spiller::Type::kOrderByInput ||
-      type_ == Spiller::Type::kAggregateInput) {
+  if (type_ == SpillerType::SORT_INPUT ||
+      type_ == SpillerType::AGGREGATION_INPUT) {
    testSortedSpill(10, 10, false, false, 512);
    return;
  }
  testNonSortedSpill(1, 5'000, 0, 1'000'000'000, maxSpillRunRows_, 512);
}

-class NoHashJoin : public SpillerTest,
-                   public testing::WithParamInterface<TestParam> {
+class SortedSpillerTest : public SpillerTest,
+                          public testing::WithParamInterface<TestParam> {
 public:
-  NoHashJoin() : SpillerTest(GetParam()) {}
+  SortedSpillerTest() : SpillerTest(GetParam()) {}

  static std::vector<TestParam> getTestParams() {
    return TestParamsBuilder{
        .typesToExclude =
-            {Spiller::Type::kHashJoinProbe,
-             Spiller::Type::kHashJoinBuild,
-             Spiller::Type::kRowNumber,
-             Spiller::Type::kOrderByOutput}}
+            {SpillerType::NO_ROW_CONTAINER,
+             SpillerType::HASH_BUILD,
+             SpillerType::ROW_NUMBER_HASH_TABLE,
+             SpillerType::SORT_OUTPUT,
+             SpillerType::AGGREGATION_OUTPUT}}
         .getTestParams();
   }
 };
 
-TEST_P(NoHashJoin, spillFew) {
+TEST_P(SortedSpillerTest, spillFew) {
   // Test with distinct sort keys.
   testSortedSpill(10, 1);
   testSortedSpill(10, 1, false, false);
@@ -1259,7 +1325,7 @@ class NoHashJoin : public SpillerTest,
   testSortedSpill(10, 10, true, false);
 }
 
-TEST_P(NoHashJoin, spillMost) {
+TEST_P(SortedSpillerTest, spillMost) {
   // Test with distinct sort keys.
   testSortedSpill(60, 1);
   testSortedSpill(60, 1, false, false);
@@ -1272,7 +1338,7 @@ class NoHashJoin : public SpillerTest,
   testSortedSpill(60, 10, true, false);
 }
 
-TEST_P(NoHashJoin, spillAll) {
+TEST_P(SortedSpillerTest, spillAll) {
   // Test with distinct sort keys.
   testSortedSpill(100, 1);
   testSortedSpill(100, 1, false, false);
@@ -1285,7 +1351,7 @@ class NoHashJoin : public SpillerTest,
   testSortedSpill(100, 10, true, false);
 }
 
-TEST_P(NoHashJoin, error) {
+TEST_P(SortedSpillerTest, error) {
   testSortedSpill(100, 1, false, true);
 }
 
@@ -1297,11 +1363,12 @@ class HashJoinBuildOnly : public SpillerTest,
   static std::vector<TestParam> getTestParams() {
     return TestParamsBuilder{
         .typesToExclude =
-            {Spiller::Type::kAggregateInput,
-             Spiller::Type::kAggregateOutput,
-             Spiller::Type::kHashJoinProbe,
-             Spiller::Type::kOrderByInput,
-             Spiller::Type::kOrderByOutput}}
+            {SpillerType::AGGREGATION_INPUT,
+             SpillerType::AGGREGATION_OUTPUT,
+             SpillerType::NO_ROW_CONTAINER,
+             SpillerType::SORT_INPUT,
+             SpillerType::SORT_OUTPUT,
+             SpillerType::ROW_NUMBER_HASH_TABLE}}
         .getTestParams();
   }
 };
 
@@ -1312,23 +1379,32 @@ TEST_P(HashJoinBuildOnly, spillPartition) {
   HashPartitionFunction spillHashFunction(hashBits_, rowType_, keyChannels_);
   splitByPartition(rowVector_, spillHashFunction, vectorsByPartition);
   setupSpiller(100'000, 0, false);
-  spiller_->spill();
+  spill(spiller_.get());
   rowContainer_->clear();
-  spiller_->spill();
+  spill(spiller_.get());
   verifyNonSortedSpillData(allPartitionNumSet(), vectorsByPartition);
-  VELOX_ASSERT_THROW(spiller_->spill(), "Spiller has been finalized");
-  VELOX_ASSERT_THROW(spiller_->spill(RowContainerIterator{}), "");
+  VELOX_ASSERT_THROW(spill(spiller_.get()), "");
+  RowContainerIterator rowIter;
+  // TODO: Split the per-type throw checks into separate assert calls that
+  // use each spiller type's own API.
+  VELOX_ASSERT_THROW(
+      spill(
+          spiller_.get(),
+          {std::nullopt, std::nullopt, std::nullopt, std::optional(&rowIter)}),
+      "");
 }
 
 TEST_P(HashJoinBuildOnly, writeBufferSize) {
-  std::vector<uint64_t> writeBufferSizes = {0, 4'000'000'000};
+  std::vector<uint64_t> writeBufferSizes = {0 /*, 4'000'000'000*/};
   for (const auto writeBufferSize : writeBufferSizes) {
     SCOPED_TRACE(
         fmt::format("writeBufferSize {}", succinctBytes(writeBufferSize)));
     setupSpillData(numKeys_, 1'000, 1, nullptr, {});
     setupSpiller(4'000'000'000, writeBufferSize, false);
-    spiller_->spill();
-    ASSERT_TRUE(spiller_->isAllSpilled());
+    spill(
+        spiller_.get(),
+        {std::nullopt, std::nullopt, std::nullopt, std::nullopt});
+    ASSERT_TRUE(spiller_->state().isAllPartitionSpilled());
     const int numDiskWrites = spiller_->stats().spillWrites;
     if (writeBufferSize != 0) {
       ASSERT_EQ(numDiskWrites, 0);
@@ -1350,7 +1426,13 @@ TEST_P(HashJoinBuildOnly, writeBufferSize) {
     for (int partition = 0; partition < numPartitions_; ++partition) {
       const auto& splitVector = splitVectors[partition];
       if (!splitVector.empty()) {
-        spiller_->spill(partition, splitVector.back());
+        // TODO: Check whether this spill path applies to all spiller types.
+        spill(
+            spiller_.get(),
+            {std::optional(partition),
+             std::optional(splitVector.back()),
+             std::nullopt,
+             std::nullopt});
         ++spillInputVectorCount;
       }
     }
@@ -1389,12 +1471,12 @@ class AggregationOutputOnly : public SpillerTest,
   static std::vector<TestParam> getTestParams() {
     return TestParamsBuilder{
         .typesToExclude =
-            {Spiller::Type::kAggregateInput,
-             Spiller::Type::kHashJoinBuild,
-             Spiller::Type::kRowNumber,
-             Spiller::Type::kHashJoinProbe,
-             Spiller::Type::kOrderByInput,
-             Spiller::Type::kOrderByOutput}}
+            {SpillerType::AGGREGATION_INPUT,
+             SpillerType::HASH_BUILD,
+             SpillerType::ROW_NUMBER_HASH_TABLE,
+             SpillerType::NO_ROW_CONTAINER,
+             SpillerType::SORT_INPUT,
+             SpillerType::SORT_OUTPUT}}
         .getTestParams();
   }
 };
@@ -1455,13 +1537,9 @@ TEST_P(AggregationOutputOnly, basic) {
           &rowIter, testData.spillRowOffset, rows.data());
     }
     ASSERT_EQ(numListedRows, std::min(numRows, testData.spillRowOffset));
-    {
-      RowVectorPtr dummy;
-      VELOX_ASSERT_THROW(
-          spiller_->spill(0, dummy),
-          "Unexpected spiller type: AGGREGATE_OUTPUT");
-    }
-    spiller_->spill(rowIter);
+    spill(
+        spiller_.get(),
+        {std::nullopt, std::nullopt, std::nullopt, std::optional(&rowIter)});
 
     ASSERT_EQ(rowContainer_->numRows(), numRows);
     rowContainer_->clear();
@@ -1502,25 +1580,27 @@ TEST_P(AggregationOutputOnly, basic) {
   }
 }
 
-class OrderByOutputOnly : public SpillerTest,
-                          public testing::WithParamInterface<TestParam> {
+class SortOutputOnly : public SpillerTest,
+                       public testing::WithParamInterface<TestParam> {
 public:
-  OrderByOutputOnly() : SpillerTest(GetParam()) {}
+  SortOutputOnly() : SpillerTest(GetParam()) {}
 
   static std::vector<TestParam> getTestParams() {
     return TestParamsBuilder{
         .typesToExclude =
-            {Spiller::Type::kAggregateInput,
-             Spiller::Type::kAggregateOutput,
-             Spiller::Type::kHashJoinBuild,
-             Spiller::Type::kHashJoinProbe,
-             Spiller::Type::kRowNumber,
-             Spiller::Type::kOrderByInput}}
+            {
+                SpillerType::AGGREGATION_INPUT,
+                SpillerType::AGGREGATION_OUTPUT,
+                SpillerType::HASH_BUILD,
+                SpillerType::NO_ROW_CONTAINER,
+                SpillerType::ROW_NUMBER_HASH_TABLE,
+                SpillerType::SORT_INPUT,
+            }}
        .getTestParams();
  }
};

-TEST_P(OrderByOutputOnly, basic) {
+TEST_P(SortOutputOnly, basic) {
   const int numRows = 5'000;
   struct {
     int numSpillRows;
@@ -1560,18 +1640,21 @@ TEST_P(OrderByOutputOnly, basic) {
     numListedRows =
         rowContainer_->listRows(&rowIter, testData.numSpillRows, rows.data());
     ASSERT_LE(numListedRows, numRows);
     {
-      RowVectorPtr dummy;
+      SpillerBase::SpillRows emptyRows(*pool_);
       VELOX_ASSERT_THROW(
-          spiller_->spill(0, dummy),
-          "Unexpected spiller type: ORDER_BY_OUTPUT");
-    }
-    {
-      Spiller::SpillRows emptyRows(*pool_);
-      VELOX_ASSERT_THROW(spiller_->spill(emptyRows), "");
+          spill(
+              spiller_.get(),
+              {std::nullopt,
+               std::nullopt,
+               std::optional(&emptyRows),
+               std::nullopt}),
+          "");
     }
-    auto spillRows =
-        Spiller::SpillRows(rows.begin(), rows.begin() + numListedRows, *pool_);
-    spiller_->spill(spillRows);
+    auto spillRows = SpillerBase::SpillRows(
+        rows.begin(), rows.begin() + numListedRows, *pool_);
+    spill(
+        spiller_.get(),
+        {std::nullopt, std::nullopt, std::optional(&spillRows), std::nullopt});
 
     ASSERT_EQ(rowContainer_->numRows(), numRows);
     rowContainer_->clear();
@@ -1619,7 +1702,7 @@ class MaxSpillRunTest : public SpillerTest,
   static std::vector<TestParam> getTestParams() {
     return TestParamsBuilder{
         .typesToExclude =
-            {Spiller::Type::kHashJoinProbe, Spiller::Type::kOrderByOutput}}
+            {SpillerType::NO_ROW_CONTAINER, SpillerType::SORT_OUTPUT}}
         .getTestParams();
   }
 };
@@ -1646,16 +1729,19 @@ TEST_P(MaxSpillRunTest, basic) {
         0,
         false,
         testData.maxSpillRunRows);
-    if (type_ == Spiller::Type::kOrderByOutput) {
-      RowContainerIterator rowIter;
-      Spiller::SpillRows rows(numRows, *pool_);
-      int numListedRows{0};
-      numListedRows = rowContainer_->listRows(&rowIter, numRows, rows.data());
-      ASSERT_EQ(numListedRows, numRows);
-      spiller_->spill(rows);
-    } else {
-      spiller_->spill();
-    }
+    RowContainerIterator rowIter;
+    SpillerBase::SpillRows rows(numRows, *pool_);
+    int numListedRows{0};
+    numListedRows = rowContainer_->listRows(&rowIter, numRows, rows.data());
+    ASSERT_EQ(numListedRows, numRows);
+    // Let the helper decide which public API to call based on the impl type.
+    spill(
+        spiller_.get(),
+        {std::nullopt,
+         std::nullopt,
+         std::optional(&rows),
+         std::optional(nullptr)});
+
     ASSERT_FALSE(spiller_->finalized());
     SpillPartitionSet spillPartitionSet;
     spiller_->finishSpill(spillPartitionSet);
@@ -1671,15 +1757,15 @@ TEST_P(MaxSpillRunTest, basic) {
 
     const auto& stats = spiller_->stats();
     ASSERT_EQ(totalSize, stats.spilledBytes);
-    if (type_ == Spiller::Type::kAggregateOutput ||
-        type_ == Spiller::Type::kOrderByOutput) {
+    if (type_ == SpillerType::AGGREGATION_OUTPUT ||
+        type_ == SpillerType::SORT_OUTPUT) {
       ASSERT_EQ(numFiles, numPartitions_);
       ASSERT_EQ(spillPartitionSet.size(), numPartitions_);
-    } else if (type_ == Spiller::Type::kOrderByInput) {
+    } else if (type_ == SpillerType::SORT_INPUT) {
       // Needs sorting.
       ASSERT_EQ(numFiles, testData.expectedNumFiles);
       ASSERT_EQ(spillPartitionSet.size(), numPartitions_);
-    } else if (type_ == Spiller::Type::kAggregateInput) {
+    } else if (type_ == SpillerType::AGGREGATION_INPUT) {
       ASSERT_GE(numFiles, testData.expectedNumFiles);
       ASSERT_EQ(spillPartitionSet.size(), numPartitions_);
     } else {
@@ -1691,13 +1777,13 @@ TEST_P(MaxSpillRunTest, basic) {
 
 VELOX_INSTANTIATE_TEST_SUITE_P(
     SpillerTest,
-    AllTypes,
-    testing::ValuesIn(AllTypes::getTestParams()));
+    AllTypesSpillerTest,
+    testing::ValuesIn(AllTypesSpillerTest::getTestParams()));
 
 VELOX_INSTANTIATE_TEST_SUITE_P(
     SpillerTest,
-    NoHashJoin,
-    testing::ValuesIn(NoHashJoin::getTestParams()));
+    SortedSpillerTest,
+    testing::ValuesIn(SortedSpillerTest::getTestParams()));
 
 VELOX_INSTANTIATE_TEST_SUITE_P(
     SpillerTest,
@@ -1711,8 +1797,8 @@ VELOX_INSTANTIATE_TEST_SUITE_P(
 
 VELOX_INSTANTIATE_TEST_SUITE_P(
     SpillerTest,
-    OrderByOutputOnly,
-    testing::ValuesIn(OrderByOutputOnly::getTestParams()));
+    SortOutputOnly,
+    testing::ValuesIn(SortOutputOnly::getTestParams()));
 
 VELOX_INSTANTIATE_TEST_SUITE_P(
     SpillerTest,
diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp
index b527b7e6b6ce..1fd0384fce67 100644
--- a/velox/exec/tests/TaskTest.cpp
+++ b/velox/exec/tests/TaskTest.cpp
@@ -2299,9 +2299,9 @@ DEBUG_ONLY_TEST_F(TaskTest, taskReclaimFailure) {
 
   const std::string spillTableError{"spillTableError"};
   SCOPED_TESTVALUE_SET(
-      "facebook::velox::exec::Spiller",
-      std::function<void(Spiller*)>(
-          [&](Spiller* /*unused*/) { VELOX_FAIL(spillTableError); }));
+      "facebook::velox::exec::SpillerBase",
+      std::function<void(SpillerBase*)>(
+          [&](SpillerBase* /*unused*/) { VELOX_FAIL(spillTableError); }));
 
   TestScopedSpillInjection injection(100);
   const auto spillDirectory = exec::test::TempDirectoryPath::create();
diff --git a/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp b/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp
index d214af4aeb9d..9eedc54f1ae0 100644
--- a/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp
+++ b/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp
@@ -33,7 +33,6 @@
 #include "velox/expression/Expr.h"
 #include "velox/expression/SignatureBinder.h"
"velox/expression/SignatureBinder.h" -using facebook::velox::exec::Spiller; using facebook::velox::exec::test::AssertQueryBuilder; using facebook::velox::exec::test::CursorParameters; using facebook::velox::exec::test::PlanBuilder;