chore: api changes for Datafusion 44 #3073

Draft · wants to merge 7 commits into base: main
Changes from all commits
14 changes: 14 additions & 0 deletions Cargo.toml
@@ -75,3 +75,17 @@ async-trait = { version = "0.1" }
futures = { version = "0.3" }
tokio = { version = "1" }
num_cpus = { version = "1" }

# temporary datafusion patches
[patch.crates-io]
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" }
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" }
datafusion-execution = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" }
datafusion-ffi = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" }
datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" }
datafusion-functions-aggregate = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" }
datafusion-proto = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" }
datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" }
3 changes: 0 additions & 3 deletions crates/catalog-unity/src/error.rs
@@ -10,7 +10,6 @@ pub enum UnityCatalogError {
},

/// A generic error qualified in the message

#[error("{source}")]
Retry {
/// Error message
@@ -19,7 +18,6 @@
},

#[error("Request error: {source}")]

/// Error from reqwest library
RequestError {
/// The underlying reqwest_middleware::Error
@@ -35,7 +33,6 @@
},

/// Error caused by invalid access token value

#[error("Invalid Databricks personal access token")]
InvalidAccessToken,
}
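
Note on this hunk: dropping the blank lines keeps each doc comment directly attached to the variant (and its `#[error]` attribute) it documents; newer toolchains warn about the detached form (e.g. Clippy's `empty_line_after_doc_comments`). A minimal sketch of the resulting shape, assuming a `thiserror`-style enum like the one in this file; `ExampleError` is hypothetical, not the real `UnityCatalogError`:

```rust
use thiserror::Error;

/// Illustration only: doc comments sit immediately above the item they describe.
#[derive(Error, Debug)]
pub enum ExampleError {
    /// A generic error qualified in the message
    #[error("{source}")]
    Retry {
        /// The underlying error
        source: std::io::Error,
    },
}
```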
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
@@ -96,7 +96,7 @@ tracing = { workspace = true }
rand = "0.8"
z85 = "3.0.5"
maplit = "1"
sqlparser = { version = "0.52.0" }
sqlparser = { version = "0.53.0" }

[dev-dependencies]
criterion = "0.5"
1 change: 1 addition & 0 deletions crates/core/src/delta_datafusion/cdf/scan_utils.rs
@@ -84,6 +84,7 @@ pub fn create_partition_values<F: FileAction>(
extensions: None,
range: None,
statistics: None,
metadata_size_hint: None,
};

file_groups.entry(new_part_values).or_default().push(part);
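
The same one-line addition appears again below in `delta_datafusion/mod.rs`: DataFusion 44 adds a `metadata_size_hint` field to `PartitionedFile`, and `None` keeps the previous behaviour. A hedged sketch of setting the field outside a full struct literal; the helper name and prefetch value are illustrative, not from this PR:

```rust
use datafusion::datasource::listing::PartitionedFile;

/// Illustrative helper: build a PartitionedFile and attach a metadata size hint.
fn partitioned_file_with_hint(path: &str, size: u64) -> PartitionedFile {
    let mut file = PartitionedFile::new(path, size);
    // Hint for how many trailing bytes to prefetch when reading file metadata
    // (e.g. the Parquet footer); `None` keeps the reader's default behaviour.
    file.metadata_size_hint = Some(512 * 1024);
    file
}
```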
9 changes: 2 additions & 7 deletions crates/core/src/delta_datafusion/expr.rs
@@ -99,14 +99,13 @@ impl ScalarUDFImpl for MakeParquetArray {
r_type
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
let mut data_type = DataType::Null;
for arg in args {
data_type = arg.data_type();
}

#[allow(deprecated)]
match self.actual.invoke(args)? {
match self.actual.invoke_batch(args, number_rows)? {
ColumnarValue::Scalar(ScalarValue::List(df_array)) => {
let field = Arc::new(Field::new("element", data_type, true));
let result = Ok(ColumnarValue::Scalar(ScalarValue::List(Arc::new(
@@ -127,10 +126,6 @@
}
}

fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
self.actual.invoke_batch(&[], number_rows)
}

fn aliases(&self) -> &[String] {
&self.aliases
}
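
For context on this hunk: DataFusion 44 routes scalar UDF evaluation through `invoke_batch(args, number_rows)`, which also covers the zero-argument case that `invoke_no_args` used to handle, so the wrapper forwards to `invoke_batch` and the extra method goes away. A standalone sketch of the new entry point; the `row_count` UDF is hypothetical and not part of this PR:

```rust
use std::any::Any;
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};

/// Hypothetical UDF used only to show the DataFusion 44 signature.
#[derive(Debug)]
struct RowCountUdf {
    signature: Signature,
}

impl RowCountUdf {
    fn new() -> Self {
        Self {
            signature: Signature::variadic_any(Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for RowCountUdf {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "row_count"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int64)
    }

    // `invoke_batch` replaces both `invoke` and `invoke_no_args`: the row count
    // is passed explicitly, so zero-argument UDFs need no separate hook.
    fn invoke_batch(&self, _args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
        let array: ArrayRef = Arc::new(Int64Array::from(vec![number_rows as i64; number_rows]));
        Ok(ColumnarValue::Array(array))
    }
}
```

The impl is wrapped as usual, e.g. `ScalarUDF::new_from_impl(RowCountUdf::new())`, before being registered on a context.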
2 changes: 2 additions & 0 deletions crates/core/src/delta_datafusion/mod.rs
@@ -1054,6 +1054,7 @@ fn partitioned_file_from_action(
range: None,
extensions: None,
statistics: None,
metadata_size_hint: None,
}
}

@@ -1959,6 +1960,7 @@ mod tests {
range: None,
extensions: None,
statistics: None,
metadata_size_hint: None,
};
assert_eq!(file.partition_values, ref_file.partition_values)
}
10 changes: 4 additions & 6 deletions crates/core/src/operations/optimize.rs
@@ -1215,10 +1215,7 @@ pub(super) mod zorder {
use url::Url;

use ::datafusion::{
execution::{
memory_pool::FairSpillPool,
runtime_env::{RuntimeConfig, RuntimeEnv},
},
execution::{memory_pool::FairSpillPool, runtime_env::RuntimeEnvBuilder},
prelude::{SessionConfig, SessionContext},
};
use arrow_schema::DataType;
@@ -1245,8 +1242,9 @@
let columns = columns.into();

let memory_pool = FairSpillPool::new(max_spill_size);
let config = RuntimeConfig::new().with_memory_pool(Arc::new(memory_pool));
let runtime = Arc::new(RuntimeEnv::try_new(config)?);
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(memory_pool))
.build_arc()?;
runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store);

let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
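
The same `RuntimeEnvBuilder` pattern applies anywhere a `RuntimeConfig` used to be built by hand. A minimal sketch, reusing only the APIs visible in this hunk plus the standard prelude:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

/// Build a SessionContext whose runtime uses a fair spill pool of the given size.
fn session_with_spill_pool(max_spill_size: usize) -> Result<SessionContext> {
    // RuntimeConfig::new() + RuntimeEnv::try_new(..) collapse into the builder.
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(max_spill_size)))
        .build_arc()?;
    Ok(SessionContext::new_with_config_rt(
        SessionConfig::default(),
        runtime,
    ))
}
```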
12 changes: 8 additions & 4 deletions crates/sql/src/parser.rs
@@ -5,7 +5,7 @@ use datafusion_sql::parser::{DFParser, Statement as DFStatement};
use datafusion_sql::sqlparser::ast::{ObjectName, Value};
use datafusion_sql::sqlparser::dialect::{keywords::Keyword, Dialect, GenericDialect};
use datafusion_sql::sqlparser::parser::{Parser, ParserError};
use datafusion_sql::sqlparser::tokenizer::{Token, TokenWithLocation, Tokenizer};
use datafusion_sql::sqlparser::tokenizer::{Token, TokenWithSpan, Tokenizer};

// Use `Parser::expected` instead, if possible
macro_rules! parser_err {
@@ -129,7 +129,7 @@ impl<'a> DeltaParser<'a> {
}

/// Report an unexpected token
fn expected<T>(&self, expected: &str, found: TokenWithLocation) -> Result<T, ParserError> {
fn expected<T>(&self, expected: &str, found: TokenWithSpan) -> Result<T, ParserError> {
parser_err!(format!("Expected {expected}, found: {found}"))
}

@@ -224,9 +224,9 @@ impl<'a> DeltaParser<'a> {

#[cfg(test)]
mod tests {
use datafusion_sql::sqlparser::ast::Ident;

use super::*;
use datafusion_sql::sqlparser::ast::Ident;
use datafusion_sql::sqlparser::tokenizer::Span;

fn expect_parse_ok(sql: &str, expected: Statement) -> Result<(), ParserError> {
let statements = DeltaParser::parse_sql(sql)?;
@@ -245,6 +245,7 @@
table: ObjectName(vec![Ident {
value: "data_table".to_string(),
quote_style: None,
span: Span::empty(),
}]),
retention_hours: None,
dry_run: false,
@@ -255,6 +256,7 @@
table: ObjectName(vec![Ident {
value: "data_table".to_string(),
quote_style: None,
span: Span::empty(),
}]),
retention_hours: Some(10),
dry_run: false,
@@ -265,6 +267,7 @@
table: ObjectName(vec![Ident {
value: "data_table".to_string(),
quote_style: None,
span: Span::empty(),
}]),
retention_hours: Some(10),
dry_run: true,
@@ -275,6 +278,7 @@
table: ObjectName(vec![Ident {
value: "data_table".to_string(),
quote_style: None,
span: Span::empty(),
}]),
retention_hours: None,
dry_run: true,
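
Two sqlparser 0.53 changes drive this file: `TokenWithLocation` is now `TokenWithSpan`, and `Ident` gained a `span` field, so hand-built identifiers in the tests need `Span::empty()`. A small sketch of the new construction; the helper name is illustrative only:

```rust
use datafusion_sql::sqlparser::ast::{Ident, ObjectName};
use datafusion_sql::sqlparser::tokenizer::Span;

/// Illustrative helper building the table name used in the VACUUM tests.
fn vacuum_target() -> ObjectName {
    ObjectName(vec![Ident {
        value: "data_table".to_string(),
        quote_style: None,
        // No source location exists for a hand-built identifier.
        span: Span::empty(),
    }])
}
```

Where the extra field is just noise, `Ident::new("data_table")` should produce the same identifier with an empty span.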
10 changes: 1 addition & 9 deletions crates/test/src/datafusion.rs
@@ -1,18 +1,10 @@
use deltalake_core::datafusion::execution::context::SessionContext;
use deltalake_core::datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use deltalake_core::datafusion::execution::session_state::SessionStateBuilder;
use deltalake_core::datafusion::prelude::SessionConfig;
use deltalake_core::delta_datafusion::DeltaTableFactory;
use std::sync::Arc;

pub fn context_with_delta_table_factory() -> SessionContext {
let cfg = RuntimeConfig::new();
let env = RuntimeEnv::try_new(cfg).unwrap();
let ses = SessionConfig::new();
let mut state = SessionStateBuilder::new()
.with_config(ses)
.with_runtime_env(Arc::new(env))
.build();
let mut state = SessionStateBuilder::new().build();
state
.table_factories_mut()
.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
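
This simplification works because `SessionStateBuilder` now supplies a default `SessionConfig` and `RuntimeEnv` on its own; explicit settings still go through the same builder. A hedged sketch, where the `with_target_partitions` override is only an example and not something this PR needs:

```rust
use deltalake_core::datafusion::execution::session_state::{SessionState, SessionStateBuilder};
use deltalake_core::datafusion::prelude::SessionConfig;

/// Default state: the builder fills in SessionConfig and RuntimeEnv itself.
fn default_state() -> SessionState {
    SessionStateBuilder::new().build()
}

/// Explicit settings still go through the builder when they are required.
fn tuned_state() -> SessionState {
    SessionStateBuilder::new()
        .with_config(SessionConfig::new().with_target_partitions(4))
        .build()
}
```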