Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,11 +452,11 @@ fn push_down_all_join(
}
}

// For infer predicates, if they can not push through join, just drop them
// Push predicates inferred from the join expression
for predicate in inferred_join_predicates {
if left_preserved && checker.is_left_only(&predicate) {
if checker.is_left_only(&predicate) {
left_push.push(predicate);
} else if right_preserved && checker.is_right_only(&predicate) {
} else if checker.is_right_only(&predicate) {
right_push.push(predicate);
}
}
Expand Down Expand Up @@ -2788,8 +2788,7 @@ mod tests {
)
}

/// post-left-join predicate on a column common to both sides is only pushed to the left side
/// i.e. - not duplicated to the right side
/// post-left-join predicate on a column common to both sides is pushed to both sides
#[test]
fn filter_using_left_join_on_common() -> Result<()> {
let table_scan = test_table_scan()?;
Expand Down Expand Up @@ -2817,20 +2816,19 @@ mod tests {
TableScan: test2
",
);
// filter sent to left side of the join, not the right
// filter sent to left side of the join and to the right
assert_optimized_plan_equal!(
plan,
@r"
Left Join: Using test.a = test2.a
TableScan: test, full_filters=[test.a <= Int64(1)]
Projection: test2.a
TableScan: test2
TableScan: test2, full_filters=[test2.a <= Int64(1)]
"
)
}

/// post-right-join predicate on a column common to both sides is only pushed to the right side
/// i.e. - not duplicated to the left side.
/// post-right-join predicate on a column common to both sides is pushed to both sides
#[test]
fn filter_using_right_join_on_common() -> Result<()> {
let table_scan = test_table_scan()?;
Expand Down Expand Up @@ -2858,12 +2856,12 @@ mod tests {
TableScan: test2
",
);
// filter sent to right side of join, not duplicated to the left
// filter sent to right side of join, sent to the left as well
assert_optimized_plan_equal!(
plan,
@r"
Right Join: Using test.a = test2.a
TableScan: test
TableScan: test, full_filters=[test.a <= Int64(1)]
Projection: test2.a
TableScan: test2, full_filters=[test2.a <= Int64(1)]
"
Expand Down Expand Up @@ -3045,7 +3043,7 @@ mod tests {
Projection: test.a, test.b, test.c
TableScan: test
Projection: test2.a, test2.b, test2.c
TableScan: test2, full_filters=[test2.c > UInt32(4)]
TableScan: test2, full_filters=[test2.a > UInt32(1), test2.c > UInt32(4)]
"
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,22 @@ logical_plan
02)--Projection: t2.a AS a2, t2.b
03)----RightSemi Join: t1.d = t2.d, t1.c = t2.c
04)------SubqueryAlias: t1
05)--------TableScan: annotated_data projection=[c, d]
06)------SubqueryAlias: t2
07)--------Filter: annotated_data.d = Int32(3)
08)----------TableScan: annotated_data projection=[a, b, c, d], partial_filters=[annotated_data.d = Int32(3)]
05)--------Filter: annotated_data.d = Int32(3)
06)----------TableScan: annotated_data projection=[c, d], partial_filters=[annotated_data.d = Int32(3)]
07)------SubqueryAlias: t2
08)--------Filter: annotated_data.d = Int32(3)
09)----------TableScan: annotated_data projection=[a, b, c, d], partial_filters=[annotated_data.d = Int32(3)]
physical_plan
01)SortPreservingMergeExec: [a2@0 ASC NULLS LAST, b@1 ASC NULLS LAST], fetch=10
02)--ProjectionExec: expr=[a@0 as a2, b@1 as b]
03)----CoalesceBatchesExec: target_batch_size=8192, fetch=10
04)------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)], projection=[a@0, b@1]
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], file_type=csv, has_header=true
06)--------CoalesceBatchesExec: target_batch_size=8192
07)----------FilterExec: d@3 = 3
08)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true
03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)], projection=[a@0, b@1]
04)------CoalescePartitionsExec
05)--------FilterExec: d@1 = 3
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], file_type=csv, has_header=true
08)------FilterExec: d@3 = 3
09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
10)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true

# preserve_right_semi_join
query II nosort
Expand Down
49 changes: 24 additions & 25 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2106,7 +2106,7 @@ SELECT join_t1.t1_id, join_t2.t2_id
FROM (select t1_id from join_t1 where join_t1.t1_id > 22) as join_t1
RIGHT JOIN (select t2_id from join_t2 where join_t2.t2_id > 11) as join_t2
ON join_t1.t1_id < join_t2.t2_id
ORDER BY 1, 2
ORDER BY 1, 2
----
33 44
33 55
Expand Down Expand Up @@ -4025,7 +4025,7 @@ query TT
explain select t1_id, t1_name, i from join_t1 t1 cross join lateral (select * from unnest(generate_series(1, t1_int))) as series(i);
----
logical_plan
01)Cross Join:
01)Cross Join:
02)--SubqueryAlias: t1
03)----TableScan: join_t1 projection=[t1_id, t1_name]
04)--SubqueryAlias: series
Expand Down Expand Up @@ -4617,7 +4617,7 @@ query TT
explain SELECT * FROM person a NATURAL JOIN lineitem b;
----
logical_plan
01)Cross Join:
01)Cross Join:
02)--SubqueryAlias: a
03)----TableScan: person projection=[id, age, state]
04)--SubqueryAlias: b
Expand Down Expand Up @@ -4664,7 +4664,7 @@ query TT
explain SELECT j1_string, j2_string FROM j1, LATERAL (SELECT * FROM j2 WHERE j1_id < j2_id) AS j2;
----
logical_plan
01)Cross Join:
01)Cross Join:
02)--TableScan: j1 projection=[j1_string]
03)--SubqueryAlias: j2
04)----Projection: j2.j2_string
Expand All @@ -4677,7 +4677,7 @@ query TT
explain SELECT * FROM j1 JOIN (j2 JOIN j3 ON(j2_id = j3_id - 2)) ON(j1_id = j2_id), LATERAL (SELECT * FROM j3 WHERE j3_string = j2_string) as j4
----
logical_plan
01)Cross Join:
01)Cross Join:
02)--Inner Join: CAST(j2.j2_id AS Int64) = CAST(j3.j3_id AS Int64) - Int64(2)
03)----Inner Join: j1.j1_id = j2.j2_id
04)------TableScan: j1 projection=[j1_string, j1_id]
Expand All @@ -4693,11 +4693,11 @@ query TT
explain SELECT * FROM j1, LATERAL (SELECT * FROM j1, LATERAL (SELECT * FROM j2 WHERE j1_id = j2_id) as j2) as j2;
----
logical_plan
01)Cross Join:
01)Cross Join:
02)--TableScan: j1 projection=[j1_string, j1_id]
03)--SubqueryAlias: j2
04)----Subquery:
05)------Cross Join:
05)------Cross Join:
06)--------TableScan: j1 projection=[j1_string, j1_id]
07)--------SubqueryAlias: j2
08)----------Subquery:
Expand All @@ -4709,7 +4709,7 @@ query TT
explain SELECT j1_string, j2_string FROM j1 LEFT JOIN LATERAL (SELECT * FROM j2 WHERE j1_id < j2_id) AS j2 ON(true);
----
logical_plan
01)Left Join:
01)Left Join:
02)--TableScan: j1 projection=[j1_string]
03)--SubqueryAlias: j2
04)----Projection: j2.j2_string
Expand All @@ -4722,9 +4722,9 @@ query TT
explain SELECT * FROM j1, (j2 LEFT JOIN LATERAL (SELECT * FROM j3 WHERE j1_id + j2_id = j3_id) AS j3 ON(true));
----
logical_plan
01)Cross Join:
01)Cross Join:
02)--TableScan: j1 projection=[j1_string, j1_id]
03)--Left Join:
03)--Left Join:
04)----TableScan: j2 projection=[j2_string, j2_id]
05)----SubqueryAlias: j3
06)------Subquery:
Expand All @@ -4736,7 +4736,7 @@ query TT
explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2;
----
logical_plan
01)Cross Join:
01)Cross Join:
02)--TableScan: j1 projection=[j1_string, j1_id]
03)--SubqueryAlias: j2
04)----Projection: Int64(1)
Expand Down Expand Up @@ -5089,7 +5089,7 @@ FULL JOIN t2 ON k1 = k2

# LEFT MARK JOIN
query TT
EXPLAIN
EXPLAIN
SELECT *
FROM t2
WHERE k2 > 0
Expand Down Expand Up @@ -5148,12 +5148,11 @@ LEFT ANTI JOIN t2 ON k1 = k2
WHERE k1 < 0
----
physical_plan
01)CoalesceBatchesExec: target_batch_size=3
02)--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k2@0, k1@0)]
01)HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k2@0, k1@0)]
02)--FilterExec: k2@0 < 0
03)----DataSourceExec: partitions=1, partition_sizes=[0]
04)----CoalesceBatchesExec: target_batch_size=3
05)------FilterExec: k1@0 < 0
06)--------DataSourceExec: partitions=1, partition_sizes=[10000]
04)--FilterExec: k1@0 < 0
05)----DataSourceExec: partitions=1, partition_sizes=[10000]

query II
SELECT *
Expand All @@ -5168,14 +5167,14 @@ CREATE OR REPLACE TABLE t1(b INT, c INT, d INT);

statement ok
INSERT INTO t1 VALUES
(10, 5, 3),
( 1, 7, 8),
( 2, 9, 7),
( 3, 8,10),
( 5, 6, 6),
( 0, 4, 9),
( 4, 8, 7),
(100,6, 5);
(10, 5, 3),
( 1, 7, 8),
( 2, 9, 7),
( 3, 8,10),
( 5, 6, 6),
( 0, 4, 9),
( 4, 8, 7),
(100,6, 5);

query I rowsort
SELECT c
Expand Down
Loading
Loading