Skip to content

Commit c3023b6

Browse files
Yuhtafacebook-github-bot
authored andcommitted
Merge nest loop join build vectors (facebookincubator#11428)
Summary: Pull Request resolved: facebookincubator#11428 When build vectors in nested loop join are small, we should merge them to get better performance. In some extreme case, the performance difference can be more than 900 times. Reviewed By: pedroerp Differential Revision: D65450017 fbshipit-source-id: 370655e418327c6bd33b191b3190d3322b9bd9af
1 parent 6b11a56 commit c3023b6

File tree

3 files changed

+88
-0
lines changed

3 files changed

+88
-0
lines changed

velox/exec/NestedLoopJoinBuild.cpp

+33
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,38 @@ BlockingReason NestedLoopJoinBuild::isBlocked(ContinueFuture* future) {
7070
return BlockingReason::kWaitForJoinBuild;
7171
}
7272

73+
// Merge adjacent vectors to larger vectors as long as the result do not exceed
74+
// the size limit. This is important for performance because each small vector
75+
// here would be duplicated by the number of rows on probe side, result in huge
76+
// number of small vectors in the output.
77+
std::vector<RowVectorPtr> NestedLoopJoinBuild::mergeDataVectors() const {
78+
const auto maxBatchRows =
79+
operatorCtx_->task()->queryCtx()->queryConfig().maxOutputBatchRows();
80+
std::vector<RowVectorPtr> merged;
81+
for (int i = 0; i < dataVectors_.size();) {
82+
auto batchSize = dataVectors_[i]->size();
83+
auto j = i + 1;
84+
while (j < dataVectors_.size() &&
85+
batchSize + dataVectors_[j]->size() <= maxBatchRows) {
86+
batchSize += dataVectors_[j++]->size();
87+
}
88+
if (j == i + 1) {
89+
merged.push_back(dataVectors_[i++]);
90+
} else {
91+
auto batch = BaseVector::create<RowVector>(
92+
dataVectors_[i]->type(), batchSize, pool());
93+
batchSize = 0;
94+
while (i < j) {
95+
auto* source = dataVectors_[i++].get();
96+
batch->copy(source, batchSize, 0, source->size());
97+
batchSize += source->size();
98+
}
99+
merged.push_back(std::move(batch));
100+
}
101+
}
102+
return merged;
103+
}
104+
73105
void NestedLoopJoinBuild::noMoreInput() {
74106
Operator::noMoreInput();
75107
std::vector<ContinuePromise> promises;
@@ -105,6 +137,7 @@ void NestedLoopJoinBuild::noMoreInput() {
105137
}
106138
}
107139

140+
dataVectors_ = mergeDataVectors();
108141
operatorCtx_->task()
109142
->getNestedLoopJoinBridge(
110143
operatorCtx_->driverCtx()->splitGroupId, planNodeId())

velox/exec/NestedLoopJoinBuild.h

+2
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ class NestedLoopJoinBuild : public Operator {
5959
}
6060

6161
private:
62+
std::vector<RowVectorPtr> mergeDataVectors() const;
63+
6264
std::vector<RowVectorPtr> dataVectors_;
6365

6466
// Future for synchronizing with other Drivers of the same pipeline. All build

velox/exec/tests/NestedLoopJoinTest.cpp

+53
Original file line numberDiff line numberDiff line change
@@ -569,5 +569,58 @@ TEST_F(NestedLoopJoinTest, outputOrder) {
569569
assertEqualVectors(expectedLeft, results);
570570
}
571571

572+
TEST_F(NestedLoopJoinTest, mergeBuildVectors) {
573+
const std::vector<RowVectorPtr> buildVectors = {
574+
makeRowVector({makeFlatVector<int64_t>({1, 2})}),
575+
makeRowVector({makeFlatVector<int64_t>({3, 4})}),
576+
makeRowVector(
577+
{makeFlatVector<int64_t>(20, [](auto i) { return 5 + i; })}),
578+
makeRowVector(
579+
{makeFlatVector<int64_t>(20, [](auto i) { return 25 + i; })}),
580+
makeRowVector({makeFlatVector<int64_t>({45, 46})}),
581+
};
582+
const std::vector<RowVectorPtr> probeVectors = {
583+
makeRowVector({makeFlatVector<int64_t>({1, 2})}),
584+
};
585+
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
586+
CursorParameters params;
587+
params.planNode = PlanBuilder(planNodeIdGenerator)
588+
.values(probeVectors)
589+
.nestedLoopJoin(
590+
PlanBuilder(planNodeIdGenerator)
591+
.values(buildVectors)
592+
.project({"c0 as r0"})
593+
.planNode(),
594+
{"c0", "r0"})
595+
.planNode();
596+
params.queryConfigs[core::QueryConfig::kMaxOutputBatchRows] = "10";
597+
auto cursor = TaskCursor::create(params);
598+
// Expect the first 2 build side vectors are merged together since they are
599+
// under the limit after merge. Others are left along.
600+
for (int i = 0; i < 2; ++i) {
601+
auto makeExpected = [&](vector_size_t size, vector_size_t buildOffset) {
602+
return makeRowVector({
603+
makeConstant<int64_t>(1 + i, size),
604+
makeFlatVector<int64_t>(
605+
size, [&](auto i) { return buildOffset + i; }),
606+
});
607+
};
608+
ASSERT_TRUE(cursor->moveNext());
609+
ASSERT_EQ(cursor->current()->size(), 4);
610+
assertEqualVectors(makeExpected(4, 1), cursor->current());
611+
ASSERT_TRUE(cursor->moveNext());
612+
ASSERT_EQ(cursor->current()->size(), 20);
613+
assertEqualVectors(makeExpected(20, 5), cursor->current());
614+
ASSERT_TRUE(cursor->moveNext());
615+
ASSERT_EQ(cursor->current()->size(), 20);
616+
assertEqualVectors(makeExpected(20, 25), cursor->current());
617+
ASSERT_TRUE(cursor->moveNext());
618+
ASSERT_EQ(cursor->current()->size(), 2);
619+
assertEqualVectors(makeExpected(2, 45), cursor->current());
620+
}
621+
ASSERT_FALSE(cursor->moveNext());
622+
ASSERT_TRUE(waitForTaskCompletion(cursor->task().get()));
623+
}
624+
572625
} // namespace
573626
} // namespace facebook::velox::exec::test

0 commit comments

Comments
 (0)