Skip to content

Commit

Permalink
switch implementation of ceiling to use ScalarUDFImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
dadepo committed Apr 20, 2024
1 parent f09faff commit de4117e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 24 deletions.
52 changes: 43 additions & 9 deletions src/postgres/math_udfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,50 @@ pub fn tand(args: &[ArrayRef]) -> Result<ArrayRef> {
}

/// Nearest integer greater than or equal to argument (same as ceil).
pub fn ceiling(args: &[ArrayRef]) -> Result<ArrayRef> {
let values = datafusion::common::cast::as_float64_array(&args[0])?;
#[derive(Debug)]
pub struct Ceiling {
signature: Signature,
}

let mut float64array_builder = Float64Array::builder(args[0].len());
values
.iter()
.flatten()
.for_each(|decimal| float64array_builder.append_value(decimal.ceil()));
impl Ceiling {
pub fn new() -> Self {
Self {
signature: Signature::uniform(1, vec![Float64], Volatility::Immutable),
}
}
}

Ok(Arc::new(float64array_builder.finish()) as ArrayRef)
impl ScalarUDFImpl for Ceiling {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn name(&self) -> &str {
"ceiling"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(Float64)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
let values = datafusion::common::cast::as_float64_array(&args[0])?;

let mut float64array_builder = Float64Array::builder(args[0].len());
values
.iter()
.flatten()
.for_each(|decimal| float64array_builder.append_value(decimal.ceil()));

Ok(ColumnarValue::Array(
Arc::new(float64array_builder.finish()) as ArrayRef,
))
}
}

/// Integer quotient of y/x (truncates towards zero)
Expand Down Expand Up @@ -217,7 +251,7 @@ impl ScalarUDFImpl for Div {
})?;

Ok(ColumnarValue::Array(
Arc::new(int64array_builder.finish()) as ArrayRef,
Arc::new(int64array_builder.finish()) as ArrayRef
))
}
}
Expand Down
19 changes: 4 additions & 15 deletions src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use datafusion::logical_expr::{ReturnTypeFunction, ScalarUDF, Signature, Volatil
use datafusion::physical_expr::functions::make_scalar_function;
use datafusion::prelude::SessionContext;

use crate::postgres::math_udfs::{acosd, asind, atand, ceiling, cosd, cotd, sind, tand, Erf, Erfc, RandomNormal, Div};
use crate::postgres::math_udfs::{
acosd, asind, atand, cosd, cotd, sind, tand, Ceiling, Div, Erf, Erfc, RandomNormal,
};
use crate::postgres::network_udfs::{
broadcast, family, host, hostmask, inet_merge, inet_same_family, masklen, netmask, network,
set_masklen,
Expand All @@ -32,7 +34,7 @@ fn register_math_udfs(ctx: &SessionContext) -> Result<()> {
register_sind(ctx);
register_atand(ctx);
register_tand(ctx);
register_ceiling(ctx);
ctx.register_udf(ScalarUDF::from(Ceiling::new()));
ctx.register_udf(ScalarUDF::from(Div::new()));
ctx.register_udf(ScalarUDF::from(Erf::new()));
ctx.register_udf(ScalarUDF::from(Erfc::new()));
Expand Down Expand Up @@ -131,19 +133,6 @@ fn register_tand(ctx: &SessionContext) {
ctx.register_udf(tand_udf);
}

fn register_ceiling(ctx: &SessionContext) {
let ceiling_udf = make_scalar_function(ceiling);
let return_type: ReturnTypeFunction = Arc::new(move |_| Ok(Arc::new(Float64)));
let ceiling_udf = ScalarUDF::new(
"ceiling",
&Signature::uniform(1, vec![Float64], Volatility::Immutable),
&return_type,
&ceiling_udf,
);

ctx.register_udf(ceiling_udf);
}

fn register_network_udfs(ctx: &SessionContext) -> Result<()> {
register_broadcast(ctx);
register_family(ctx);
Expand Down

0 comments on commit de4117e

Please sign in to comment.