Skip to content

Commit 5b8b4bd

Browse files
mbasmanovafacebook-github-bot
authored andcommitted
Optimize count(distinct <complex type>) (facebookincubator#8560)
Summary: Aggregations over distinct inputs use `SetAccumulator<ComplexType>`, which in turn uses AddressableNonNullValueList to store unique complex type values in a single non-contiguous allocation within HashStringAllocator. Storing thousands or millions of values requires allocation with hundrends or thousands of contiguous pieces. Calling HashStringAllocator::prepareRead on such allocation ends up populating an `std::vector<ByteRange>` with thousands of entries. Calling this repeatedly for each value as part of SetAccumulator::extractValues() becomes very slow. SetAccumulator::extractValues calls AddressableNonNullValueList::read for each unique value (there can be millions of these): for (const auto& position : base.uniqueValues) { AddressableNonNullValueList::read( position.first.position, values, offset + position.second); } AddressableNonNullValueList::read calls HashStringAllocator::prepareRead, which collects thousands of byte ranges into a vector passed to ByteInputStream constructor: auto stream = HashStringAllocator::prepareRead(header); As a result, queries like count(distrinct <complex type>) are very slow. A fix is to modify HashStringAllocator::prepareRead to accept an optional limit on how many bytes to prepare for read, then use this in AddressableNonNullValueList::read to prepare only as many bytes as needed to extract a single value. In addition, do not store hashes of the values in HSA to avoid calling HashStringAllocator::prepareRead altogether just to fetch the hash. Added benchmark for SetAccumulator to add 10M mostly unique values and read them back. Without the optimizations, the benchmark couldn't finish within a reasonable time (a few mininutes). Changing benchmark to process 100K values allowed it to compelete. Before (100K values): ``` ============================================================================ [...]enchmarks/SetAccumulatorBenchmark.cpp relative time/iter iters/s ============================================================================ bigint 2.87ms 348.07 varchar 22.22ms 45.01 twoBigints 988.08ms 1.01 ``` After (100K values): ``` ============================================================================ [...]enchmarks/SetAccumulatorBenchmark.cpp relative time/iter iters/s ============================================================================ bigint 2.80ms 356.87 varchar 21.19ms 47.19 twoBigints 38.83ms 25.76 ``` After the optimizations, the original benchmark processing 10M values is finishing within a few seconds. After (10M values): ``` ============================================================================ [...]enchmarks/SetAccumulatorBenchmark.cpp relative time/iter iters/s ============================================================================ bigint 1.23s 814.20m varchar 2.96s 338.39m twoBigints 6.30s 158.70m ``` Pull Request resolved: facebookincubator#8560 Reviewed By: Yuhta Differential Revision: D53130262 Pulled By: mbasmanova fbshipit-source-id: 9401d56fc8f9d4eecdaa4bd2ef53ae6e5f6f4f07
1 parent a113acf commit 5b8b4bd

9 files changed

+209
-77
lines changed

velox/common/memory/HashStringAllocator.cpp

+12-1
Original file line numberDiff line numberDiff line change
@@ -123,15 +123,26 @@ void HashStringAllocator::freeToPool(void* ptr, size_t size) {
123123
}
124124

125125
// static
126-
ByteInputStream HashStringAllocator::prepareRead(const Header* begin) {
126+
ByteInputStream HashStringAllocator::prepareRead(
127+
const Header* begin,
128+
size_t maxBytes) {
127129
std::vector<ByteRange> ranges;
128130
auto header = const_cast<Header*>(begin);
131+
132+
size_t totalBytes = 0;
133+
129134
for (;;) {
130135
ranges.push_back(ByteRange{
131136
reinterpret_cast<uint8_t*>(header->begin()), header->usableSize(), 0});
137+
totalBytes += ranges.back().size;
132138
if (!header->isContinued()) {
133139
break;
134140
}
141+
142+
if (totalBytes >= maxBytes) {
143+
break;
144+
}
145+
135146
header = header->nextContinued();
136147
}
137148
return ByteInputStream(std::move(ranges));

velox/common/memory/HashStringAllocator.h

+5-1
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,11 @@ class HashStringAllocator : public StreamArena {
239239

240240
// Returns ByteInputStream over the data in the range of 'header' and
241241
// possible continuation ranges.
242-
static ByteInputStream prepareRead(const Header* header);
242+
// @param maxBytes If provided, the returned stream will cover at most that
243+
// many bytes.
244+
static ByteInputStream prepareRead(
245+
const Header* header,
246+
size_t maxBytes = std::numeric_limits<size_t>::max());
243247

244248
// Returns the number of payload bytes between 'header->begin()' and
245249
// 'position'.

velox/exec/AddressableNonNullValueList.cpp

+23-26
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
namespace facebook::velox::aggregate::prestosql {
2020

21-
HashStringAllocator::Position AddressableNonNullValueList::append(
21+
AddressableNonNullValueList::Entry AddressableNonNullValueList::append(
2222
const DecodedVector& decoded,
2323
vector_size_t index,
2424
HashStringAllocator* allocator) {
@@ -37,8 +37,10 @@ HashStringAllocator::Position AddressableNonNullValueList::append(
3737
allocator->extendWrite(currentPosition_, stream);
3838
}
3939

40-
// Write hash.
41-
stream.appendOne(decoded.base()->hashValueAt(decoded.index(index)));
40+
const auto hash = decoded.base()->hashValueAt(decoded.index(index));
41+
42+
const auto originalSize = stream.size();
43+
4244
// Write value.
4345
exec::ContainerRowSerde::serialize(
4446
*decoded.base(), decoded.index(index), stream);
@@ -47,53 +49,48 @@ HashStringAllocator::Position AddressableNonNullValueList::append(
4749

4850
auto startAndFinish = allocator->finishWrite(stream, 1024);
4951
currentPosition_ = startAndFinish.second;
50-
return startAndFinish.first;
52+
53+
const auto writtenSize = stream.size() - originalSize;
54+
55+
return {startAndFinish.first, writtenSize, hash};
5156
}
5257

5358
namespace {
5459

55-
ByteInputStream prepareRead(
56-
HashStringAllocator::Position position,
57-
bool skipHash) {
58-
auto header = position.header;
59-
auto seek = static_cast<int32_t>(position.position - header->begin());
60+
ByteInputStream prepareRead(const AddressableNonNullValueList::Entry& entry) {
61+
auto header = entry.offset.header;
62+
auto seek = entry.offset.position - header->begin();
6063

61-
auto stream = HashStringAllocator::prepareRead(header);
64+
auto stream = HashStringAllocator::prepareRead(header, entry.size + seek);
6265
stream.seekp(seek);
63-
if (skipHash) {
64-
stream.skip(sizeof(uint64_t));
65-
}
6666
return stream;
6767
}
6868
} // namespace
6969

7070
// static
7171
bool AddressableNonNullValueList::equalTo(
72-
HashStringAllocator::Position left,
73-
HashStringAllocator::Position right,
72+
const Entry& left,
73+
const Entry& right,
7474
const TypePtr& type) {
75-
auto leftStream = prepareRead(left, true /*skipHash*/);
76-
auto rightStream = prepareRead(right, true /*skipHash*/);
75+
if (left.hash != right.hash) {
76+
return false;
77+
}
78+
79+
auto leftStream = prepareRead(left);
80+
auto rightStream = prepareRead(right);
7781

7882
CompareFlags compareFlags =
7983
CompareFlags::equality(CompareFlags::NullHandlingMode::kNullAsValue);
8084
return exec::ContainerRowSerde::compare(
8185
leftStream, rightStream, type.get(), compareFlags) == 0;
8286
}
8387

84-
// static
85-
uint64_t AddressableNonNullValueList::readHash(
86-
HashStringAllocator::Position position) {
87-
auto stream = prepareRead(position, false /*skipHash*/);
88-
return stream.read<uint64_t>();
89-
}
90-
9188
// static
9289
void AddressableNonNullValueList::read(
93-
HashStringAllocator::Position position,
90+
const Entry& position,
9491
BaseVector& result,
9592
vector_size_t index) {
96-
auto stream = prepareRead(position, true /*skipHash*/);
93+
auto stream = prepareRead(position);
9794
exec::ContainerRowSerde::deserialize(stream, index, &result);
9895
}
9996

velox/exec/AddressableNonNullValueList.h

+16-19
Original file line numberDiff line numberDiff line change
@@ -30,33 +30,37 @@ namespace facebook::velox::aggregate::prestosql {
3030
/// set_union.
3131
class AddressableNonNullValueList {
3232
public:
33+
struct Entry {
34+
HashStringAllocator::Position offset;
35+
size_t size;
36+
uint64_t hash;
37+
};
38+
3339
struct Hash {
34-
size_t operator()(HashStringAllocator::Position position) const {
35-
return AddressableNonNullValueList::readHash(position);
40+
size_t operator()(const Entry& key) const {
41+
return key.hash;
3642
}
3743
};
3844

3945
struct EqualTo {
4046
const TypePtr& type;
4147

42-
bool operator()(
43-
HashStringAllocator::Position left,
44-
HashStringAllocator::Position right) const {
48+
bool operator()(const Entry& left, const Entry& right) const {
4549
return AddressableNonNullValueList::equalTo(left, right, type);
4650
}
4751
};
4852

4953
/// Append a non-null value to the end of the list. Returns 'index' that can
5054
/// be used to access the value later.
51-
HashStringAllocator::Position append(
55+
Entry append(
5256
const DecodedVector& decoded,
5357
vector_size_t index,
5458
HashStringAllocator* allocator);
5559

5660
/// Removes last element. 'position' must be a value returned from the latest
5761
/// call to 'append'.
58-
void removeLast(HashStringAllocator::Position position) {
59-
currentPosition_ = position;
62+
void removeLast(const Entry& entry) {
63+
currentPosition_ = entry.offset;
6064
--size_;
6165
}
6266

@@ -66,19 +70,12 @@ class AddressableNonNullValueList {
6670
}
6771

6872
/// Returns true if elements at 'left' and 'right' are equal.
69-
static bool equalTo(
70-
HashStringAllocator::Position left,
71-
HashStringAllocator::Position right,
72-
const TypePtr& type);
73-
74-
/// Returns the hash of the specified element.
75-
static uint64_t readHash(HashStringAllocator::Position position);
73+
static bool
74+
equalTo(const Entry& left, const Entry& right, const TypePtr& type);
7675

7776
/// Copies the specified element to 'result[index]'.
78-
static void read(
79-
HashStringAllocator::Position position,
80-
BaseVector& result,
81-
vector_size_t index);
77+
static void
78+
read(const Entry& position, BaseVector& result, vector_size_t index);
8279

8380
void free(HashStringAllocator& allocator) {
8481
if (size_ > 0) {

velox/exec/SetAccumulator.h

+4-4
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ struct StringViewSetAccumulator {
179179
struct ComplexTypeSetAccumulator {
180180
/// A set of pointers to values stored in AddressableNonNullValueList.
181181
SetAccumulator<
182-
HashStringAllocator::Position,
182+
AddressableNonNullValueList::Entry,
183183
AddressableNonNullValueList::Hash,
184184
AddressableNonNullValueList::EqualTo>
185185
base;
@@ -203,12 +203,12 @@ struct ComplexTypeSetAccumulator {
203203
base.nullIndex = cnt;
204204
}
205205
} else {
206-
auto position = values.append(decoded, index, allocator);
206+
auto entry = values.append(decoded, index, allocator);
207207

208208
if (!base.uniqueValues
209-
.insert({position, base.nullIndex.has_value() ? cnt + 1 : cnt})
209+
.insert({entry, base.nullIndex.has_value() ? cnt + 1 : cnt})
210210
.second) {
211-
values.removeLast(position);
211+
values.removeLast(entry);
212212
}
213213
}
214214
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#include <folly/Benchmark.h>
17+
#include <folly/init/Init.h>
18+
19+
#include "velox/common/memory/Memory.h"
20+
#include "velox/exec/SetAccumulator.h"
21+
#include "velox/vector/fuzzer/VectorFuzzer.h"
22+
#include "velox/vector/tests/utils/VectorTestBase.h"
23+
24+
using namespace facebook::velox;
25+
using namespace facebook::velox::exec;
26+
27+
namespace {
28+
29+
// Adds 10M mostly unique values to a single SetAccumulator, then extracts
30+
// unique values from it.
31+
class SetAccumulatorBenchmark : public facebook::velox::test::VectorTestBase {
32+
public:
33+
void setup() {
34+
VectorFuzzer::Options opts;
35+
opts.vectorSize = 1'000'000;
36+
VectorFuzzer fuzzer(opts, pool());
37+
38+
auto rowType = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), VARCHAR()});
39+
for (auto i = 0; i < 10; ++i) {
40+
rowVectors_.emplace_back(fuzzer.fuzzInputRow(rowType));
41+
}
42+
}
43+
44+
void runBigint() {
45+
runPrimitive<int64_t>("a");
46+
}
47+
48+
void runVarchar() {
49+
runPrimitive<StringView>("c");
50+
}
51+
52+
void runTwoBigints() {
53+
HashStringAllocator allocator(pool());
54+
const TypePtr type = ROW({BIGINT(), BIGINT()});
55+
aggregate::prestosql::SetAccumulator<ComplexType> accumulator(
56+
type, &allocator);
57+
58+
for (const auto& rowVector : rowVectors_) {
59+
auto vector =
60+
makeRowVector({rowVector->childAt("a"), rowVector->childAt("b")});
61+
DecodedVector decoded(*vector);
62+
for (auto i = 0; i < rowVector->size(); ++i) {
63+
accumulator.addValue(decoded, i, &allocator);
64+
}
65+
}
66+
67+
auto result = BaseVector::create(type, accumulator.size(), pool());
68+
accumulator.extractValues(*result, 0);
69+
folly::doNotOptimizeAway(result);
70+
}
71+
72+
private:
73+
template <typename T>
74+
void runPrimitive(const std::string& name) {
75+
const auto& type = rowVectors_[0]->childAt(name)->type();
76+
77+
HashStringAllocator allocator(pool());
78+
aggregate::prestosql::SetAccumulator<T> accumulator(type, &allocator);
79+
80+
for (const auto& rowVector : rowVectors_) {
81+
DecodedVector decoded(*rowVector->childAt(name));
82+
for (auto i = 0; i < rowVector->size(); ++i) {
83+
accumulator.addValue(decoded, i, &allocator);
84+
}
85+
}
86+
87+
auto result =
88+
BaseVector::create<FlatVector<T>>(type, accumulator.size(), pool());
89+
accumulator.extractValues(*result, 0);
90+
folly::doNotOptimizeAway(result);
91+
}
92+
93+
std::vector<RowVectorPtr> rowVectors_;
94+
};
95+
96+
std::unique_ptr<SetAccumulatorBenchmark> bm;
97+
98+
BENCHMARK(bigint) {
99+
bm->runBigint();
100+
}
101+
102+
BENCHMARK(varchar) {
103+
bm->runVarchar();
104+
}
105+
106+
BENCHMARK(twoBigints) {
107+
bm->runTwoBigints();
108+
}
109+
110+
} // namespace
111+
112+
int main(int argc, char** argv) {
113+
folly::init(&argc, &argv);
114+
memory::MemoryManager::initialize({});
115+
116+
bm = std::make_unique<SetAccumulatorBenchmark>();
117+
bm->setup();
118+
119+
folly::runBenchmarks();
120+
121+
bm.reset();
122+
123+
return 0;
124+
}

0 commit comments

Comments
 (0)