From b5f0211f5e143f4361f3467679110190dc3c63db Mon Sep 17 00:00:00 2001 From: 924060929 Date: Tue, 17 Dec 2024 19:36:02 +0800 Subject: [PATCH] fix --- .../worker/job/AbstractUnassignedScanJob.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java index b14afab1acdc53f..521d4aaf2684019 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java @@ -76,7 +76,6 @@ protected List insideMachineParallelization( ConnectContext context = statementContext.getConnectContext(); boolean useLocalShuffleToAddParallel = useLocalShuffleToAddParallel(); List instances = Lists.newArrayList(); - int localShuffleParallelExecNum = fragment.getParallelExecNum(); for (Entry entry : workerToScanRanges.entrySet()) { DistributedPlanWorker worker = entry.getKey(); @@ -86,16 +85,19 @@ protected List insideMachineParallelization( // scan tbl1: [tablet_10001, tablet_10002, tablet_10003, tablet_10004] // no instances // } ScanSource scanSource = entry.getValue().scanSource; - if (useLocalShuffleToAddParallel) { - assignLocalShuffleJobs(scanSource, localShuffleParallelExecNum, instances, context, worker); - } else { - // usually, its tablets num, or buckets num - int scanSourceMaxParallel = scanSource.maxParallel(scanNodes); - // now we should compute how many instances to process the data, - // for example: two instances - int instanceNum = degreeOfParallelism(scanSourceMaxParallel); + // usually, its tablets num, or buckets num + int scanSourceMaxParallel = Math.max(scanSource.maxParallel(scanNodes), 1); + int maxParallel = useLocalShuffleToAddParallel + ? Math.max(fragment.getParallelExecNum(), scanSourceMaxParallel) + : scanSourceMaxParallel; + // now we should compute how many instances to process the data, + // for example: two instances + int instanceNum = degreeOfParallelism(maxParallel); + if (useLocalShuffleToAddParallel) { + assignLocalShuffleJobs(scanSource, instanceNum, instances, context, worker); + } else { assignedDefaultJobs(scanSource, instanceNum, instances, context, worker); } } @@ -180,7 +182,7 @@ protected int degreeOfParallelism(int maxParallel) { } // the scan instance num should not larger than the tablets num - return Math.min(maxParallel, Math.max(fragment.getParallelExecNum(), 1)); + return maxParallel; } protected List fillUpSingleEmptyInstance(DistributedPlanWorkerManager workerManager) {