From 9e418b172c7b316d351acd6c4b3428d5ca03eebd Mon Sep 17 00:00:00 2001 From: Orri Erling Date: Thu, 26 Dec 2024 13:16:52 -0800 Subject: [PATCH] feat: Add utilities for combining dictionary wrappers (#11944) Summary: - Adds functions to transpose dictionaries with and without nulls. - Adds projection that wraps children of a RowVector into a dictionary so that dictionaries are combined instead of being nested. If multiple columns have the same wrapping indices, they continue to share the wrapping also after new indices are combined with the previous wrapper. - This is preparation for limiting dictionary wrapping to one level. This will speed up access and simplify corner cases of expressions. Pull Request resolved: https://github.com/facebookincubator/velox/pull/11944 Reviewed By: xiaoxmeng Differential Revision: D67612857 Pulled By: oerling fbshipit-source-id: 612c4cf0d46d92de4d6107d2de6aac60110c282a --- velox/exec/OperatorUtils.cpp | 100 +++++++++++++++++- velox/exec/OperatorUtils.h | 54 ++++++++++ velox/exec/tests/OperatorUtilsTest.cpp | 128 +++++++++++++++++++++++ velox/vector/BaseVector.cpp | 138 ++++++++++++++++++++----- velox/vector/BaseVector.h | 49 +++++++-- 5 files changed, 436 insertions(+), 33 deletions(-) diff --git a/velox/exec/OperatorUtils.cpp b/velox/exec/OperatorUtils.cpp index 536706fb9fb0..ccc6e3ffb8c1 100644 --- a/velox/exec/OperatorUtils.cpp +++ b/velox/exec/OperatorUtils.cpp @@ -258,6 +258,78 @@ vector_size_t processFilterResults( } } +VectorPtr wrapOne( + vector_size_t wrapSize, + BufferPtr wrapIndices, + const VectorPtr& inputVector, + BufferPtr wrapNulls, + WrapState& wrapState) { + if (!wrapIndices) { + VELOX_CHECK_NULL(wrapNulls); + return inputVector; + } + + if (inputVector->encoding() != VectorEncoding::Simple::DICTIONARY) { + return BaseVector::wrapInDictionary( + wrapNulls, wrapIndices, wrapSize, inputVector); + } + + if (wrapState.transposeResults.empty()) { + wrapState.nulls = wrapNulls.get(); + } else { + VELOX_CHECK( + 
wrapState.nulls == wrapNulls.get(), + "Must have identical wrapNulls for all wrapped columns"); + } + auto baseIndices = inputVector->wrapInfo(); + auto baseValues = inputVector->valueVector(); + // The base will be wrapped again without loading any lazy. The + // rewrapping is permitted in this case. + baseValues->clearContainingLazyAndWrapped(); + auto* rawBaseNulls = inputVector->rawNulls(); + if (rawBaseNulls) { + // Dictionary adds nulls. + BufferPtr newIndices = + AlignedBuffer::allocate(wrapSize, inputVector->pool()); + BufferPtr newNulls = + AlignedBuffer::allocate(wrapSize, inputVector->pool()); + const uint64_t* rawWrapNulls = + wrapNulls ? wrapNulls->as() : nullptr; + BaseVector::transposeIndicesWithNulls( + baseIndices->as(), + rawBaseNulls, + wrapSize, + wrapIndices->as(), + rawWrapNulls, + newIndices->asMutable(), + newNulls->asMutable()); + + return BaseVector::wrapInDictionary( + newNulls, newIndices, wrapSize, baseValues); + } + + // if another column had the same indices as this one and this one does not + // add nulls, we use the same transposed wrapping. + auto it = wrapState.transposeResults.find(baseIndices.get()); + if (it != wrapState.transposeResults.end()) { + return BaseVector::wrapInDictionary( + wrapNulls, BufferPtr(it->second), wrapSize, baseValues); + } + + auto newIndices = + AlignedBuffer::allocate(wrapSize, inputVector->pool()); + BaseVector::transposeIndices( + baseIndices->as(), + wrapSize, + wrapIndices->as(), + newIndices->asMutable()); + // If another column has the same wrapping and does not add nulls, we can use + // the same transposed indices. 
+ wrapState.transposeResults[baseIndices.get()] = newIndices.get(); + return BaseVector::wrapInDictionary( + wrapNulls, newIndices, wrapSize, baseValues); +} + VectorPtr wrapChild( vector_size_t size, BufferPtr mapping, @@ -295,8 +367,9 @@ RowVectorPtr wrap( } std::vector wrappedChildren; wrappedChildren.reserve(childVectors.size()); + WrapState state; for (auto& child : childVectors) { - wrappedChildren.emplace_back(wrapChild(size, mapping, child)); + wrappedChildren.emplace_back(wrapOne(size, mapping, child, nullptr, state)); } return std::make_shared( pool, rowType, nullptr, size, wrappedChildren); @@ -425,6 +498,31 @@ void projectChildren( } } +void projectChildren( + std::vector& projectedChildren, + const RowVectorPtr& src, + const std::vector& projections, + int32_t size, + const BufferPtr& mapping, + WrapState* state) { + projectChildren( + projectedChildren, src->children(), projections, size, mapping, state); +} + +void projectChildren( + std::vector& projectedChildren, + const std::vector& src, + const std::vector& projections, + int32_t size, + const BufferPtr& mapping, + WrapState* state) { + for (const auto& projection : projections) { + projectedChildren[projection.outputChannel] = state + ? wrapOne(size, mapping, src[projection.inputChannel], nullptr, *state) + : wrapChild(size, mapping, src[projection.inputChannel]); + } +} + std::unique_ptr BlockedOperatorFactory::toOperator( DriverCtx* ctx, int32_t id, diff --git a/velox/exec/OperatorUtils.h b/velox/exec/OperatorUtils.h index bea261f3d24a..c8438a67a781 100644 --- a/velox/exec/OperatorUtils.h +++ b/velox/exec/OperatorUtils.h @@ -86,6 +86,37 @@ RowVectorPtr wrap( const std::vector& childVectors, memory::MemoryPool* pool); +/// Represents unique dictionary wrappers over a set of vectors when +/// wrapping these inside another dictionary. When multiple wrapped +/// vectors with the same wrapping get re-wrapped, we replace the +/// wrapper with a composition of the two dictionaries. 
This needs to +/// be done once per distinct wrapper in the input. WrapState records +/// the compositions that are already made. +struct WrapState { + // Records wrap nulls added in wrapping. If wrap nulls are added, the same + // wrap nulls must be applied to all columns. + Buffer* nulls; + + // Set of distinct wrappers in input, each mapped to the wrap + // indices combining the former with the new wrap. + + folly::F14FastMap transposeResults; +}; + +/// Wraps 'inputVector' with 'wrapIndices' and +/// 'wrapNulls'. 'wrapSize' is the size of 'wrapIndices' and of +/// the resulting vector. Dictionary combining is deduplicated using +/// 'wrapState'. If the same indices are added on top of dictionary +/// encoded vectors sharing the same wrapping, the resulting vectors +/// will share the same composition of the original wrap and +/// 'wrapIndices'. +VectorPtr wrapOne( + vector_size_t wrapSize, + BufferPtr wrapIndices, + const VectorPtr& inputVector, + BufferPtr wrapNulls, + WrapState& wrapState); + // Ensures that all LazyVectors reachable from 'input' are loaded for all rows. void loadColumns(const RowVectorPtr& input, core::ExecCtx& execCtx); @@ -156,6 +187,29 @@ void projectChildren( int32_t size, const BufferPtr& mapping); +/// Projects children of 'src' row vector to 'dest' row vector +/// according to 'projections' and 'mapping'. 'size' specifies number +/// of projected rows in 'dest'. 'state' is used to +/// deduplicate dictionary merging when applying the same dictionary +/// over more than one identical set of indices. +void projectChildren( + std::vector& projectedChildren, + const RowVectorPtr& src, + const std::vector& projections, + int32_t size, + const BufferPtr& mapping, + WrapState* state); + +/// Overload of the above function that takes reference to const vector of +/// VectorPtr as 'src' argument, instead of row vector. 
+void projectChildren( + std::vector& projectedChildren, + const std::vector& src, + const std::vector& projections, + int32_t size, + const BufferPtr& mapping, + WrapState* state); + using BlockedOperatorCb = std::function; /// An operator that blocks until the blockedCb tells it not. blockedCb is diff --git a/velox/exec/tests/OperatorUtilsTest.cpp b/velox/exec/tests/OperatorUtilsTest.cpp index 5d4bab633b4b..fe3177da0016 100644 --- a/velox/exec/tests/OperatorUtilsTest.cpp +++ b/velox/exec/tests/OperatorUtilsTest.cpp @@ -531,3 +531,131 @@ TEST_F(OperatorUtilsTest, outputBatchRows) { ASSERT_EQ(1000, mockOp.outputRows(3'000'000'000)); } } + +TEST_F(OperatorUtilsTest, wrapMany) { + // Creates a RowVector with nullable and non-null vectors sharing + // different dictionary wraps. Rewraps these with a new wrap with + // and without nulls. Checks that the outcome has a single level of + // wrapping that combines the dictionaries and nulls and keeps the + // new wraps deduplicated where possible. + constexpr int32_t kSize = 1001; + auto indices1 = makeIndices(kSize, [](vector_size_t i) { return i; }); + auto indices2 = makeIndicesInReverse(kSize); + auto indices3 = makeIndicesInReverse(kSize); + auto wrapNulls = AlignedBuffer::allocate( + bits::nwords(kSize), pool_.get(), bits::kNotNull64); + for (auto i = 0; i < kSize; i += 5) { + bits::setNull(wrapNulls->asMutable(), i); + } + // Test dataset: *_a has no nulls, *_b has nulls. plain* is not wrapped. + // wrapped1* is wrapped in one dict, wrapped2* is wrapped in another, + // wrapped3* is wrapped in a dictionary that adds nulls. 
+ auto row = makeRowVector( + {"plain_a", + "plain_b", + "wrapped1_a", + "wrapped1_b", + "wrapped2_a", + "wrapped2_b", + "wrapped3_a", + "wrapped3_b"}, + + {// plain_a + makeFlatVector(kSize, [](auto i) { return i; }), + // plain_b + makeFlatVector( + kSize, [](auto i) { return i; }, [](auto i) { return i % 4 == 0; }), + + // wrapped1-a + BaseVector::wrapInDictionary( + nullptr, + indices1, + kSize, + makeFlatVector(kSize, [](auto i) { return i; })), + // wrapped1_b + BaseVector::wrapInDictionary( + nullptr, + indices1, + kSize, + makeFlatVector( + kSize, + [](auto i) { return i; }, + [](auto i) { return i % 4 == 0; })), + + // wrapped2-a + BaseVector::wrapInDictionary( + nullptr, + indices2, + kSize, + makeFlatVector(kSize, [](auto i) { return i; })), + // wrapped2_b + BaseVector::wrapInDictionary( + nullptr, + indices2, + kSize, + makeFlatVector( + kSize, + [](auto i) { return i; }, + [](auto i) { return i % 4 == 0; })), + // wrapped3-a + BaseVector::wrapInDictionary( + wrapNulls, + indices3, + kSize, + makeFlatVector(kSize, [](auto i) { return i; })), + // wrapped3_b + BaseVector::wrapInDictionary( + wrapNulls, + indices3, + kSize, + makeFlatVector( + kSize, + [](auto i) { return i; }, + [](auto i) { return i % 4 == 0; })) + + }); + auto rowType = row->type(); + std::vector identicalProjections{}; + for (auto i = 0; i < rowType->size(); ++i) { + identicalProjections.emplace_back(i, i); + } + + // Now wrap 'row' in 'newIndices' keeping wraps to one level and deduplicating + // dictionary transposes. + auto newIndices = makeIndicesInReverse(kSize); + WrapState state; + std::vector projected(rowType->size()); + projectChildren( + projected, row, identicalProjections, kSize, newIndices, &state); + auto result = makeRowVector(projected); + for (auto i = 0; i < kSize; ++i) { + EXPECT_TRUE( + row->equalValueAt(result.get(), i, newIndices->as()[i])); + } + + // The two unwrapped columns get 'newIndices' directly. 
+ EXPECT_EQ(projected[0]->wrapInfo(), newIndices); + EXPECT_EQ(projected[1]->wrapInfo(), newIndices); + + // The next two have the same wrapper and this is now combined with newIndices + // and used twice. + EXPECT_NE(projected[2]->wrapInfo(), newIndices); + EXPECT_NE(projected[2]->wrapInfo(), indices2); + EXPECT_EQ(projected[2]->wrapInfo(), projected[3]->wrapInfo()); + + // The next two share a different wrapper. + EXPECT_NE(projected[3]->wrapInfo(), projected[4]->wrapInfo()); + EXPECT_EQ(projected[4]->wrapInfo(), projected[5]->wrapInfo()); + + // The next two columns have nulls from their wrapper and thus they each get + // their own wrappers. + EXPECT_NE(projected[6]->wrapInfo(), projected[7]->wrapInfo()); + + // All columns have one level of wrapping. + EXPECT_EQ( + projected[2]->valueVector()->encoding(), VectorEncoding::Simple::FLAT); + EXPECT_EQ( + projected[4]->valueVector()->encoding(), VectorEncoding::Simple::FLAT); + EXPECT_EQ( + projected[6]->valueVector()->encoding(), VectorEncoding::Simple::FLAT); +} diff --git a/velox/vector/BaseVector.cpp b/velox/vector/BaseVector.cpp index 180e4c33336f..d65db337726c 100644 --- a/velox/vector/BaseVector.cpp +++ b/velox/vector/BaseVector.cpp @@ -170,37 +170,27 @@ VectorPtr BaseVector::wrapInDictionary( return result; } -template -static VectorPtr -addSequence(BufferPtr lengths, vector_size_t size, VectorPtr vector) { - auto base = vector.get(); - auto pool = base->pool(); - auto lsize = lengths->size(); - return std::make_shared< - SequenceVector::WrapperType>>( - pool, - size, - std::move(vector), - std::move(lengths), - SimpleVectorStats::WrapperType>{}, - std::nullopt /*distinctCount*/, - std::nullopt, - false /*sorted*/, - base->representedBytes().has_value() - ? 
std::optional( - base->representedBytes().value() * size / - (1 + (lsize / sizeof(vector_size_t)))) - : std::nullopt); -} - // static VectorPtr BaseVector::wrapInSequence( BufferPtr lengths, vector_size_t size, VectorPtr vector) { - auto kind = vector->typeKind(); - return VELOX_DYNAMIC_TYPE_DISPATCH_ALL( - addSequence, kind, std::move(lengths), size, std::move(vector)); + const auto numLengths = lengths->size() / sizeof(vector_size_t); + int64_t numIndices = 0; + auto* rawLengths = lengths->as(); + for (auto i = 0; i < numLengths; ++i) { + numIndices += rawLengths[i]; + } + VELOX_CHECK_LT(numIndices, std::numeric_limits::max()); + BufferPtr indices = + AlignedBuffer::allocate(numIndices, vector->pool()); + auto* rawIndices = indices->asMutable(); + int32_t fill = 0; + for (auto i = 0; i < numLengths; ++i) { + std::fill(rawIndices + fill, rawIndices + fill + rawLengths[i], i); + fill += rawLengths[i]; + } + return wrapInDictionary(nullptr, indices, numIndices, vector); } template @@ -1007,6 +997,102 @@ std::string printIndices( return out.str(); } +// static +void BaseVector::transposeIndices( + const vector_size_t* baseIndices, + vector_size_t wrapSize, + const vector_size_t* wrapIndices, + vector_size_t* resultIndices) { + constexpr int32_t kBatch = xsimd::batch::size; + static_assert(kBatch == 8); + static_assert(sizeof(vector_size_t) == sizeof(int32_t)); + int32_t i = 0; + for (; i + kBatch <= wrapSize; i += kBatch) { + auto indexBatch = xsimd::load_unaligned(wrapIndices + i); + simd::gather(baseIndices, indexBatch).store_unaligned(resultIndices + i); + } + if (i < wrapSize) { + auto indexBatch = xsimd::load_unaligned(wrapIndices + i); + auto mask = simd::leadingMask(wrapSize - i); + simd::maskGather( + xsimd::batch::broadcast(0), mask, baseIndices, indexBatch) + .store_unaligned(resultIndices + i); + } +} + +// static +void BaseVector::transposeIndicesWithNulls( + const vector_size_t* baseIndices, + const uint64_t* baseNulls, + vector_size_t wrapSize, + const 
vector_size_t* wrapIndices, + const uint64_t* wrapNulls, + vector_size_t* resultIndices, + uint64_t* resultNulls) { + constexpr int32_t kBatch = xsimd::batch::size; + static_assert(kBatch == 8); + static_assert(sizeof(vector_size_t) == sizeof(int32_t)); + for (auto i = 0; i < wrapSize; i += kBatch) { + auto indexBatch = xsimd::load_unaligned(wrapIndices + i); + uint8_t wrapNullsByte = + i + kBatch > wrapSize ? bits::lowMask(wrapSize - i) : 0xff; + + if (wrapNulls) { + wrapNullsByte &= reinterpret_cast(wrapNulls)[i / 8]; + } + if (wrapNullsByte != 0xff) { + // Zero out indices at null positions. + auto mask = simd::fromBitMask(wrapNullsByte); + indexBatch = indexBatch & + xsimd::load_unaligned(reinterpret_cast(&mask)); + } + if (baseNulls) { + uint8_t baseNullBits = simd::gather8Bits(baseNulls, indexBatch, 8); + wrapNullsByte &= baseNullBits; + } + reinterpret_cast(resultNulls)[i / 8] = wrapNullsByte; + simd::gather(baseIndices, indexBatch) + .store_unaligned(resultIndices + i); + } +} + +// static +void BaseVector::transposeDictionaryValues( + vector_size_t wrapSize, + BufferPtr& wrapNulls, + BufferPtr& wrapIndices, + std::shared_ptr& dictionaryValues) { + if (!wrapIndices->unique()) { + wrapIndices = AlignedBuffer::copy(dictionaryValues->pool(), wrapIndices); + } + auto* rawBaseNulls = dictionaryValues->rawNulls(); + auto baseIndices = dictionaryValues->wrapInfo(); + if (!rawBaseNulls && !wrapNulls) { + transposeIndices( + baseIndices->as(), + wrapSize, + wrapIndices->as(), + wrapIndices->asMutable()); + } else { + BufferPtr newNulls; + if (!wrapNulls || !wrapNulls->unique()) { + newNulls = AlignedBuffer::allocate( + wrapSize, dictionaryValues->pool(), bits::kNull); + } else { + newNulls = wrapNulls; + } + transposeIndicesWithNulls( + baseIndices->as(), + rawBaseNulls, + wrapSize, + wrapIndices->as(), + wrapNulls ? 
wrapNulls->as() : nullptr, + wrapIndices->asMutable(), + newNulls->asMutable()); + } + dictionaryValues = dictionaryValues->valueVector(); +} + template bool isAllSameFlat(const BaseVector& vector, vector_size_t size) { using T = typename KindToFlatVector::WrapperType; diff --git a/velox/vector/BaseVector.h b/velox/vector/BaseVector.h index 883ee403cfb2..65e014a193c0 100644 --- a/velox/vector/BaseVector.h +++ b/velox/vector/BaseVector.h @@ -540,12 +540,49 @@ class BaseVector { /// length. virtual VectorPtr slice(vector_size_t offset, vector_size_t length) const = 0; - /// Returns a vector of the type of 'source' where 'indices' contains - /// an index into 'source' for each element of 'source'. The - /// resulting vector has position i set to source[i]. This is - /// equivalent to wrapping 'source' in a dictionary with 'indices' - /// but this may reuse structure if said structure is uniquely owned - /// or if a copy is more efficient than dictionary wrapping. + /// Transposes two sets of dictionary indices into one level of indirection. + /// Sets result[i] = base[indices[i]] for i = 0 ... i < size. + static void transposeIndices( + const vector_size_t* base, + vector_size_t size, + const vector_size_t* indices, + vector_size_t* result); + + /// Transposes two levels of indices into a single level with nulls. sets + /// result[i] = base[indices[i]] where i is not null in 'wrapNulls' and + /// indices[i] is not null in 'baseNulls'. If indices[i] is null in + /// 'baseNulls' or i is null in 'wrapNulls', then 'resultNulls' is null at i. + /// 'wrapNulls' may be nullptr, meaning that no new nulls are added. + static void transposeIndicesWithNulls( + const vector_size_t* baseIndices, + const uint64_t* baseNulls, + vector_size_t wrapSize, + const vector_size_t* wrapIndices, + const uint64_t* wrapNulls, + vector_size_t* resultIndices, + uint64_t* resultNulls); + + /// Flattens 'dictionaryValues', which is a dictionary and replaces + /// it with its base. 
'size' is the number of valid elements in + /// 'indices' and 'nulls'. Null positions may have an invalid + /// index. Rewrites 'indices' from being indices into + /// 'dictionaryValues' to being indices into the latter's + /// base. Rewrites 'nulls' to be nulls from 'dictionaryValues' and + /// its base vector. This is used when a dictionary vector loads a + /// lazy values vector and finds out that the loaded vector is itself a + /// dictionary. + static void transposeDictionaryValues( + vector_size_t wrapSize, + BufferPtr& wrapNulls, + BufferPtr& wrapIndices, + std::shared_ptr& dictionaryValues); + + // Returns a vector of the type of 'source' where 'indices' contains + // an index into 'source' for each element of 'source'. The + // resulting vector has position i set to source[i]. This is + // equivalent to wrapping 'source' in a dictionary with 'indices' + // but this may reuse structure if said structure is uniquely owned + // or if a copy is more efficient than dictionary wrapping. static VectorPtr transpose(BufferPtr indices, VectorPtr&& source); static VectorPtr createConstant(