From 668b190ad5ae1ae1f9660fa4cddf68abbcc01e92 Mon Sep 17 00:00:00 2001 From: guojidan <1948535941@qq.com> Date: Tue, 9 Jan 2024 09:16:14 +0000 Subject: [PATCH 1/2] Implement monotonicity for ScalarUDF --- datafusion-examples/examples/advanced_udf.rs | 8 +++++++- datafusion/expr/src/udf.rs | 15 ++++++++++++++- datafusion/physical-expr/src/udf.rs | 2 +- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/datafusion-examples/examples/advanced_udf.rs b/datafusion-examples/examples/advanced_udf.rs index d530b9abe030a..3e7dd2e2af08f 100644 --- a/datafusion-examples/examples/advanced_udf.rs +++ b/datafusion-examples/examples/advanced_udf.rs @@ -31,7 +31,9 @@ use arrow::datatypes::Float64Type; use datafusion::error::Result; use datafusion::prelude::*; use datafusion_common::{internal_err, ScalarValue}; -use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature}; +use datafusion_expr::{ + ColumnarValue, FuncMonotonicity, ScalarUDF, ScalarUDFImpl, Signature, +}; use std::sync::Arc; /// This example shows how to use the full ScalarUDFImpl API to implement a user @@ -184,6 +186,10 @@ impl ScalarUDFImpl for PowUdf { fn aliases(&self) -> &[String] { &self.aliases } + + fn monotonicity(&self) -> Result> { + Ok(Some(vec![Some(true)])) + } } /// In this example we register `PowUdf` as a user defined function diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 8b35d5834c613..3017e1ec02716 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -18,7 +18,8 @@ //! [`ScalarUDF`]: Scalar User Defined Functions use crate::{ - ColumnarValue, Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature, + ColumnarValue, Expr, FuncMonotonicity, ReturnTypeFunction, + ScalarFunctionImplementation, Signature, }; use arrow::datatypes::DataType; use datafusion_common::Result; @@ -164,6 +165,13 @@ impl ScalarUDF { let captured = self.inner.clone(); Arc::new(move |args| captured.invoke(args)) } + + /// This function specifies monotonicity behaviors for User defined scalar functions. + /// + /// See [`ScalarUDFImpl::monotonicity`] for more details. + pub fn monotonicity(&self) -> Result> { + self.inner.monotonicity() + } } impl From for ScalarUDF @@ -271,6 +279,11 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { fn aliases(&self) -> &[String] { &[] } + + /// This function specifies monotonicity behaviors for User defined scalar functions. + fn monotonicity(&self) -> Result> { + Ok(None) + } } /// ScalarUDF that adds an alias to the underlying function. It is better to diff --git a/datafusion/physical-expr/src/udf.rs b/datafusion/physical-expr/src/udf.rs index 9daa9eb173ddf..d836b98dc5259 100644 --- a/datafusion/physical-expr/src/udf.rs +++ b/datafusion/physical-expr/src/udf.rs @@ -39,6 +39,6 @@ pub fn create_physical_expr( fun.fun(), input_phy_exprs.to_vec(), fun.return_type(&input_exprs_types)?, - None, + fun.monotonicity()?, ))) } From 39ccc78cf4ebaaf96207c58dc7a7d02b7aedaa8e Mon Sep 17 00:00:00 2001 From: guojidan <1948535941@qq.com> Date: Thu, 11 Jan 2024 10:01:52 +0000 Subject: [PATCH 2/2] add unit test case --- datafusion/physical-expr/src/udf.rs | 73 +++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/datafusion/physical-expr/src/udf.rs b/datafusion/physical-expr/src/udf.rs index d836b98dc5259..de9ba33daf293 100644 --- a/datafusion/physical-expr/src/udf.rs +++ b/datafusion/physical-expr/src/udf.rs @@ -42,3 +42,76 @@ pub fn create_physical_expr( fun.monotonicity()?, ))) } + +#[cfg(test)] +mod tests { + use arrow::datatypes::Schema; + use arrow_schema::DataType; + use datafusion_common::Result; + use datafusion_expr::{ + ColumnarValue, FuncMonotonicity, ScalarUDF, ScalarUDFImpl, Signature, Volatility, + }; + + use crate::ScalarFunctionExpr; + + use super::create_physical_expr; + + #[test] + fn test_functions() -> Result<()> { + #[derive(Debug, Clone)] + struct TestScalarUDF { + signature: Signature, + } + + impl TestScalarUDF { + fn new() -> Self { + let signature = + Signature::exact(vec![DataType::Float64], Volatility::Immutable); + + Self { signature } + } + } + + impl ScalarUDFImpl for TestScalarUDF { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn name(&self) -> &str { + "my_fn" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Float64) + } + + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + unimplemented!("my_fn is not implemented") + } + + fn monotonicity(&self) -> Result> { + Ok(Some(vec![Some(true)])) + } + } + + // create and register the udf + let udf = ScalarUDF::from(TestScalarUDF::new()); + + let p_expr = create_physical_expr(&udf, &[], &Schema::empty())?; + + assert_eq!( + p_expr + .as_any() + .downcast_ref::() + .unwrap() + .monotonicity(), + &Some(vec![Some(true)]) + ); + + Ok(()) + } +}