Skip to content

Commit

Permalink
Performance: enable array allocation reuse (ScalarFunctionArgs gets…
Browse files Browse the repository at this point in the history
… owned `ColumnReference`) (apache#13637)

* Improve documentation

* Pass owned args to ScalarFunctionArgs

* Update advanced_udf with example of reusing arrays

* clarify rationale for cloning

* clarify comments

* fix expected output
  • Loading branch information
alamb authored Dec 8, 2024
1 parent f2de2c4 commit 3ee9b3d
Show file tree
Hide file tree
Showing 27 changed files with 288 additions and 212 deletions.
126 changes: 100 additions & 26 deletions datafusion-examples/examples/advanced_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::logical_expr::Volatility;
use datafusion::prelude::*;
use datafusion_common::{internal_err, ScalarValue};
use datafusion_common::{exec_err, internal_err, ScalarValue};
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature};
use datafusion_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
};

/// This example shows how to use the full ScalarUDFImpl API to implement a user
/// defined function. As in the `simple_udf.rs` example, this struct implements
Expand Down Expand Up @@ -83,23 +85,27 @@ impl ScalarUDFImpl for PowUdf {
Ok(DataType::Float64)
}

/// This is the function that actually calculates the results.
/// This function actually calculates the results of the scalar function.
///
/// This is the same way that functions provided with DataFusion are invoked,
/// which permits important special cases:
///
/// This is the same way that functions built into DataFusion are invoked,
/// which permits important special cases when one or both of the arguments
/// are single values (constants). For example `pow(a, 2)`
///1. When one or both of the arguments are single values (constants).
/// For example `pow(a, 2)`
/// 2. When the input arrays can be reused (avoid allocating a new output array)
///
/// However, it also means the implementation is more complex than when
/// using `create_udf`.
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
// The other fields of the `args` struct are used for more specialized
// uses, and are not needed in this example
let ScalarFunctionArgs { mut args, .. } = args;
// DataFusion has arranged for the correct inputs to be passed to this
// function, but we check again to make sure
assert_eq!(args.len(), 2);
let (base, exp) = (&args[0], &args[1]);
// take ownership of arguments by popping in reverse order
let exp = args.pop().unwrap();
let base = args.pop().unwrap();
assert_eq!(base.data_type(), DataType::Float64);
assert_eq!(exp.data_type(), DataType::Float64);

Expand All @@ -118,7 +124,7 @@ impl ScalarUDFImpl for PowUdf {
) => {
// compute the output. Note DataFusion treats `None` as NULL.
let res = match (base, exp) {
(Some(base), Some(exp)) => Some(base.powf(*exp)),
(Some(base), Some(exp)) => Some(base.powf(exp)),
// one or both arguments were NULL
_ => None,
};
Expand All @@ -140,31 +146,33 @@ impl ScalarUDFImpl for PowUdf {
// kernel creates very fast "vectorized" code and
// handles things like null values for us.
let res: Float64Array =
compute::unary(base_array, |base| base.powf(*exp));
compute::unary(base_array, |base| base.powf(exp));
Arc::new(res)
}
};
Ok(ColumnarValue::Array(result_array))
}

// special case if the base is a constant (note this code is quite
// similar to the previous case, so we omit comments)
// special case if the base is a constant.
//
// Note this case is very similar to the previous case, so we could
// use the same pattern. However, for this case we demonstrate an
// even more advanced pattern to potentially avoid allocating a new array
(
ColumnarValue::Scalar(ScalarValue::Float64(base)),
ColumnarValue::Array(exp_array),
) => {
let res = match base {
None => new_null_array(exp_array.data_type(), exp_array.len()),
Some(base) => {
let exp_array = exp_array.as_primitive::<Float64Type>();
let res: Float64Array =
compute::unary(exp_array, |exp| base.powf(exp));
Arc::new(res)
}
Some(base) => maybe_pow_in_place(base, exp_array)?,
};
Ok(ColumnarValue::Array(res))
}
// Both arguments are arrays so we have to perform the calculation for every row
// Both arguments are arrays so we have to perform the calculation
// for every row
//
// Note this could also be done in place using `binary_mut` as
// is done in `maybe_pow_in_place` but here we use binary for simplicity
(ColumnarValue::Array(base_array), ColumnarValue::Array(exp_array)) => {
let res: Float64Array = compute::binary(
base_array.as_primitive::<Float64Type>(),
Expand All @@ -191,6 +199,52 @@ impl ScalarUDFImpl for PowUdf {
}
}

/// Evaluate `base ^ exp` *without* allocating a new array, if possible
fn maybe_pow_in_place(base: f64, exp_array: ArrayRef) -> Result<ArrayRef> {
// Calling `unary` creates a new array for the results. Avoiding
// allocations is a common optimization in performance critical code.
// arrow-rs allows this optimization via the `unary_mut`
// and `binary_mut` kernels in certain cases
//
// These kernels can only be used if there are no other references to
// the arrays (exp_array has to be the last remaining reference).
let owned_array = exp_array
// as in the previous example, we first downcast to &Float64Array
.as_primitive::<Float64Type>()
// non-obviously, we call clone here to get an owned `Float64Array`.
// Calling clone() is relatively inexpensive as it increments
// some ref counts but doesn't clone the data)
//
// Once we have the owned Float64Array we can drop the original
// exp_array (untyped) reference
.clone();

// We *MUST* drop the reference to `exp_array` explicitly so that
// owned_array is the only reference remaining in this function.
//
// Note that depending on the query there may still be other references
// to the underlying buffers, which would prevent reuse. The only way to
// know for sure is the result of `compute::unary_mut`
drop(exp_array);

// If we have the only reference, compute the result directly into the same
// allocation as was used for the input array
match compute::unary_mut(owned_array, |exp| base.powf(exp)) {
Err(_orig_array) => {
// unary_mut will return the original array if there are other
// references into the underling buffer (and thus reuse is
// impossible)
//
// In a real implementation, this case should fall back to
// calling `unary` and allocate a new array; In this example
// we will return an error for demonstration purposes
exec_err!("Could not reuse array for maybe_pow_in_place")
}
// a result of OK means the operation was run successfully
Ok(res) => Ok(Arc::new(res)),
}
}

/// In this example we register `PowUdf` as a user defined function
/// and invoke it via the DataFrame API and SQL
#[tokio::main]
Expand All @@ -215,9 +269,29 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;

// You can also invoke both pow(2, 10) and its alias my_pow(a, b) using SQL
let sql_df = ctx.sql("SELECT pow(2, 10), my_pow(a, b) FROM t").await?;
sql_df.show().await?;
// You can also invoke both pow(2, 10) and its alias my_pow(a, b) using SQL
ctx.sql("SELECT pow(2, 10), my_pow(a, b) FROM t")
.await?
.show()
.await?;

// You can also invoke pow_in_place by passing a constant base and a
// column `a` as the exponent . If there is only a single
// reference to `a` the code works well
ctx.sql("SELECT pow(2, a) FROM t").await?.show().await?;

// However, if there are multiple references to `a` in the evaluation
// the array storage can not be reused
let err = ctx
.sql("SELECT pow(2, a), pow(3, a) FROM t")
.await?
.show()
.await
.unwrap_err();
assert_eq!(
err.to_string(),
"Execution error: Could not reuse array for maybe_pow_in_place"
);

Ok(())
}
Expand Down
14 changes: 8 additions & 6 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,13 +326,15 @@ where
}
}

/// Arguments passed to [`ScalarUDFImpl::invoke_with_args`] when invoking a
/// scalar function.
pub struct ScalarFunctionArgs<'a> {
// The evaluated arguments to the function
pub args: &'a [ColumnarValue],
// The number of rows in record batch being evaluated
/// The evaluated arguments to the function
pub args: Vec<ColumnarValue>,
/// The number of rows in record batch being evaluated
pub number_rows: usize,
// The return type of the scalar function returned (from `return_type` or `return_type_from_exprs`)
// when creating the physical expression from the logical expression
/// The return type of the scalar function returned (from `return_type` or `return_type_from_exprs`)
/// when creating the physical expression from the logical expression
pub return_type: &'a DataType,
}

Expand Down Expand Up @@ -539,7 +541,7 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
/// [`ColumnarValue::values_to_arrays`] can be used to convert the arguments
/// to arrays, which will likely be simpler code, but be slower.
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
self.invoke_batch(args.args, args.number_rows)
self.invoke_batch(&args.args, args.number_rows)
}

/// Invoke the function without `args`, instead the number of rows are provided,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions/src/datetime/to_local_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ mod tests {
fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) {
let res = ToLocalTimeFunc::new()
.invoke_with_args(ScalarFunctionArgs {
args: &[ColumnarValue::Scalar(input)],
args: vec![ColumnarValue::Scalar(input)],
number_rows: 1,
return_type: &expected.data_type(),
})
Expand Down
6 changes: 3 additions & 3 deletions datafusion/functions/src/string/ascii.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ mod tests {
($INPUT:expr, $EXPECTED:expr) => {
test_function!(
AsciiFunc::new(),
&[ColumnarValue::Scalar(ScalarValue::Utf8($INPUT))],
vec![ColumnarValue::Scalar(ScalarValue::Utf8($INPUT))],
$EXPECTED,
i32,
Int32,
Expand All @@ -166,7 +166,7 @@ mod tests {

test_function!(
AsciiFunc::new(),
&[ColumnarValue::Scalar(ScalarValue::LargeUtf8($INPUT))],
vec![ColumnarValue::Scalar(ScalarValue::LargeUtf8($INPUT))],
$EXPECTED,
i32,
Int32,
Expand All @@ -175,7 +175,7 @@ mod tests {

test_function!(
AsciiFunc::new(),
&[ColumnarValue::Scalar(ScalarValue::Utf8View($INPUT))],
vec![ColumnarValue::Scalar(ScalarValue::Utf8View($INPUT))],
$EXPECTED,
i32,
Int32,
Expand Down
24 changes: 12 additions & 12 deletions datafusion/functions/src/string/btrim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,17 +152,17 @@ mod tests {
// String view cases for checking normal logic
test_function!(
BTrimFunc::new(),
&[ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
vec![ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
String::from("alphabet ")
))),],
)))],
Ok(Some("alphabet")),
&str,
Utf8View,
StringViewArray
);
test_function!(
BTrimFunc::new(),
&[ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
vec![ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
String::from(" alphabet ")
))),],
Ok(Some("alphabet")),
Expand All @@ -172,7 +172,7 @@ mod tests {
);
test_function!(
BTrimFunc::new(),
&[
vec![
ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from(
"alphabet"
)))),
Expand All @@ -185,7 +185,7 @@ mod tests {
);
test_function!(
BTrimFunc::new(),
&[
vec![
ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from(
"alphabet"
)))),
Expand All @@ -200,7 +200,7 @@ mod tests {
);
test_function!(
BTrimFunc::new(),
&[
vec![
ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from(
"alphabet"
)))),
Expand All @@ -214,7 +214,7 @@ mod tests {
// Special string view case for checking unlined output(len > 12)
test_function!(
BTrimFunc::new(),
&[
vec![
ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from(
"xxxalphabetalphabetxxx"
)))),
Expand All @@ -228,7 +228,7 @@ mod tests {
// String cases
test_function!(
BTrimFunc::new(),
&[ColumnarValue::Scalar(ScalarValue::Utf8(Some(
vec![ColumnarValue::Scalar(ScalarValue::Utf8(Some(
String::from("alphabet ")
))),],
Ok(Some("alphabet")),
Expand All @@ -238,7 +238,7 @@ mod tests {
);
test_function!(
BTrimFunc::new(),
&[ColumnarValue::Scalar(ScalarValue::Utf8(Some(
vec![ColumnarValue::Scalar(ScalarValue::Utf8(Some(
String::from("alphabet ")
))),],
Ok(Some("alphabet")),
Expand All @@ -248,7 +248,7 @@ mod tests {
);
test_function!(
BTrimFunc::new(),
&[
vec![
ColumnarValue::Scalar(ScalarValue::Utf8(Some(String::from("alphabet")))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some(String::from("t")))),
],
Expand All @@ -259,7 +259,7 @@ mod tests {
);
test_function!(
BTrimFunc::new(),
&[
vec![
ColumnarValue::Scalar(ScalarValue::Utf8(Some(String::from("alphabet")))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some(String::from("alphabe")))),
],
Expand All @@ -270,7 +270,7 @@ mod tests {
);
test_function!(
BTrimFunc::new(),
&[
vec![
ColumnarValue::Scalar(ScalarValue::Utf8(Some(String::from("alphabet")))),
ColumnarValue::Scalar(ScalarValue::Utf8(None)),
],
Expand Down
Loading

0 comments on commit 3ee9b3d

Please sign in to comment.