Skip to content

Commit

Permalink
chore: upgrade to DataFusion 35.0 (#2121)
Browse files Browse the repository at this point in the history
# Description
This PR upgrades `delta-rs` to using DataFusion 35.0, which was recently
released. In order to do this, I had to address a few breaking API changes, and
also upgrade Arrow to 50 and `sqlparser` to 0.41.

# Related Issue(s)
N/A

# Documentation
See here for the list of PRs which required code change:
- apache/datafusion#8703
-
https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/dev/changelog/35.0.0.md?plain=1#L227

---------

Co-authored-by: Ming Ying <[email protected]>
  • Loading branch information
philippemnoel and rebasedming authored Jan 30, 2024
1 parent 8f938ed commit 3406813
Show file tree
Hide file tree
Showing 14 changed files with 80 additions and 144 deletions.
39 changes: 19 additions & 20 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,27 @@ debug = "line-tables-only"

[workspace.dependencies]
# arrow
arrow = { version = "49" }
arrow-arith = { version = "49" }
arrow-array = { version = "49" }
arrow-buffer = { version = "49" }
arrow-cast = { version = "49" }
arrow-ipc = { version = "49" }
arrow-json = { version = "49" }
arrow-ord = { version = "49" }
arrow-row = { version = "49" }
arrow-schema = { version = "49" }
arrow-select = { version = "49" }
object_store = { version = "0.8" }
parquet = { version = "49" }
arrow = { version = "50" }
arrow-arith = { version = "50" }
arrow-array = { version = "50" }
arrow-buffer = { version = "50" }
arrow-cast = { version = "50" }
arrow-ipc = { version = "50" }
arrow-json = { version = "50" }
arrow-ord = { version = "50" }
arrow-row = { version = "50" }
arrow-schema = { version = "50" }
arrow-select = { version = "50" }
object_store = { version = "0.9" }
parquet = { version = "50" }

# datafusion
datafusion = { version = "34" }
datafusion-expr = { version = "34" }
datafusion-common = { version = "34" }
datafusion-proto = { version = "34" }
datafusion-sql = { version = "34" }
datafusion-physical-expr = { version = "34" }

datafusion = { version = "35" }
datafusion-expr = { version = "35" }
datafusion-common = { version = "35" }
datafusion-proto = { version = "35" }
datafusion-sql = { version = "35" }
datafusion-physical-expr = { version = "35" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
Expand Down
1 change: 1 addition & 0 deletions crates/azure/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ enum AzureCredential {
/// Authorizing with secret
ClientSecret,
/// Using a managed identity
#[allow(dead_code)]
ManagedIdentity,
/// Using a shared access signature
SasKey,
Expand Down
1 change: 1 addition & 0 deletions crates/azure/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
#[allow(dead_code)]
#[error("failed to parse config: {0}")]
Parse(String),

Expand Down
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [
"rustls-tls",
"json",
], optional = true }
sqlparser = { version = "0.40", optional = true }
sqlparser = { version = "0.41", optional = true }

[dev-dependencies]
criterion = "0.5"
Expand Down
8 changes: 3 additions & 5 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub mod physical;
impl From<DeltaTableError> for DataFusionError {
fn from(err: DeltaTableError) -> Self {
match err {
DeltaTableError::Arrow { source } => DataFusionError::ArrowError(source),
DeltaTableError::Arrow { source } => DataFusionError::ArrowError(source, None),
DeltaTableError::Io { source } => DataFusionError::IoError(source),
DeltaTableError::ObjectStore { source } => DataFusionError::ObjectStore(source),
DeltaTableError::Parquet { source } => DataFusionError::ParquetError(source),
Expand All @@ -102,7 +102,7 @@ impl From<DeltaTableError> for DataFusionError {
impl From<DataFusionError> for DeltaTableError {
fn from(err: DataFusionError) -> Self {
match err {
DataFusionError::ArrowError(source) => DeltaTableError::Arrow { source },
DataFusionError::ArrowError(source, _) => DeltaTableError::Arrow { source },
DataFusionError::IoError(source) => DeltaTableError::Io { source },
DataFusionError::ObjectStore(source) => DeltaTableError::ObjectStore { source },
DataFusionError::ParquetError(source) => DeltaTableError::Parquet { source },
Expand Down Expand Up @@ -430,7 +430,6 @@ impl<'a> DeltaScanBuilder<'a> {
limit: self.limit,
table_partition_cols,
output_ordering: vec![],
infinite_source: false,
},
logical_filter.as_ref(),
)
Expand Down Expand Up @@ -808,7 +807,7 @@ pub(crate) fn logical_expr_to_physical_expr(
) -> Arc<dyn PhysicalExpr> {
let df_schema = schema.clone().to_dfschema().unwrap();
let execution_props = ExecutionProps::new();
create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
create_physical_expr(expr, &df_schema, &execution_props).unwrap()
}

pub(crate) async fn execute_plan_to_batch(
Expand Down Expand Up @@ -1238,7 +1237,6 @@ pub(crate) async fn find_files_scan<'a>(
let predicate_expr = create_physical_expr(
&Expr::IsTrue(Box::new(expression.clone())),
&input_dfschema,
&input_schema,
state.execution_props(),
)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/kernel/snapshot/log_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl LogicalFile<'_> {
self.deletion_vector.as_ref().and_then(|arr| {
arr.storage_type
.is_valid(self.index)
.then(|| DeletionVectorView {
.then_some(DeletionVectorView {
data: arr,
index: self.index,
})
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ async fn excute_non_empty_expr(
let predicate_expr = create_physical_expr(
&negated_expression,
&input_dfschema,
&input_schema,
state.execution_props(),
)?;
let filter: Arc<dyn ExecutionPlan> =
Expand Down
4 changes: 3 additions & 1 deletion crates/core/src/operations/merge/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,9 @@ impl Stream for MergeBarrierStream {
.iter()
.map(|c| {
arrow::compute::take(c.as_ref(), &indices, None)
.map_err(DataFusionError::ArrowError)
.map_err(|err| {
DataFusionError::ArrowError(err, None)
})
})
.collect::<DataFusionResult<Vec<ArrayRef>>>()?;

Expand Down
6 changes: 1 addition & 5 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use arrow_schema::Schema as ArrowSchema;
use async_trait::async_trait;
use datafusion::datasource::provider_as_source;
use datafusion::error::Result as DataFusionResult;
Expand Down Expand Up @@ -657,11 +656,10 @@ impl ExtensionPlanner for MergeMetricExtensionPlanner {

if let Some(barrier) = node.as_any().downcast_ref::<MergeBarrier>() {
let schema = barrier.input.schema();
let exec_schema: ArrowSchema = schema.as_ref().to_owned().into();
return Ok(Some(Arc::new(MergeBarrierExec::new(
physical_inputs.first().unwrap().clone(),
barrier.file_column.clone(),
planner.create_physical_expr(&barrier.expr, schema, &exec_schema, session_state)?,
planner.create_physical_expr(&barrier.expr, schema, session_state)?,
))));
}

Expand Down Expand Up @@ -1418,9 +1416,7 @@ impl std::future::IntoFuture for MergeBuilder {
PROTOCOL.can_write_to(&this.snapshot)?;

let state = this.state.unwrap_or_else(|| {
//TODO: Datafusion's Hashjoin has some memory issues. Running with all cores results in an OoM. Can be removed when upstream improvements are made.
let config: SessionConfig = DeltaSessionConfig::default().into();
let config = config.with_target_partitions(1);
let session = SessionContext::new_with_config(config);

// If a user provides their own their DF state then they must register the store themselves
Expand Down
46 changes: 32 additions & 14 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ impl MergePlan {
use datafusion::prelude::{col, ParquetReadOptions};
use datafusion_common::Column;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::Expr;
use datafusion_expr::{Expr, ScalarUDF};

let locations = files
.iter()
Expand All @@ -578,7 +578,7 @@ impl MergePlan {
.map(|col| Expr::Column(Column::from_qualified_name_ignore_case(col)))
.collect_vec();
let expr = Expr::ScalarFunction(ScalarFunction::new_udf(
Arc::new(zorder::datafusion::zorder_key_udf()),
Arc::new(ScalarUDF::from(zorder::datafusion::ZOrderUDF)),
cols,
));
let df = df.with_column(ZORDER_KEY_COLUMN, expr)?;
Expand Down Expand Up @@ -1139,10 +1139,10 @@ pub(super) mod zorder {
use arrow_schema::DataType;
use datafusion_common::DataFusionError;
use datafusion_expr::{
ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature,
TypeSignature, Volatility,
ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, Volatility,
};
use itertools::Itertools;
use std::any::Any;

pub const ZORDER_UDF_NAME: &str = "zorder_key";

Expand All @@ -1166,20 +1166,38 @@ pub(super) mod zorder {

use url::Url;
let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
ctx.register_udf(datafusion::zorder_key_udf());
ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF));
Ok(Self { columns, ctx })
}
}

/// Get the DataFusion UDF struct for zorder_key
pub fn zorder_key_udf() -> ScalarUDF {
let signature = Signature {
type_signature: TypeSignature::VariadicAny,
volatility: Volatility::Immutable,
};
let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Binary)));
let fun: ScalarFunctionImplementation = Arc::new(zorder_key_datafusion);
ScalarUDF::new(ZORDER_UDF_NAME, &signature, &return_type, &fun)
// DataFusion UDF impl for zorder_key
/// DataFusion scalar UDF that computes the Z-order key for a row.
/// Registered under [`ZORDER_UDF_NAME`]; evaluation is delegated to
/// `zorder_key_datafusion`.
#[derive(Debug)]
pub struct ZOrderUDF;

impl ScalarUDFImpl for ZOrderUDF {
    // Required by `ScalarUDFImpl` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }

    // SQL-visible name of the function ("zorder_key").
    fn name(&self) -> &str {
        ZORDER_UDF_NAME
    }

    // Accepts any number of arguments of any type (`VariadicAny`);
    // `Immutable` because the same inputs always yield the same key.
    // NOTE(review): returning `&Signature { .. }` relies on rvalue static
    // promotion of the constant struct literal to a `'static` value.
    fn signature(&self) -> &Signature {
        &Signature {
            type_signature: TypeSignature::VariadicAny,
            volatility: Volatility::Immutable,
        }
    }

    // The z-order key is emitted as raw bytes regardless of the input types.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
        Ok(DataType::Binary)
    }

    // Delegate evaluation to the shared implementation body.
    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
        zorder_key_datafusion(args)
    }
}

/// Datafusion zorder UDF body
Expand Down
15 changes: 14 additions & 1 deletion crates/core/src/operations/transaction/state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashSet;
use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow::array::{ArrayRef, BooleanArray};
use arrow::datatypes::{
DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
Expand Down Expand Up @@ -296,6 +297,12 @@ impl<'a> PruningStatistics for AddContainer<'a> {
});
ScalarValue::iter_to_array(values).ok()
}

// Required by the `PruningStatistics` trait since DataFusion 35.0.
// Returning `None` signals that literal-containment information is not
// available for this container, so DataFusion skips this pruning path —
// a safe no-op, matching DataFusion's own reference implementation:
// https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550
fn contained(&self, _column: &Column, _value: &HashSet<ScalarValue>) -> Option<BooleanArray> {
    None
}
}

impl PruningStatistics for DeltaTableState {
Expand Down Expand Up @@ -333,6 +340,12 @@ impl PruningStatistics for DeltaTableState {
let container = AddContainer::new(&files, partition_columns, self.arrow_schema().ok()?);
container.null_counts(column)
}

// `PruningStatistics::contained` became a required trait method in
// DataFusion 35.0. Implemented as a no-op: `None` means "unknown", so no
// files are pruned based on containment — the conservative, safe answer.
// See DataFusion's own no-op implementation:
// https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550
fn contained(&self, _column: &Column, _value: &HashSet<ScalarValue>) -> Option<BooleanArray> {
    None
}
}

#[cfg(test)]
Expand Down
10 changes: 2 additions & 8 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,7 @@ async fn execute(

let predicate_null =
when(predicate.clone(), lit(true)).otherwise(lit(ScalarValue::Boolean(None)))?;
let predicate_expr = create_physical_expr(
&predicate_null,
&input_dfschema,
&input_schema,
execution_props,
)?;
let predicate_expr = create_physical_expr(&predicate_null, &input_dfschema, execution_props)?;
expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string()));

let projection_predicate: Arc<dyn ExecutionPlan> =
Expand Down Expand Up @@ -315,8 +310,7 @@ async fn execute(
let expr = case(col("__delta_rs_update_predicate"))
.when(lit(true), expr.to_owned())
.otherwise(col(column.to_owned()))?;
let predicate_expr =
create_physical_expr(&expr, &input_dfschema, &input_schema, execution_props)?;
let predicate_expr = create_physical_expr(&expr, &input_dfschema, execution_props)?;
map.insert(column.name.clone(), expressions.len());
let c = "__delta_rs_".to_string() + &column.name;
expressions.push((predicate_expr, c.clone()));
Expand Down
1 change: 1 addition & 0 deletions crates/gcp/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
#[allow(dead_code)]
#[error("failed to parse config: {0}")]
Parse(String),

Expand Down
Loading

0 comments on commit 3406813

Please sign in to comment.