Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Unparse LogicalPlans with grouping aggregate functions #66

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ datafusion-functions = { workspace = true, default-features = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-nested = { workspace = true }
datafusion-functions-window = { workspace = true }
datafusion-optimizer = { workspace = true }
datafusion-common = { workspace = true }
env_logger = { workspace = true }
paste = "^1.0"
rstest = { workspace = true }
37 changes: 34 additions & 3 deletions datafusion/sql/src/unparser/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ use std::{cmp::Ordering, sync::Arc, vec};

use datafusion_common::{
internal_err,
tree_node::{Transformed, TransformedResult, TreeNode},
tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion},
Column, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::{
expr, utils::grouping_set_to_exprlist, Aggregate, Expr, LogicalPlan,
LogicalPlanBuilder, Projection, SortExpr, Unnest, Window,
expr::{self, Placeholder},
utils::grouping_set_to_exprlist,
Aggregate, Expr, LogicalPlan, LogicalPlanBuilder, Projection, SortExpr, Unnest,
Window,
};
use sqlparser::ast;

Expand Down Expand Up @@ -168,6 +170,20 @@ pub(crate) fn unproject_agg_exprs(
agg: &Aggregate,
windows: Option<&[&Window]>,
) -> Result<Expr> {
// If the current expression is an Alias over the internal grouping id column,
// we need to return a placeholder expression that represents the inverse
// of the replacement done in the [ResolveGroupingFunction] analyzer rule.
//
// [ResolveGroupingFunction]: datafusion_optimizer::resolve_grouping_function::ResolveGroupingFunction
if let Expr::Alias(alias) = &expr {
if find_grouping_id_col(&expr).is_some() {
return Ok(Expr::Placeholder(Placeholder::new(
alias.name.clone(),
None,
)));
}
}

expr.transform(|sub_expr| {
if let Expr::Column(c) = sub_expr {
if let Some(unprojected_expr) = find_agg_expr(agg, &c)? {
Expand Down Expand Up @@ -243,6 +259,21 @@ fn find_window_expr<'a>(
.find(|expr| expr.schema_name().to_string() == column_name)
}

/// Recursively searches for a Column expression with the name of the internal grouping id column: `__grouping_id`.
fn find_grouping_id_col(expr: &Expr) -> Option<&Expr> {
let mut grouping_id_col: Option<&Expr> = None;
expr.apply(|sub_expr| {
if let Expr::Column(c) = sub_expr {
if c.name == Aggregate::INTERNAL_GROUPING_ID {
grouping_id_col = Some(sub_expr);
}
}
Ok(TreeNodeRecursion::Continue)
})
.ok()?;
grouping_id_col
}

/// Transforms a Column expression into the actual expression from aggregation or projection if found.
/// This is required because if an ORDER BY expression is present in an Aggregate or Select, it is replaced
/// with a Column expression (e.g., "sum(catalog_returns.cr_net_loss)"). We need to transform it back to
Expand Down
45 changes: 45 additions & 0 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ use std::sync::Arc;
use std::vec;

use arrow_schema::*;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DFSchema, Result, TableReference};
use datafusion_expr::test::function_stub::{count_udaf, max_udaf, min_udaf, sum_udaf};
use datafusion_expr::{col, lit, table_scan, wildcard, LogicalPlanBuilder};
use datafusion_functions::unicode;
use datafusion_functions_aggregate::grouping::grouping_udaf;
use datafusion_functions_nested::make_array::make_array_udf;
use datafusion_functions_window::rank::rank_udwf;
use datafusion_optimizer::analyzer::resolve_grouping_function::ResolveGroupingFunction;
use datafusion_optimizer::AnalyzerRule;
use datafusion_sql::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_sql::unparser::dialect::{
DefaultDialect as UnparserDefaultDialect, Dialect as UnparserDialect,
Expand Down Expand Up @@ -796,6 +799,38 @@ where
assert_eq!(roundtrip_statement.to_string(), expect);
}

fn sql_round_trip_with_analyzer<D>(
dialect: D,
query: &str,
expect: &str,
analyzer: &dyn AnalyzerRule,
) where
D: Dialect,
{
let statement = Parser::new(&dialect)
.try_with_sql(query)
.unwrap()
.parse_statement()
.unwrap();

let context = MockContextProvider {
state: MockSessionState::default()
.with_aggregate_function(sum_udaf())
.with_aggregate_function(max_udaf())
.with_aggregate_function(grouping_udaf())
.with_window_function(rank_udwf())
.with_scalar_function(Arc::new(unicode::substr().as_ref().clone()))
.with_scalar_function(make_array_udf()),
};
let sql_to_rel = SqlToRel::new(&context);
let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap();

let plan = analyzer.analyze(plan, &ConfigOptions::default()).unwrap();

let roundtrip_statement = plan_to_sql(&plan).unwrap();
assert_eq!(roundtrip_statement.to_string(), expect);
}

#[test]
fn test_table_scan_alias() -> Result<()> {
let schema = Schema::new(vec![
Expand Down Expand Up @@ -1276,6 +1311,16 @@ GROUP BY person.id, person.first_name"#.replace("\n", " ").as_str(),
);
}

#[test]
fn test_grouping_aggregate_function_to_sql() {
sql_round_trip_with_analyzer(
GenericDialect {},
r#"SELECT id, first_name, grouping(id) FROM person GROUP BY ROLLUP(id, first_name)"#,
r#"SELECT id, first_name, "grouping(person.id)" FROM (SELECT person.id, person.first_name, grouping(person.id) FROM person GROUP BY ROLLUP (person.id, person.first_name))"#,
&ResolveGroupingFunction,
);
}

#[test]
fn test_unnest_to_sql() {
sql_round_trip(
Expand Down