diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 5562d98983f7a..dc74190dfcf85 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1444,6 +1444,7 @@ version = "44.0.0" dependencies = [ "ahash", "arrow", + "arrow-buffer", "arrow-schema", "datafusion-common", "datafusion-doc", diff --git a/datafusion/functions-aggregate/Cargo.toml b/datafusion/functions-aggregate/Cargo.toml index bd65490c5a78e..81e5233a1516b 100644 --- a/datafusion/functions-aggregate/Cargo.toml +++ b/datafusion/functions-aggregate/Cargo.toml @@ -40,6 +40,7 @@ path = "src/lib.rs" [dependencies] ahash = { workspace = true } arrow = { workspace = true } +arrow-buffer = { workspace = true } arrow-schema = { workspace = true } datafusion-common = { workspace = true } datafusion-doc = { workspace = true } diff --git a/datafusion/functions-aggregate/src/correlation.rs b/datafusion/functions-aggregate/src/correlation.rs index 72c1f6dbaed2b..2741fe4bfc009 100644 --- a/datafusion/functions-aggregate/src/correlation.rs +++ b/datafusion/functions-aggregate/src/correlation.rs @@ -23,8 +23,7 @@ use std::mem::size_of_val; use std::sync::Arc; use arrow::array::{ - downcast_array, Array, AsArray, BooleanArray, BooleanBufferBuilder, Float64Array, - UInt64Array, + downcast_array, Array, AsArray, BooleanArray, Float64Array, UInt64Array, }; use arrow::compute::{and, filter, is_not_null, kernels::cast}; use arrow::datatypes::{Float64Type, UInt64Type}; @@ -32,6 +31,7 @@ use arrow::{ array::ArrayRef, datatypes::{DataType, Field}, }; +use arrow_buffer::NullBufferBuilder; use datafusion_expr::{EmitTo, GroupsAccumulator}; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate_multiple; use log::debug; @@ -451,7 +451,7 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator { }; let mut values = Vec::with_capacity(n); - let mut nulls = BooleanBufferBuilder::new(n); + let mut nulls = NullBufferBuilder::new(n); // Notes for `Null` handling: // - If the `count` state of a group is 0, no valid records are accumulated @@ -466,7 +466,7 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator { if self.count[i] < 2 { // TODO: Evaluate as `Null` (see notes above) values.push(0.0); - nulls.append(false); + nulls.append_null(); continue; } @@ -487,17 +487,14 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator { if denominator == 0.0 { // TODO: Evaluate as `Null` (see notes above) values.push(0.0); - nulls.append(false); + nulls.append_null(); } else { values.push(numerator / denominator); - nulls.append(true); + nulls.append_non_null(); } } - Ok(Arc::new(Float64Array::new( - values.into(), - Some(nulls.finish().into()), - ))) + Ok(Arc::new(Float64Array::new(values.into(), nulls.finish()))) } fn state(&mut self, emit_to: EmitTo) -> Result> {