Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Wrap TableScan with Filter in Join Unparsing #13496

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 9 additions & 49 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use datafusion_common::{
Column, DataFusionError, Result, TableReference,
};
use datafusion_expr::{
expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan,
expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
LogicalPlanBuilder, Projection, SortExpr, TableScan,
};
use sqlparser::ast::{self, Ident, SetExpr};
use std::sync::Arc;
Expand Down Expand Up @@ -487,14 +487,9 @@ impl Unparser<'_> {
self.select_to_sql_recursively(input, query, select, relation)
}
LogicalPlan::Join(join) => {
let mut table_scan_filters = vec![];

let left_plan =
match try_transform_to_simple_table_scan_with_filters(&join.left)? {
Some((plan, filters)) => {
table_scan_filters.extend(filters);
Arc::new(plan)
}
Some(plan) => Arc::new(plan),
None => Arc::clone(&join.left),
};

Expand All @@ -507,72 +502,37 @@ impl Unparser<'_> {

let right_plan =
match try_transform_to_simple_table_scan_with_filters(&join.right)? {
Some((plan, filters)) => {
table_scan_filters.extend(filters);
Arc::new(plan)
}
Some(plan) => Arc::new(plan),
None => Arc::clone(&join.right),
};

let mut right_relation = RelationBuilder::default();

self.select_to_sql_recursively(
right_plan.as_ref(),
query,
select,
&mut right_relation,
)?;

let join_filters = if table_scan_filters.is_empty() {
join.filter.clone()
} else {
// Combine `table_scan_filters` into a single filter using `AND`
let Some(combined_filters) =
table_scan_filters.into_iter().reduce(|acc, filter| {
Expr::BinaryExpr(BinaryExpr {
left: Box::new(acc),
op: Operator::And,
right: Box::new(filter),
})
})
else {
return internal_err!("Failed to combine TableScan filters");
};

// Combine `join.filter` with `combined_filters` using `AND`
match &join.filter {
Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
left: Box::new(filter.clone()),
op: Operator::And,
right: Box::new(combined_filters),
})),
None => Some(combined_filters),
}
let Ok(Some(relation)) = right_relation.build() else {
return internal_err!("Failed to build right relation");
};

let join_filters = join.filter.clone();

let join_constraint = self.join_constraint_to_sql(
join.join_constraint,
&join.on,
join_filters.as_ref(),
)?;

self.select_to_sql_recursively(
right_plan.as_ref(),
query,
select,
&mut right_relation,
)?;

let Ok(Some(relation)) = right_relation.build() else {
return internal_err!("Failed to build right relation");
};

let ast_join = ast::Join {
relation,
global: false,
join_operator: self
.join_operator_to_sql(join.join_type, join_constraint)?,
};

let mut from = select.pop_from().unwrap();
from.push_join(ast_join);
select.push_from(from);
Expand Down
30 changes: 23 additions & 7 deletions datafusion/sql/src/unparser/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use std::{cmp::Ordering, sync::Arc, vec};

use datafusion_common::{
internal_err,
tree_node::{Transformed, TransformedResult, TreeNode},
Column, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::{
expr, utils::grouping_set_to_exprlist, Aggregate, Expr, LogicalPlan,
LogicalPlanBuilder, Projection, SortExpr, Unnest, Window,
expr, utils::grouping_set_to_exprlist, Aggregate, BinaryExpr, Expr, LogicalPlan,
LogicalPlanBuilder, Operator, Projection, SortExpr, Unnest, Window,
};
use sqlparser::ast;
use std::{cmp::Ordering, sync::Arc, vec};

use super::{
dialect::CharacterLengthStyle, dialect::DateFieldExtractStyle,
Expand Down Expand Up @@ -309,7 +308,7 @@ pub(crate) fn unproject_sort_expr(
/// And filters: [ta.j1_id < 5, ta.j1_id > 10]
pub(crate) fn try_transform_to_simple_table_scan_with_filters(
plan: &LogicalPlan,
) -> Result<Option<(LogicalPlan, Vec<Expr>)>> {
) -> Result<Option<LogicalPlan>> {
let mut filters: Vec<Expr> = vec![];
let mut plan_stack = vec![plan];
let mut table_alias = None;
Expand Down Expand Up @@ -349,6 +348,14 @@ pub(crate) fn try_transform_to_simple_table_scan_with_filters(

filters.extend(table_scan_filters);

let combined_filters = filters.into_iter().reduce(|acc, filter| {
Expr::BinaryExpr(BinaryExpr {
left: Box::new(acc),
op: Operator::And,
right: Box::new(filter),
})
});

let mut builder = LogicalPlanBuilder::scan(
table_scan.table_name.clone(),
Arc::clone(&table_scan.source),
Expand All @@ -359,9 +366,18 @@ pub(crate) fn try_transform_to_simple_table_scan_with_filters(
builder = builder.alias(alias)?;
}

let plan = builder.build()?;
let table_scan_plan = builder.build()?;

// If there are combined filters, wrap the TableScan in a Filter
let plan = if let Some(filter_expr) = combined_filters {
LogicalPlanBuilder::from(table_scan_plan)
.filter(filter_expr)?
.build()?
} else {
table_scan_plan
};

return Ok(Some((plan, filters)));
return Ok(Some(plan));
}
_ => {
return Ok(None);
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,7 @@ fn test_join_with_table_scan_filters() -> Result<()> {

let sql = plan_to_sql(&join_plan_with_filter)?;

let expected_sql = r#"SELECT * FROM left_table AS "left" JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND ("left"."name" LIKE 'some_name' AND (age > 10)))"#;
let expected_sql = r#"SELECT * FROM left_table AS "left" JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE "left"."name" LIKE 'some_name' AND (right_table.age > 10)"#;

assert_eq!(sql.to_string(), expected_sql);

Expand All @@ -1128,7 +1128,7 @@ fn test_join_with_table_scan_filters() -> Result<()> {

let sql = plan_to_sql(&join_plan_no_filter)?;

let expected_sql = r#"SELECT * FROM left_table AS "left" JOIN right_table ON "left".id = right_table.id AND ("left"."name" LIKE 'some_name' AND (age > 10))"#;
let expected_sql = r#"SELECT * FROM left_table AS "left" JOIN right_table ON "left".id = right_table.id WHERE "left"."name" LIKE 'some_name' AND (right_table.age > 10)"#;

assert_eq!(sql.to_string(), expected_sql);

Expand All @@ -1153,7 +1153,7 @@ fn test_join_with_table_scan_filters() -> Result<()> {

let sql = plan_to_sql(&join_plan_multiple_filters)?;

let expected_sql = r#"SELECT * FROM left_table AS "left" JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND (("left"."name" LIKE 'some_name' AND (right_table."name" = 'before_join_filter_val')) AND (age > 10))) WHERE ("left"."name" = 'after_join_filter_val')"#;
let expected_sql = r#"SELECT * FROM left_table AS "left" JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE ("left"."name" = 'after_join_filter_val') AND "left"."name" LIKE 'some_name' AND ((right_table."name" = 'before_join_filter_val') AND (right_table.age > 10))"#;
Copy link
Member

@sgrebnov sgrebnov Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm under impression that this change will result in incorrect behavior we were trying to fix here:
#13132.

Filtering after join is not the same as filtering during/before join (I would expect filters should be in subquery during join/ not after). Let me double check this please

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filtering after join is (semantically) the same as filtering before/during joins only for INNER JOIN / JOIN

You are correct that for LEFT/RIGHT/FULL OUTER JOIN and SEMI/ANTI JOIN the rules are more subtle


assert_eq!(sql.to_string(), expected_sql);

Expand Down