From 340681312dfc80c71b641fcbb81bc9b50a1a058c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philippe=20No=C3=ABl?= <21990816+philippemnoel@users.noreply.github.com> Date: Tue, 30 Jan 2024 12:12:43 -0500 Subject: [PATCH] chore: upgrade to DataFusion 35.0 (#2121) # Description This PR upgrades `delta-rs` to use DataFusion 35.0, which was recently released. In order to do this, I had to fix a few breaking changes, and also upgrade Arrow to 50 and `sqlparser` to 0.41. # Related Issue(s) N/A # Documentation See here for the list of PRs that required code changes: - https://github.com/apache/arrow-datafusion/pull/8703 - https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/dev/changelog/35.0.0.md?plain=1#L227 --------- Co-authored-by: Ming Ying --- Cargo.toml | 39 ++++---- crates/azure/src/config.rs | 1 + crates/azure/src/error.rs | 1 + crates/core/Cargo.toml | 2 +- crates/core/src/delta_datafusion/mod.rs | 8 +- crates/core/src/kernel/snapshot/log_data.rs | 2 +- crates/core/src/operations/delete.rs | 1 - crates/core/src/operations/merge/barrier.rs | 4 +- crates/core/src/operations/merge/mod.rs | 6 +- crates/core/src/operations/optimize.rs | 46 +++++++--- .../core/src/operations/transaction/state.rs | 15 +++- crates/core/src/operations/update.rs | 10 +-- crates/gcp/src/error.rs | 1 + crates/sql/src/parser.rs | 88 +------------------ 14 files changed, 80 insertions(+), 144 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5d03cd562d..cfcb4eaf3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,28 +19,27 @@ debug = "line-tables-only" [workspace.dependencies] # arrow -arrow = { version = "49" } -arrow-arith = { version = "49" } -arrow-array = { version = "49" } -arrow-buffer = { version = "49" } -arrow-cast = { version = "49" } -arrow-ipc = { version = "49" } -arrow-json = { version = "49" } -arrow-ord = { version = "49" } -arrow-row = { version = "49" } -arrow-schema = { version = "49" } -arrow-select = { version = "49" } -object_store = 
{ version = "0.8" } -parquet = { version = "49" } +arrow = { version = "50" } +arrow-arith = { version = "50" } +arrow-array = { version = "50" } +arrow-buffer = { version = "50" } +arrow-cast = { version = "50" } +arrow-ipc = { version = "50" } +arrow-json = { version = "50" } +arrow-ord = { version = "50" } +arrow-row = { version = "50" } +arrow-schema = { version = "50" } +arrow-select = { version = "50" } +object_store = { version = "0.9" } +parquet = { version = "50" } # datafusion -datafusion = { version = "34" } -datafusion-expr = { version = "34" } -datafusion-common = { version = "34" } -datafusion-proto = { version = "34" } -datafusion-sql = { version = "34" } -datafusion-physical-expr = { version = "34" } - +datafusion = { version = "35" } +datafusion-expr = { version = "35" } +datafusion-common = { version = "35" } +datafusion-proto = { version = "35" } +datafusion-sql = { version = "35" } +datafusion-physical-expr = { version = "35" } # serde serde = { version = "1.0.194", features = ["derive"] } diff --git a/crates/azure/src/config.rs b/crates/azure/src/config.rs index ccb06f171a..d30272768e 100644 --- a/crates/azure/src/config.rs +++ b/crates/azure/src/config.rs @@ -35,6 +35,7 @@ enum AzureCredential { /// Authorizing with secret ClientSecret, /// Using a shared access signature + #[allow(dead_code)] ManagedIdentity, /// Using a shared access signature SasKey, diff --git a/crates/azure/src/error.rs b/crates/azure/src/error.rs index aca1321c3d..acc18f67f9 100644 --- a/crates/azure/src/error.rs +++ b/crates/azure/src/error.rs @@ -4,6 +4,7 @@ pub(crate) type Result = std::result::Result; #[derive(thiserror::Error, Debug)] pub(crate) enum Error { + #[allow(dead_code)] #[error("failed to parse config: {0}")] Parse(String), diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index f8d3778eca..9773f82c46 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -97,7 +97,7 @@ reqwest = { version = "0.11.18", default-features = false, 
features = [ "rustls-tls", "json", ], optional = true } -sqlparser = { version = "0.40", optional = true } +sqlparser = { version = "0.41", optional = true } [dev-dependencies] criterion = "0.5" diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index ca64c9ef63..d7a10edc1f 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -90,7 +90,7 @@ pub mod physical; impl From for DataFusionError { fn from(err: DeltaTableError) -> Self { match err { - DeltaTableError::Arrow { source } => DataFusionError::ArrowError(source), + DeltaTableError::Arrow { source } => DataFusionError::ArrowError(source, None), DeltaTableError::Io { source } => DataFusionError::IoError(source), DeltaTableError::ObjectStore { source } => DataFusionError::ObjectStore(source), DeltaTableError::Parquet { source } => DataFusionError::ParquetError(source), @@ -102,7 +102,7 @@ impl From for DataFusionError { impl From for DeltaTableError { fn from(err: DataFusionError) -> Self { match err { - DataFusionError::ArrowError(source) => DeltaTableError::Arrow { source }, + DataFusionError::ArrowError(source, _) => DeltaTableError::Arrow { source }, DataFusionError::IoError(source) => DeltaTableError::Io { source }, DataFusionError::ObjectStore(source) => DeltaTableError::ObjectStore { source }, DataFusionError::ParquetError(source) => DeltaTableError::Parquet { source }, @@ -430,7 +430,6 @@ impl<'a> DeltaScanBuilder<'a> { limit: self.limit, table_partition_cols, output_ordering: vec![], - infinite_source: false, }, logical_filter.as_ref(), ) @@ -808,7 +807,7 @@ pub(crate) fn logical_expr_to_physical_expr( ) -> Arc { let df_schema = schema.clone().to_dfschema().unwrap(); let execution_props = ExecutionProps::new(); - create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap() + create_physical_expr(expr, &df_schema, &execution_props).unwrap() } pub(crate) async fn execute_plan_to_batch( @@ -1238,7 +1237,6 
@@ pub(crate) async fn find_files_scan<'a>( let predicate_expr = create_physical_expr( &Expr::IsTrue(Box::new(expression.clone())), &input_dfschema, - &input_schema, state.execution_props(), )?; diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 525f3db64b..b874b53421 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -245,7 +245,7 @@ impl LogicalFile<'_> { self.deletion_vector.as_ref().and_then(|arr| { arr.storage_type .is_valid(self.index) - .then(|| DeletionVectorView { + .then_some(DeletionVectorView { data: arr, index: self.index, }) diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 1e0f196aa3..2e3e99bde2 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -152,7 +152,6 @@ async fn excute_non_empty_expr( let predicate_expr = create_physical_expr( &negated_expression, &input_dfschema, - &input_schema, state.execution_props(), )?; let filter: Arc = diff --git a/crates/core/src/operations/merge/barrier.rs b/crates/core/src/operations/merge/barrier.rs index 6883f61253..f1df28c4a4 100644 --- a/crates/core/src/operations/merge/barrier.rs +++ b/crates/core/src/operations/merge/barrier.rs @@ -293,7 +293,9 @@ impl Stream for MergeBarrierStream { .iter() .map(|c| { arrow::compute::take(c.as_ref(), &indices, None) - .map_err(DataFusionError::ArrowError) + .map_err(|err| { + DataFusionError::ArrowError(err, None) + }) }) .collect::>>()?; diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index ffe2e78e38..07f65f4cf8 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -32,7 +32,6 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; -use arrow_schema::Schema as ArrowSchema; use async_trait::async_trait; use datafusion::datasource::provider_as_source; use 
datafusion::error::Result as DataFusionResult; @@ -657,11 +656,10 @@ impl ExtensionPlanner for MergeMetricExtensionPlanner { if let Some(barrier) = node.as_any().downcast_ref::() { let schema = barrier.input.schema(); - let exec_schema: ArrowSchema = schema.as_ref().to_owned().into(); return Ok(Some(Arc::new(MergeBarrierExec::new( physical_inputs.first().unwrap().clone(), barrier.file_column.clone(), - planner.create_physical_expr(&barrier.expr, schema, &exec_schema, session_state)?, + planner.create_physical_expr(&barrier.expr, schema, session_state)?, )))); } @@ -1418,9 +1416,7 @@ impl std::future::IntoFuture for MergeBuilder { PROTOCOL.can_write_to(&this.snapshot)?; let state = this.state.unwrap_or_else(|| { - //TODO: Datafusion's Hashjoin has some memory issues. Running with all cores results in a OoM. Can be removed when upstream improvemetns are made. let config: SessionConfig = DeltaSessionConfig::default().into(); - let config = config.with_target_partitions(1); let session = SessionContext::new_with_config(config); // If a user provides their own their DF state then they must register the store themselves diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index 1fc0286754..c67b31a71b 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -552,7 +552,7 @@ impl MergePlan { use datafusion::prelude::{col, ParquetReadOptions}; use datafusion_common::Column; use datafusion_expr::expr::ScalarFunction; - use datafusion_expr::Expr; + use datafusion_expr::{Expr, ScalarUDF}; let locations = files .iter() @@ -578,7 +578,7 @@ impl MergePlan { .map(|col| Expr::Column(Column::from_qualified_name_ignore_case(col))) .collect_vec(); let expr = Expr::ScalarFunction(ScalarFunction::new_udf( - Arc::new(zorder::datafusion::zorder_key_udf()), + Arc::new(ScalarUDF::from(zorder::datafusion::ZOrderUDF)), cols, )); let df = df.with_column(ZORDER_KEY_COLUMN, expr)?; @@ -1139,10 +1139,10 @@ 
pub(super) mod zorder { use arrow_schema::DataType; use datafusion_common::DataFusionError; use datafusion_expr::{ - ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature, - TypeSignature, Volatility, + ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, Volatility, }; use itertools::Itertools; + use std::any::Any; pub const ZORDER_UDF_NAME: &str = "zorder_key"; @@ -1166,20 +1166,38 @@ pub(super) mod zorder { use url::Url; let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime); - ctx.register_udf(datafusion::zorder_key_udf()); + ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF)); Ok(Self { columns, ctx }) } } - /// Get the DataFusion UDF struct for zorder_key - pub fn zorder_key_udf() -> ScalarUDF { - let signature = Signature { - type_signature: TypeSignature::VariadicAny, - volatility: Volatility::Immutable, - }; - let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Binary))); - let fun: ScalarFunctionImplementation = Arc::new(zorder_key_datafusion); - ScalarUDF::new(ZORDER_UDF_NAME, &signature, &return_type, &fun) + // DataFusion UDF impl for zorder_key + #[derive(Debug)] + pub struct ZOrderUDF; + + impl ScalarUDFImpl for ZOrderUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + ZORDER_UDF_NAME + } + + fn signature(&self) -> &Signature { + &Signature { + type_signature: TypeSignature::VariadicAny, + volatility: Volatility::Immutable, + } + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Binary) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + zorder_key_datafusion(args) + } } /// Datafusion zorder UDF body diff --git a/crates/core/src/operations/transaction/state.rs b/crates/core/src/operations/transaction/state.rs index d3f680fcea..ab778f2cb6 100644 --- a/crates/core/src/operations/transaction/state.rs +++ b/crates/core/src/operations/transaction/state.rs @@ -1,6 +1,7 @@ +use 
std::collections::HashSet; use std::sync::Arc; -use arrow::array::ArrayRef; +use arrow::array::{ArrayRef, BooleanArray}; use arrow::datatypes::{ DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; @@ -296,6 +297,12 @@ impl<'a> PruningStatistics for AddContainer<'a> { }); ScalarValue::iter_to_array(values).ok() } + + // This function is required since DataFusion 35.0, but is implemented as a no-op + // https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550 + fn contained(&self, _column: &Column, _value: &HashSet) -> Option { + None + } } impl PruningStatistics for DeltaTableState { @@ -333,6 +340,12 @@ impl PruningStatistics for DeltaTableState { let container = AddContainer::new(&files, partition_columns, self.arrow_schema().ok()?); container.null_counts(column) } + + // This function is required since DataFusion 35.0, but is implemented as a no-op + // https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550 + fn contained(&self, _column: &Column, _value: &HashSet) -> Option { + None + } } #[cfg(test)] diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 582a37da28..d07f3f9fc0 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -263,12 +263,7 @@ async fn execute( let predicate_null = when(predicate.clone(), lit(true)).otherwise(lit(ScalarValue::Boolean(None)))?; - let predicate_expr = create_physical_expr( - &predicate_null, - &input_dfschema, - &input_schema, - execution_props, - )?; + let predicate_expr = create_physical_expr(&predicate_null, &input_dfschema, execution_props)?; expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string())); let projection_predicate: Arc = @@ -315,8 +310,7 @@ async fn execute( let expr = 
case(col("__delta_rs_update_predicate")) .when(lit(true), expr.to_owned()) .otherwise(col(column.to_owned()))?; - let predicate_expr = - create_physical_expr(&expr, &input_dfschema, &input_schema, execution_props)?; + let predicate_expr = create_physical_expr(&expr, &input_dfschema, execution_props)?; map.insert(column.name.clone(), expressions.len()); let c = "__delta_rs_".to_string() + &column.name; expressions.push((predicate_expr, c.clone())); diff --git a/crates/gcp/src/error.rs b/crates/gcp/src/error.rs index aca1321c3d..acc18f67f9 100644 --- a/crates/gcp/src/error.rs +++ b/crates/gcp/src/error.rs @@ -4,6 +4,7 @@ pub(crate) type Result = std::result::Result; #[derive(thiserror::Error, Debug)] pub(crate) enum Error { + #[allow(dead_code)] #[error("failed to parse config: {0}")] Parse(String), diff --git a/crates/sql/src/parser.rs b/crates/sql/src/parser.rs index 3287c87215..10e7252730 100644 --- a/crates/sql/src/parser.rs +++ b/crates/sql/src/parser.rs @@ -1,7 +1,7 @@ use std::collections::VecDeque; use std::fmt; -use datafusion_sql::parser::{DFParser, DescribeTableStmt, Statement as DFStatement}; +use datafusion_sql::parser::{DFParser, Statement as DFStatement}; use datafusion_sql::sqlparser::ast::{ObjectName, Value}; use datafusion_sql::sqlparser::dialect::{keywords::Keyword, Dialect, GenericDialect}; use datafusion_sql::sqlparser::parser::{Parser, ParserError}; @@ -138,10 +138,6 @@ impl<'a> DeltaParser<'a> { match self.parser.peek_token().token { Token::Word(w) => { match w.keyword { - Keyword::DESCRIBE => { - self.parser.next_token(); - self.parse_describe() - } Keyword::VACUUM => { self.parser.next_token(); self.parse_vacuum() @@ -167,50 +163,6 @@ impl<'a> DeltaParser<'a> { } } - /// Parse a SQL `DESCRIBE` statement - pub fn parse_describe(&mut self) -> Result { - match self.parser.peek_token().token { - Token::Word(w) => match w.keyword { - Keyword::DETAIL => { - self.parser.next_token(); - let table = self.parser.parse_object_name()?; - 
Ok(Statement::Describe(DescribeStatement { - table, - operation: DescribeOperation::Detail, - })) - } - Keyword::HISTORY => { - self.parser.next_token(); - let table = self.parser.parse_object_name()?; - Ok(Statement::Describe(DescribeStatement { - table, - operation: DescribeOperation::History, - })) - } - Keyword::FILES => { - self.parser.next_token(); - let table = self.parser.parse_object_name()?; - Ok(Statement::Describe(DescribeStatement { - table, - operation: DescribeOperation::Files, - })) - } - _ => { - let table = self.parser.parse_object_name()?; - Ok(Statement::Datafusion(DFStatement::DescribeTableStmt( - DescribeTableStmt { table_name: table }, - ))) - } - }, - _ => { - let table_name = self.parser.parse_object_name()?; - Ok(Statement::Datafusion(DFStatement::DescribeTableStmt( - DescribeTableStmt { table_name }, - ))) - } - } - } - pub fn parse_vacuum(&mut self) -> Result { let table_name = self.parser.parse_object_name()?; match self.parser.peek_token().token { @@ -287,44 +239,6 @@ mod tests { Ok(()) } - #[test] - fn test_parse_describe() { - let stmt = Statement::Describe(DescribeStatement { - table: ObjectName(vec![Ident { - value: "data_table".to_string(), - quote_style: None, - }]), - operation: DescribeOperation::History, - }); - assert!(expect_parse_ok("DESCRIBE HISTORY data_table", stmt).is_ok()); - - let stmt = Statement::Describe(DescribeStatement { - table: ObjectName(vec![Ident { - value: "data_table".to_string(), - quote_style: None, - }]), - operation: DescribeOperation::Detail, - }); - assert!(expect_parse_ok("DESCRIBE DETAIL data_table", stmt).is_ok()); - - let stmt = Statement::Describe(DescribeStatement { - table: ObjectName(vec![Ident { - value: "data_table".to_string(), - quote_style: None, - }]), - operation: DescribeOperation::Files, - }); - assert!(expect_parse_ok("DESCRIBE FILES data_table", stmt).is_ok()); - - let stmt = Statement::Datafusion(DFStatement::DescribeTableStmt(DescribeTableStmt { - table_name: 
ObjectName(vec![Ident { - value: "data_table".to_string(), - quote_style: None, - }]), - })); - assert!(expect_parse_ok("DESCRIBE data_table", stmt).is_ok()) - } - #[test] fn test_parse_vacuum() { let stmt = Statement::Vacuum(VacuumStatement {