Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patched DataFusion, Oct 30, 2024 #48

Closed
wants to merge 10 commits into from
7 changes: 6 additions & 1 deletion benchmarks/src/bin/external_aggr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,12 @@ impl ExternalAggrConfig {
) -> Result<Vec<QueryResult>> {
let query_name =
format!("Q{query_id}({})", human_readable_size(mem_limit as usize));
let config = self.common.config();
let mut config = self.common.config();
config
.options_mut()
.execution
.parquet
.schema_force_view_types = self.common.force_view_types;
let runtime_config = RuntimeConfig::new()
.with_memory_pool(Arc::new(FairSpillPool::new(mem_limit as usize)))
.build_arc()?;
Expand Down
1 change: 1 addition & 0 deletions benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl RunOpt {
let mut config = self.common.config();
{
let parquet_options = &mut config.options_mut().execution.parquet;
parquet_options.schema_force_view_types = self.common.force_view_types;
// The hits_partitioned dataset specifies string columns
// as binary due to how it was written. Force it to strings
parquet_options.binary_as_string = true;
Expand Down
8 changes: 7 additions & 1 deletion benchmarks/src/imdb/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,11 @@ impl RunOpt {
.config()
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;

config
.options_mut()
.execution
.parquet
.schema_force_view_types = self.common.force_view_types;
let ctx = SessionContext::new_with_config(config);

// register tables
Expand Down Expand Up @@ -513,6 +517,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
force_view_types: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down Expand Up @@ -546,6 +551,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
force_view_types: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down
7 changes: 7 additions & 0 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ impl RunOpt {
.config()
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
config
.options_mut()
.execution
.parquet
.schema_force_view_types = self.common.force_view_types;
let ctx = SessionContext::new_with_config(config);

// register tables
Expand Down Expand Up @@ -340,6 +345,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
force_view_types: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down Expand Up @@ -373,6 +379,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
force_view_types: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down
5 changes: 5 additions & 0 deletions benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ pub struct CommonOpt {
/// Activate debug mode to see more details
#[structopt(short, long)]
pub debug: bool,

/// If true, will use StringView/BinaryViewArray instead of String/BinaryArray
/// when reading ParquetFiles
#[structopt(long)]
pub force_view_types: bool,
}

impl CommonOpt {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.

// Disable clippy lints that were introduced after this code was written
#![allow(clippy::needless_lifetimes)]
#![allow(clippy::unnecessary_lazy_evaluations)]
#![allow(clippy::empty_line_after_doc_comments)]

mod catalog;
mod dynamic_file;
mod schema;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ config_namespace! {
///
/// This is used to workaround bugs in the planner that are now caught by
/// the new schema verification step.
pub skip_physical_aggregate_schema_check: bool, default = false
pub skip_physical_aggregate_schema_check: bool, default = true

/// Specifies the reserved memory for each spillable sort operation to
/// facilitate an in-memory merge.
Expand Down Expand Up @@ -399,7 +399,7 @@ config_namespace! {

/// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
/// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_view_types: bool, default = true
pub schema_force_view_types: bool, default = false

/// (reading) If true, parquet reader will read columns of
/// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
Expand Down
4 changes: 4 additions & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
// under the License.
// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143
#![deny(clippy::clone_on_ref_ptr)]
// Disable clippy lints that were introduced after this code was written
#![allow(clippy::needless_lifetimes)]
#![allow(clippy::unnecessary_lazy_evaluations)]
#![allow(clippy::empty_line_after_doc_comments)]

mod column;
mod dfschema;
Expand Down
5 changes: 0 additions & 5 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -980,11 +980,6 @@ impl ScalarValue {
ScalarValue::from(val.into())
}

/// Returns a [`ScalarValue::Utf8View`] representing `val`
pub fn new_utf8view(val: impl Into<String>) -> Self {
ScalarValue::Utf8View(Some(val.into()))
}

/// Returns a [`ScalarValue::IntervalYearMonth`] representing
/// `years` years and `months` months
pub fn new_interval_ym(years: i32, months: i32) -> Self {
Expand Down
20 changes: 20 additions & 0 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,26 @@ impl Statistics {
self
}

/// Project the statistics to the given column indices.
///
/// For example, if we had statistics for columns `{"a", "b", "c"}`,
/// projecting to `vec![2, 1]` would return statistics for columns `{"c",
/// "b"}`.
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
let Some(projection) = projection else {
return self;
};

// todo: it would be nice to avoid cloning column statistics if
// possible (e.g. if the projection did not contain duplicates)
self.column_statistics = projection
.iter()
.map(|&i| self.column_statistics[i].clone())
.collect();

self
}

/// Calculates the statistics after `fetch` and `skip` operations apply.
/// Here, `self` denotes per-partition statistics. Use the `n_partitions`
/// parameter to compute global statistics in a multi-partition setting.
Expand Down
8 changes: 8 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
// specific language governing permissions and limitations
// under the License.
#![warn(missing_docs, clippy::needless_borrow)]
// Disable clippy lints that were introduced after this code was written
#![allow(clippy::needless_return)]
#![allow(clippy::needless_lifetimes)]
#![allow(clippy::unnecessary_lazy_evaluations)]
#![allow(clippy::empty_line_after_doc_comments)]
#![allow(clippy::unnecessary_filter_map)]
#![allow(clippy::manual_div_ceil)]
#![allow(missing_docs)]

//! [DataFusion] is an extensible query engine written in Rust that
//! uses [Apache Arrow] as its in-memory format. DataFusion's target users are
Expand Down
10 changes: 10 additions & 0 deletions datafusion/core/src/physical_optimizer/sanity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};

use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::union::UnionExec;
use itertools::izip;

/// The SanityCheckPlan rule rejects the following query plans:
Expand Down Expand Up @@ -126,6 +128,14 @@ pub fn check_plan_sanity(
plan.required_input_ordering().iter(),
plan.required_input_distribution().iter()
) {
// TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492
if child.as_any().downcast_ref::<UnionExec>().is_some() {
continue;
}
if child.as_any().downcast_ref::<SortExec>().is_some() {
continue;
}

let child_eq_props = child.equivalence_properties();
if let Some(sort_req) = sort_req {
if !child_eq_props.ordering_satisfy_requirement(sort_req) {
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ impl DefaultPhysicalPlanner {
if &physical_input_schema != physical_input_schema_from_logical
&& !options.execution.skip_physical_aggregate_schema_check
{
log::warn!("Physical input schema should be the same as the one converted from logical input schema, but did not match for logical plan:\n{}", input.display_indent());
return internal_err!("Physical input schema should be the same as the one converted from logical input schema.");
}

Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/tests/parquet/page_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,8 @@ async fn page_index_filter_one_col() {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();

// 5.create filter date_string_col == "01/01/09"`;
// Note this test doesn't apply type coercion so the literal must match the actual view type
let filter = col("date_string_col").eq(lit(ScalarValue::new_utf8view("01/01/09")));
// 5.create filter date_string_col == 1;
let filter = col("date_string_col").eq(lit("01/01/09"));
let parquet_exec = get_parquet_exec(&state, filter).await;
let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
let batch = results.next().await.unwrap().unwrap();
Expand Down
5 changes: 5 additions & 0 deletions datafusion/execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
// under the License.
// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143
#![deny(clippy::clone_on_ref_ptr)]
// Disable clippy lints that were introduced after this code was written
#![allow(clippy::needless_return)]
#![allow(clippy::needless_lifetimes)]
#![allow(clippy::unnecessary_lazy_evaluations)]
#![allow(clippy::empty_line_after_doc_comments)]

//! DataFusion execution configuration and runtime structures

Expand Down
6 changes: 6 additions & 0 deletions datafusion/expr-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
// specific language governing permissions and limitations
// under the License.

// Disable clippy lints that were introduced after this code was written
#![allow(clippy::needless_return)]
#![allow(clippy::needless_lifetimes)]
#![allow(clippy::unnecessary_lazy_evaluations)]
#![allow(clippy::empty_line_after_doc_comments)]

//! Logical Expr types and traits for [DataFusion]
//!
//! This crate contains types and traits that are used by both Logical and Physical expressions.
Expand Down
7 changes: 3 additions & 4 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ impl ExprSchemable for Expr {
match self {
Expr::Column(c) => Ok(schema.metadata(c)?.clone()),
Expr::Alias(Alias { expr, .. }) => expr.metadata(schema),
Expr::Cast(Cast { expr, .. }) => expr.metadata(schema),
_ => Ok(HashMap::new()),
}
}
Expand Down Expand Up @@ -681,13 +682,11 @@ mod tests {
.with_data_type(DataType::Int32)
.with_metadata(meta.clone());

// col and alias should be metadata-preserving
// col, alias, and cast should be metadata-preserving
assert_eq!(meta, expr.metadata(&schema).unwrap());
assert_eq!(meta, expr.clone().alias("bar").metadata(&schema).unwrap());

// cast should drop input metadata since the type has changed
assert_eq!(
HashMap::new(),
meta,
expr.clone()
.cast_to(&DataType::Int64, &schema)
.unwrap()
Expand Down
5 changes: 5 additions & 0 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
// under the License.
// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143
#![deny(clippy::clone_on_ref_ptr)]
// Disable clippy lints that were introduced after this code was written
#![allow(clippy::needless_return)]
#![allow(clippy::needless_lifetimes)]
#![allow(clippy::unnecessary_lazy_evaluations)]
#![allow(clippy::empty_line_after_doc_comments)]

//! [DataFusion](https://github.com/apache/datafusion)
//! is an extensible query execution framework that uses
Expand Down
5 changes: 5 additions & 0 deletions datafusion/functions-aggregate-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@

// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143
#![deny(clippy::clone_on_ref_ptr)]
// Disable clippy lints that were introduced after this code was written
#![allow(clippy::needless_return)]
#![allow(clippy::needless_lifetimes)]
#![allow(clippy::unnecessary_lazy_evaluations)]
#![allow(clippy::empty_line_after_doc_comments)]

pub mod accumulator;
pub mod aggregate;
Expand Down
5 changes: 5 additions & 0 deletions datafusion/functions-nested/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
// under the License.
// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143
#![deny(clippy::clone_on_ref_ptr)]
// Disable clippy lints that were introduced after this code was written
#![allow(clippy::needless_return)]
#![allow(clippy::needless_lifetimes)]
#![allow(clippy::unnecessary_lazy_evaluations)]
#![allow(clippy::empty_line_after_doc_comments)]

//! Nested type Functions for [DataFusion].
//!
Expand Down
5 changes: 5 additions & 0 deletions datafusion/functions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
// under the License.
// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143
#![deny(clippy::clone_on_ref_ptr)]
// Disable clippy lints that were introduced after this code was written
#![allow(clippy::needless_return)]
#![allow(clippy::needless_lifetimes)]
#![allow(clippy::unnecessary_lazy_evaluations)]
#![allow(clippy::empty_line_after_doc_comments)]

//! Function packages for [DataFusion].
//!
Expand Down
6 changes: 5 additions & 1 deletion datafusion/functions/src/string/bit_length.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ impl Default for BitLengthFunc {
impl BitLengthFunc {
pub fn new() -> Self {
Self {
signature: Signature::string(1, Volatility::Immutable),
signature: Signature::uniform(
1,
vec![DataType::Utf8, DataType::LargeUtf8],
Volatility::Immutable,
),
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
// under the License.
// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143
#![deny(clippy::clone_on_ref_ptr)]
// Disable clippy lints that were introduced after this code was written
#![allow(clippy::needless_return)]
#![allow(clippy::needless_lifetimes)]
#![allow(clippy::unnecessary_lazy_evaluations)]
#![allow(clippy::empty_line_after_doc_comments)]

//! # DataFusion Optimizer
//!
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-expr-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
//!
//! [DataFusion]: <https://crates.io/crates/datafusion>

// Disable clippy lints that were introduced after this code was written
#![allow(clippy::needless_return)]
#![allow(clippy::needless_lifetimes)]
#![allow(clippy::unnecessary_lazy_evaluations)]
#![allow(clippy::empty_line_after_doc_comments)]

pub mod binary_map;
pub mod binary_view_map;
pub mod datum;
Expand Down
7 changes: 0 additions & 7 deletions datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,6 @@ impl PhysicalSortExpr {
}
}

/// Access the PhysicalSortExpr as a PhysicalExpr
impl AsRef<dyn PhysicalExpr> for PhysicalSortExpr {
fn as_ref(&self) -> &(dyn PhysicalExpr + 'static) {
self.expr.as_ref()
}
}

impl PartialEq for PhysicalSortExpr {
fn eq(&self, other: &PhysicalSortExpr) -> bool {
self.options == other.options && self.expr.eq(&other.expr)
Expand Down
Loading
Loading