Skip to content

Commit

Permalink
refactor: switch BooleanBufferBuilder to NullBufferBuilder in correla…
Browse files Browse the repository at this point in the history
…tion function
  • Loading branch information
Cheng-Yuan-Lai committed Jan 18, 2025
1 parent 0c229d7 commit b6ae33a
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 10 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/functions-aggregate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ path = "src/lib.rs"
[dependencies]
ahash = { workspace = true }
arrow = { workspace = true }
arrow-buffer = { workspace = true }
arrow-schema = { workspace = true }
datafusion-common = { workspace = true }
datafusion-doc = { workspace = true }
Expand Down
17 changes: 7 additions & 10 deletions datafusion/functions-aggregate/src/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ use std::mem::size_of_val;
use std::sync::Arc;

use arrow::array::{
downcast_array, Array, AsArray, BooleanArray, BooleanBufferBuilder, Float64Array,
UInt64Array,
downcast_array, Array, AsArray, BooleanArray, Float64Array, UInt64Array,
};
use arrow::compute::{and, filter, is_not_null, kernels::cast};
use arrow::datatypes::{Float64Type, UInt64Type};
use arrow::{
array::ArrayRef,
datatypes::{DataType, Field},
};
use arrow_buffer::NullBufferBuilder;
use datafusion_expr::{EmitTo, GroupsAccumulator};
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate_multiple;
use log::debug;
Expand Down Expand Up @@ -451,7 +451,7 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
};

let mut values = Vec::with_capacity(n);
let mut nulls = BooleanBufferBuilder::new(n);
let mut nulls = NullBufferBuilder::new(n);

// Notes for `Null` handling:
// - If the `count` state of a group is 0, no valid records are accumulated
Expand All @@ -466,7 +466,7 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
if self.count[i] < 2 {
// TODO: Evaluate as `Null` (see notes above)
values.push(0.0);
nulls.append(false);
nulls.append_null();
continue;
}

Expand All @@ -487,17 +487,14 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
if denominator == 0.0 {
// TODO: Evaluate as `Null` (see notes above)
values.push(0.0);
nulls.append(false);
nulls.append_null();
} else {
values.push(numerator / denominator);
nulls.append(true);
nulls.append_non_null();
}
}

Ok(Arc::new(Float64Array::new(
values.into(),
Some(nulls.finish().into()),
)))
Ok(Arc::new(Float64Array::new(values.into(), nulls.finish())))
}

fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
Expand Down

0 comments on commit b6ae33a

Please sign in to comment.