diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java index 7459351c5e2a..8de9f1ab589a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java @@ -149,9 +149,18 @@ protected Integer inferSourceParallelism(StreamExecutionEnvironment env) { Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM))); } Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM); - if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) { + if (parallelism == null + // Infer parallelism when parallelism is not set and infer scan parallelism is + // enabled. + && env.getParallelism() == -1 + && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) { if (isUnbounded()) { - parallelism = Math.max(1, options.get(CoreOptions.BUCKET)); + // In unaware bucket or dynamic bucket mode, we can't infer parallelism. + if (options.get(CoreOptions.BUCKET) == -1) { + return null; + } else { + parallelism = Math.max(1, options.get(CoreOptions.BUCKET)); + } } else { scanSplitsForInference(); parallelism = splitStatistics.splitNumber();