Skip to content

Commit bab8088

Browse files
committed
fix
1 parent e848a88 commit bab8088

File tree

3 files changed

+19
-16
lines changed

3 files changed

+19
-16
lines changed

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,12 @@ protected List<AssignedJob> insideMachineParallelization(
8787
ScanSource scanSource = entry.getValue().scanSource;
8888

8989
// usually, its tablets num, or buckets num
90-
int scanSourceMaxParallel = Math.max(scanSource.maxParallel(scanNodes), 1);
90+
int scanSourceMaxParallel = scanSource.maxParallel(scanNodes);
9191

9292
// now we should compute how many instances to process the data,
9393
// for example: two instances
9494
int instanceNum = degreeOfParallelism(scanSourceMaxParallel, useLocalShuffleToAddParallel);
95+
9596
if (useLocalShuffleToAddParallel) {
9697
assignLocalShuffleJobs(scanSource, instanceNum, instances, context, worker);
9798
} else {

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ protected Map<String, String> extraInfo() {
5050
@Override
5151
protected String formatScanSourceString() {
5252
if (receiveDataFromLocal) {
53-
return "read data from first instance of " + getAssignedWorker();
53+
return "read data from other instances";
5454
} else {
5555
return super.formatScanSourceString();
5656
}

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java

+16-14
Original file line numberDiff line numberDiff line change
@@ -170,26 +170,28 @@ protected List<AssignedJob> insideMachineParallelization(
170170
protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, List<AssignedJob> instances,
171171
ConnectContext context, DistributedPlanWorker worker) {
172172
// only generate one instance to scan all data, in this step
173-
List<ScanSource> assignJoinBuckets = scanSource.parallelize(
174-
scanNodes, 1
175-
);
173+
List<ScanSource> assignJoinBuckets = scanSource.parallelize(scanNodes, instanceNum);
176174

177-
// one scan range generate multiple instances,
178-
// different instances reference the same scan source
179175
int shareScanId = shareScanIdGenerator.getAndIncrement();
180-
181-
Set<Integer> firstInstanceAssignedJoinBuckets
182-
= ((BucketScanSource) assignJoinBuckets.get(0)).bucketIndexToScanNodeToTablets.keySet();
183-
184176
BucketScanSource shareScanSource = (BucketScanSource) scanSource;
185-
ScanSource emptyShareScanSource = shareScanSource.newEmpty();
177+
for (int i = 0; i < assignJoinBuckets.size(); i++) {
178+
BucketScanSource assignedJoinBucket = (BucketScanSource) assignJoinBuckets.get(i);
179+
LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob(
180+
instances.size(), shareScanId, false,
181+
context.nextInstanceId(), this, worker,
182+
assignedJoinBucket,
183+
Utils.fastToImmutableSet(assignedJoinBucket.bucketIndexToScanNodeToTablets.keySet())
184+
);
185+
instances.add(instance);
186+
}
186187

187-
for (int i = 0; i < instanceNum; i++) {
188+
ScanSource emptyShareScanSource = shareScanSource.newEmpty();
189+
for (int i = assignJoinBuckets.size(); i < instanceNum; i++) {
188190
LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob(
189-
instances.size(), shareScanId, i > 0,
191+
instances.size(), shareScanId, true,
190192
context.nextInstanceId(), this, worker,
191-
i == 0 ? shareScanSource : emptyShareScanSource,
192-
i == 0 ? Utils.fastToImmutableSet(firstInstanceAssignedJoinBuckets) : ImmutableSet.of()
193+
emptyShareScanSource,
194+
ImmutableSet.of()
193195
);
194196
instances.add(instance);
195197
}

0 commit comments

Comments
 (0)