Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Dec 17, 2024
1 parent f16364c commit 0b5a164
Showing 1 changed file with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ protected List<AssignedJob> insideMachineParallelization(
ConnectContext context = statementContext.getConnectContext();
boolean useLocalShuffleToAddParallel = useLocalShuffleToAddParallel();
List<AssignedJob> instances = Lists.newArrayList();
int localShuffleParallelExecNum = fragment.getParallelExecNum();
for (Entry<DistributedPlanWorker, UninstancedScanSource> entry : workerToScanRanges.entrySet()) {
DistributedPlanWorker worker = entry.getKey();

Expand All @@ -85,17 +86,16 @@ protected List<AssignedJob> insideMachineParallelization(
// scan tbl1: [tablet_10001, tablet_10002, tablet_10003, tablet_10004] // no instances
// }
ScanSource scanSource = entry.getValue().scanSource;

// usually, its tablets num, or buckets num
int scanSourceMaxParallel = scanSource.maxParallel(scanNodes);

// now we should compute how many instances to process the data,
// for example: two instances
int instanceNum = degreeOfParallelism(scanSourceMaxParallel);

if (useLocalShuffleToAddParallel) {
assignLocalShuffleJobs(scanSource, instanceNum, instances, context, worker);
assignLocalShuffleJobs(scanSource, localShuffleParallelExecNum, instances, context, worker);
} else {
// usually, its tablets num, or buckets num
int scanSourceMaxParallel = scanSource.maxParallel(scanNodes);

// now we should compute how many instances to process the data,
// for example: two instances
int instanceNum = degreeOfParallelism(scanSourceMaxParallel);

assignedDefaultJobs(scanSource, instanceNum, instances, context, worker);
}
}
Expand Down

0 comments on commit 0b5a164

Please sign in to comment.