diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 2cc836f6cbd06..26553caf1529f 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -452,11 +452,11 @@ fn push_down_all_join( } } - // For infer predicates, if they can not push through join, just drop them + // Push predicates inferred from the join expression for predicate in inferred_join_predicates { - if left_preserved && checker.is_left_only(&predicate) { + if checker.is_left_only(&predicate) { left_push.push(predicate); - } else if right_preserved && checker.is_right_only(&predicate) { + } else if checker.is_right_only(&predicate) { right_push.push(predicate); } } @@ -2788,8 +2788,7 @@ mod tests { ) } - /// post-left-join predicate on a column common to both sides is only pushed to the left side - /// i.e. - not duplicated to the right side + /// post-left-join predicate on a column common to both sides is pushed to both sides #[test] fn filter_using_left_join_on_common() -> Result<()> { let table_scan = test_table_scan()?; @@ -2817,20 +2816,19 @@ mod tests { TableScan: test2 ", ); - // filter sent to left side of the join, not the right + // filter sent to left side of the join and to the right assert_optimized_plan_equal!( plan, @r" Left Join: Using test.a = test2.a TableScan: test, full_filters=[test.a <= Int64(1)] Projection: test2.a - TableScan: test2 + TableScan: test2, full_filters=[test2.a <= Int64(1)] " ) } - /// post-right-join predicate on a column common to both sides is only pushed to the right side - /// i.e. - not duplicated to the left side. + /// post-right-join predicate on a column common to both sides is pushed to both sides #[test] fn filter_using_right_join_on_common() -> Result<()> { let table_scan = test_table_scan()?; @@ -2858,12 +2856,12 @@ mod tests { TableScan: test2 ", ); - // filter sent to right side of join, not duplicated to the left + // filter sent to right side of join, sent to the left as well assert_optimized_plan_equal!( plan, @r" Right Join: Using test.a = test2.a - TableScan: test + TableScan: test, full_filters=[test.a <= Int64(1)] Projection: test2.a TableScan: test2, full_filters=[test2.a <= Int64(1)] " @@ -3045,7 +3043,7 @@ mod tests { Projection: test.a, test.b, test.c TableScan: test Projection: test2.a, test2.b, test2.c - TableScan: test2, full_filters=[test2.c > UInt32(4)] + TableScan: test2, full_filters=[test2.a > UInt32(1), test2.c > UInt32(4)] " ) } diff --git a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt index a1efc1317b4aa..f8bec6ffdf0a1 100644 --- a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt +++ b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt @@ -89,20 +89,22 @@ logical_plan 02)--Projection: t2.a AS a2, t2.b 03)----RightSemi Join: t1.d = t2.d, t1.c = t2.c 04)------SubqueryAlias: t1 -05)--------TableScan: annotated_data projection=[c, d] -06)------SubqueryAlias: t2 -07)--------Filter: annotated_data.d = Int32(3) -08)----------TableScan: annotated_data projection=[a, b, c, d], partial_filters=[annotated_data.d = Int32(3)] +05)--------Filter: annotated_data.d = Int32(3) +06)----------TableScan: annotated_data projection=[c, d], partial_filters=[annotated_data.d = Int32(3)] +07)------SubqueryAlias: t2 +08)--------Filter: annotated_data.d = Int32(3) +09)----------TableScan: annotated_data projection=[a, b, c, d], partial_filters=[annotated_data.d = Int32(3)] physical_plan 01)SortPreservingMergeExec: [a2@0 ASC NULLS LAST, b@1 ASC NULLS LAST], fetch=10 02)--ProjectionExec: expr=[a@0 as a2, b@1 as b] -03)----CoalesceBatchesExec: target_batch_size=8192, fetch=10 -04)------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)], projection=[a@0, b@1] -05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], file_type=csv, has_header=true -06)--------CoalesceBatchesExec: target_batch_size=8192 -07)----------FilterExec: d@3 = 3 -08)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -09)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true +03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)], projection=[a@0, b@1] +04)------CoalescePartitionsExec +05)--------FilterExec: d@1 = 3 +06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], file_type=csv, has_header=true +08)------FilterExec: d@3 = 3 +09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true +10)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true # preserve_right_semi_join query II nosort diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 4bdf2e5da9632..35bd551a63caf 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -2106,7 +2106,7 @@ SELECT join_t1.t1_id, join_t2.t2_id FROM (select t1_id from join_t1 where join_t1.t1_id > 22) as join_t1 RIGHT JOIN (select t2_id from join_t2 where join_t2.t2_id > 11) as join_t2 ON join_t1.t1_id < join_t2.t2_id -ORDER BY 1, 2 +ORDER BY 1, 2 ---- 33 44 33 55 @@ -4025,7 +4025,7 @@ query TT explain select t1_id, t1_name, i from join_t1 t1 cross join lateral (select * from unnest(generate_series(1, t1_int))) as series(i); ---- logical_plan -01)Cross Join: +01)Cross Join: 02)--SubqueryAlias: t1 03)----TableScan: join_t1 projection=[t1_id, t1_name] 04)--SubqueryAlias: series @@ -4617,7 +4617,7 @@ query TT explain SELECT * FROM person a NATURAL JOIN lineitem b; ---- logical_plan -01)Cross Join: +01)Cross Join: 02)--SubqueryAlias: a 03)----TableScan: person projection=[id, age, state] 04)--SubqueryAlias: b @@ -4664,7 +4664,7 @@ query TT explain SELECT j1_string, j2_string FROM j1, LATERAL (SELECT * FROM j2 WHERE j1_id < j2_id) AS j2; ---- logical_plan -01)Cross Join: +01)Cross Join: 02)--TableScan: j1 projection=[j1_string] 03)--SubqueryAlias: j2 04)----Projection: j2.j2_string @@ -4677,7 +4677,7 @@ query TT explain SELECT * FROM j1 JOIN (j2 JOIN j3 ON(j2_id = j3_id - 2)) ON(j1_id = j2_id), LATERAL (SELECT * FROM j3 WHERE j3_string = j2_string) as j4 ---- logical_plan -01)Cross Join: +01)Cross Join: 02)--Inner Join: CAST(j2.j2_id AS Int64) = CAST(j3.j3_id AS Int64) - Int64(2) 03)----Inner Join: j1.j1_id = j2.j2_id 04)------TableScan: j1 projection=[j1_string, j1_id] @@ -4693,11 +4693,11 @@ query TT explain SELECT * FROM j1, LATERAL (SELECT * FROM j1, LATERAL (SELECT * FROM j2 WHERE j1_id = j2_id) as j2) as j2; ---- logical_plan -01)Cross Join: +01)Cross Join: 02)--TableScan: j1 projection=[j1_string, j1_id] 03)--SubqueryAlias: j2 04)----Subquery: -05)------Cross Join: +05)------Cross Join: 06)--------TableScan: j1 projection=[j1_string, j1_id] 07)--------SubqueryAlias: j2 08)----------Subquery: @@ -4709,7 +4709,7 @@ query TT explain SELECT j1_string, j2_string FROM j1 LEFT JOIN LATERAL (SELECT * FROM j2 WHERE j1_id < j2_id) AS j2 ON(true); ---- logical_plan -01)Left Join: +01)Left Join: 02)--TableScan: j1 projection=[j1_string] 03)--SubqueryAlias: j2 04)----Projection: j2.j2_string @@ -4722,9 +4722,9 @@ query TT explain SELECT * FROM j1, (j2 LEFT JOIN LATERAL (SELECT * FROM j3 WHERE j1_id + j2_id = j3_id) AS j3 ON(true)); ---- logical_plan -01)Cross Join: +01)Cross Join: 02)--TableScan: j1 projection=[j1_string, j1_id] -03)--Left Join: +03)--Left Join: 04)----TableScan: j2 projection=[j2_string, j2_id] 05)----SubqueryAlias: j3 06)------Subquery: @@ -4736,7 +4736,7 @@ query TT explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2; ---- logical_plan -01)Cross Join: +01)Cross Join: 02)--TableScan: j1 projection=[j1_string, j1_id] 03)--SubqueryAlias: j2 04)----Projection: Int64(1) @@ -5089,7 +5089,7 @@ FULL JOIN t2 ON k1 = k2 # LEFT MARK JOIN query TT -EXPLAIN +EXPLAIN SELECT * FROM t2 WHERE k2 > 0 @@ -5148,12 +5148,11 @@ LEFT ANTI JOIN t2 ON k1 = k2 WHERE k1 < 0 ---- physical_plan -01)CoalesceBatchesExec: target_batch_size=3 -02)--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k2@0, k1@0)] +01)HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k2@0, k1@0)] +02)--FilterExec: k2@0 < 0 03)----DataSourceExec: partitions=1, partition_sizes=[0] -04)----CoalesceBatchesExec: target_batch_size=3 -05)------FilterExec: k1@0 < 0 -06)--------DataSourceExec: partitions=1, partition_sizes=[10000] +04)--FilterExec: k1@0 < 0 +05)----DataSourceExec: partitions=1, partition_sizes=[10000] query II SELECT * @@ -5168,14 +5167,14 @@ CREATE OR REPLACE TABLE t1(b INT, c INT, d INT); statement ok INSERT INTO t1 VALUES - (10, 5, 3), - ( 1, 7, 8), - ( 2, 9, 7), - ( 3, 8,10), - ( 5, 6, 6), - ( 0, 4, 9), - ( 4, 8, 7), - (100,6, 5); + (10, 5, 3), + ( 1, 7, 8), + ( 2, 9, 7), + ( 3, 8,10), + ( 5, 6, 6), + ( 0, 4, 9), + ( 4, 8, 7), + (100,6, 5); query I rowsort SELECT c diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt b/datafusion/sqllogictest/test_files/push_down_filter.slt index 47095d92d9376..3bfc9f6cf18aa 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter.slt @@ -271,7 +271,7 @@ physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/ query TT explain select a from t where a = ''; ---- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = # The predicate should not have a column cast when the operator is = or != and the literal can be round-trip casted without losing information. query TT @@ -420,3 +420,333 @@ FROM ( WHERE t.start_timestamp::time < '00:00:01'::time; ---- 1 + +# Test aggregate dynamic filter pushdown +# Note: most of the test coverage lives in `datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs` +# , to compare dynamic filter content easier. Here the tests are simple end-to-end +# exercises. + +statement ok +set datafusion.explain.format = 'indent'; + +statement ok +set datafusion.explain.physical_plan_only = true; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.execution.parquet.pushdown_filters = true; + +statement ok +set datafusion.optimizer.enable_dynamic_filter_pushdown = true; + +statement ok +set datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true; + +statement ok +create external table agg_dyn_test stored as parquet location '../core/tests/data/test_statistics_per_partition'; + +# Expect dynamic filter available inside data source +query TT +explain select max(id) from agg_dyn_test where id > 1; +---- +physical_plan +01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)] +02)--CoalescePartitionsExec +03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)] +04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 > 1 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[] + +query I +select max(id) from agg_dyn_test where id > 1; +---- +4 + +# Expect dynamic filter available inside data source +query TT +explain select max(id) from agg_dyn_test where (id+1) > 1; +---- +physical_plan +01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)] +02)--CoalescePartitionsExec +03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)] +04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=CAST(id@0 AS Int64) + 1 > 1 AND DynamicFilter [ empty ] + +# Expect dynamic filter available inside data source +query TT +explain select max(id), min(id) from agg_dyn_test where id < 10; +---- +physical_plan +01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id), min(agg_dyn_test.id)] +02)--CoalescePartitionsExec +03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id), min(agg_dyn_test.id)] +04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 < 10 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 10, required_guarantees=[] + +# Dynamic filter should not be available for grouping sets +query TT +explain select max(id) from agg_dyn_test where id < 10 +group by grouping sets ((), (id)) +---- +physical_plan +01)ProjectionExec: expr=[max(agg_dyn_test.id)@2 as max(agg_dyn_test.id)] +02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, __grouping_id@1 as __grouping_id], aggr=[max(agg_dyn_test.id)] +03)----RepartitionExec: partitioning=Hash([id@0, __grouping_id@1], 2), input_partitions=2 +04)------AggregateExec: mode=Partial, gby=[(NULL as id), (id@0 as id)], aggr=[max(agg_dyn_test.id)] +05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 < 10, pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 10, required_guarantees=[] + +statement ok +drop table agg_dyn_test; + +statement ok +drop table t1; + +statement ok +drop table t2; + + + +# check LEFT/RIGHT joins with filter pushdown to both relations (when possible) + +statement ok +create table t1(k int, v int); + +statement ok +create table t2(k int, v int); + +statement ok +insert into t1 values + (1, 10), + (2, 20), + (3, 30), + (null, 40), + (50, null), + (null, null); + +statement ok +insert into t2 values + (1, 11), + (2, 21), + (2, 22), + (null, 41), + (51, null), + (null, null); + +statement ok +set datafusion.explain.physical_plan_only = false; + +statement ok +set datafusion.explain.logical_plan_only = true; + + +# left join + filter on join key -> pushed +query TT +explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 1; +---- +logical_plan +01)Left Join: t1.k = t2.k +02)--Filter: t1.k > Int32(1) +03)----TableScan: t1 projection=[k, v] +04)--Filter: t2.k > Int32(1) +05)----TableScan: t2 projection=[k, v] + +query IIII rowsort +select * from t1 left join t2 on t1.k = t2.k where t1.k > 1; +---- +2 20 2 21 +2 20 2 22 +3 30 NULL NULL +50 NULL NULL NULL + +# left join + filter on another column -> not pushed +query TT +explain select * from t1 left join t2 on t1.k = t2.k where t1.v > 1; +---- +logical_plan +01)Left Join: t1.k = t2.k +02)--Filter: t1.v > Int32(1) +03)----TableScan: t1 projection=[k, v] +04)--TableScan: t2 projection=[k, v] + +query IIII rowsort +select * from t1 left join t2 on t1.k = t2.k where t1.v > 1; +---- +1 10 1 11 +2 20 2 21 +2 20 2 22 +3 30 NULL NULL +NULL 40 NULL NULL + +# left join + or + filter on another column -> not pushed +query TT +explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; +---- +logical_plan +01)Left Join: t1.k = t2.k +02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20) +03)----TableScan: t1 projection=[k, v] +04)--TableScan: t2 projection=[k, v] + +query IIII rowsort +select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; +---- +3 30 NULL NULL +50 NULL NULL NULL +NULL 40 NULL NULL + + +# right join + filter on join key -> pushed +query TT +explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 1; +---- +logical_plan +01)Inner Join: t1.k = t2.k +02)--Filter: t1.k > Int32(1) +03)----TableScan: t1 projection=[k, v] +04)--Filter: t2.k > Int32(1) +05)----TableScan: t2 projection=[k, v] + +query IIII rowsort +select * from t1 right join t2 on t1.k = t2.k where t1.k > 1; +---- +2 20 2 21 +2 20 2 22 + +# right join + filter on another column -> not pushed +query TT +explain select * from t1 right join t2 on t1.k = t2.k where t1.v > 1; +---- +logical_plan +01)Inner Join: t1.k = t2.k +02)--Filter: t1.v > Int32(1) +03)----TableScan: t1 projection=[k, v] +04)--TableScan: t2 projection=[k, v] + +query IIII rowsort +select * from t1 right join t2 on t1.k = t2.k where t1.v > 1; +---- +1 10 1 11 +2 20 2 21 +2 20 2 22 + +# right join + or + filter on another column -> not pushed +query TT +explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; +---- +logical_plan +01)Inner Join: t1.k = t2.k +02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20) +03)----TableScan: t1 projection=[k, v] +04)--TableScan: t2 projection=[k, v] + +query IIII rowsort +select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; +---- + + +# left anti join + filter on join key -> pushed +query TT +explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1; +---- +logical_plan +01)LeftAnti Join: t1.k = t2.k +02)--Filter: t1.k > Int32(1) +03)----TableScan: t1 projection=[k, v] +04)--Filter: t2.k > Int32(1) +05)----TableScan: t2 projection=[k] + +query II rowsort +select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1; +---- +3 30 +50 NULL + +# left anti join + filter on another column -> not pushed +query TT +explain select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1; +---- +logical_plan +01)LeftAnti Join: t1.k = t2.k +02)--Filter: t1.v > Int32(1) +03)----TableScan: t1 projection=[k, v] +04)--TableScan: t2 projection=[k] + +query II rowsort +select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1; +---- +3 30 +NULL 40 + +# left anti join + or + filter on another column -> not pushed +query TT +explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; +---- +logical_plan +01)LeftAnti Join: t1.k = t2.k +02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20) +03)----TableScan: t1 projection=[k, v] +04)--TableScan: t2 projection=[k] + +query II rowsort +select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; +---- +3 30 +50 NULL +NULL 40 + + +# right anti join + filter on join key -> pushed +query TT +explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1; +---- +logical_plan +01)RightAnti Join: t1.k = t2.k +02)--Filter: t1.k > Int32(1) +03)----TableScan: t1 projection=[k] +04)--Filter: t2.k > Int32(1) +05)----TableScan: t2 projection=[k, v] + +query II rowsort +select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1; +---- +51 NULL + +# right anti join + filter on another column -> not pushed +query TT +explain select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1; +---- +logical_plan +01)RightAnti Join: t1.k = t2.k +02)--TableScan: t1 projection=[k] +03)--Filter: t2.v > Int32(1) +04)----TableScan: t2 projection=[k, v] + +query II rowsort +select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1; +---- +NULL 41 + +# right anti join + or + filter on another column -> not pushed +query TT +explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20; +---- +logical_plan +01)RightAnti Join: t1.k = t2.k +02)--TableScan: t1 projection=[k] +03)--Filter: t2.k > Int32(3) OR t2.v > Int32(20) +04)----TableScan: t2 projection=[k, v] + +query II rowsort +select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20; +---- +51 NULL +NULL 41 + + +statement ok +set datafusion.explain.logical_plan_only = false; + +statement ok +drop table t1; + +statement ok +drop table t2;