apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Create physical scalar expression in `functions` modules from string (name) #9892

Open viirya opened 5 months ago

viirya commented 5 months ago

Is your feature request related to a problem or challenge?

Create physical scalar expression in functions modules from string

Currently more and more built-in scalar functions are being moved to the functions modules, e.g. https://github.com/apache/arrow-datafusion/pull/9435. This avoids a long enum of all built-in scalar functions, which is hard to maintain. But for Comet, we rely on the ability to create a physical scalar expression from a string (e.g., datepart).

Previously this was easy: just call BuiltinScalarFunction::from_str to get a BuiltinScalarFunction. But now I don't see a convenient function to do that.
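For reference, the old path looked roughly like this (a sketch built on the pre-existing enum and its FromStr impl):

    use std::str::FromStr;
    use datafusion_expr::BuiltinScalarFunction;

    // Sketch of the old approach: parse a function name into the built-in enum.
    fn resolve(name: &str) -> Option<BuiltinScalarFunction> {
        BuiltinScalarFunction::from_str(name).ok()
    }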

FunctionRegistry provides udf, which can return a reference to a ScalarUDF. But it requires these UDFs to be registered first. As we don't know which UDFs will be used, we would need to register all built-in UDFs in the registry. The flaw is that this creates ScalarUDFs for all built-in UDFs even if they are not actually used in the queries.
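For illustration, the registry-based path looks roughly like this (a sketch; it assumes a registry that has already had all built-ins registered, e.g. via datafusion_functions::register_all):

    use std::sync::Arc;
    use datafusion_execution::registry::FunctionRegistry;
    use datafusion_expr::ScalarUDF;

    // Sketch: resolve a UDF by name via the registry. Every built-in must have
    // been registered up front, even though a query may use only one of them.
    fn resolve(registry: &dyn FunctionRegistry, name: &str) -> Option<Arc<ScalarUDF>> {
        registry.udf(name).ok()
    }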

I think we still need an approach that can simply create a physical scalar expression in the functions modules from a string, so we can create the corresponding ScalarUDF on demand.

Another approach might be to avoid creating ScalarUDFs when registering built-in scalar functions.

Avoid creating ScalarUDFs before they are actually used when registering in FunctionRegistry

Actually, I am also wondering whether it is necessary to create and register all these ScalarUDFs in DataFusion's FunctionRegistry before these scalar UDFs are actually used.

For example, Spark's FunctionRegistry registers expression builders instead of creating actual expressions when registering built-in expressions. A built-in expression is created only if it is actually used by a query.
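A rough Rust analogue of that builder pattern might look like this (a sketch; UdfBuilder and LazyRegistry are hypothetical names for illustration):

    use std::collections::HashMap;
    use std::sync::Arc;
    use datafusion_expr::ScalarUDF;

    // Hypothetical: the registry stores cheap builder functions instead of
    // fully constructed ScalarUDF instances.
    type UdfBuilder = fn() -> Arc<ScalarUDF>;

    struct LazyRegistry {
        builders: HashMap<&'static str, UdfBuilder>,
    }

    impl LazyRegistry {
        // The ScalarUDF is built only when a query actually references it.
        fn get(&self, name: &str) -> Option<Arc<ScalarUDF>> {
            self.builders.get(name).map(|build| build())
        }
    }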

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

alamb commented 5 months ago

I think we still need an approach that can simply create a physical scalar expression in the functions modules from a string, so we can create the corresponding ScalarUDF on demand.

I think you can do this via one of these functions (I can't link to the rustdocs as version 37 hasn't been released yet and the functions are generated via macro):

[Screenshot: list of the per-function constructors exported from the datafusion_functions modules]

viirya commented 5 months ago

I think you can do this via one of these functions (I can't link to the rustdocs as version 37 hasn't been released yet and the functions are generated via macro):

Hmm, I think you mean that the expr_fn module exports individual scalar functions. But to get the corresponding function from a string of the scalar function name, we would need to write a function like:

fn string_to_scalar_udf(udf_name: &str) -> Arc<ScalarUDF> {
  match udf_name {
    "ascii" => datafusion_functions::string::ascii(),
    ...
  }
}

It is doable, though it means we need to maintain a long list of match arms, but I'm also wondering if there is a better built-in solution in DataFusion.

Omega359 commented 5 months ago

The names of the functions would be in the list of UDFs available in the SessionState:

    /// Scalar functions that are registered with the context
    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
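
For example, a lookup against that map could look like this (a sketch; assumes a SessionState in scope):

    use std::sync::Arc;
    use datafusion::execution::context::SessionState;
    use datafusion_expr::ScalarUDF;

    // Sketch: fetch a registered scalar UDF by name from the SessionState map.
    fn find_udf(state: &SessionState, name: &str) -> Option<Arc<ScalarUDF>> {
        state.scalar_functions().get(name).cloned()
    }
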
viirya commented 5 months ago

Oh, scalar_functions is good to use for that. We can get a particular ScalarUDF from it without matching scalar UDF names outside DataFusion. Currently we use SessionState.udf to retrieve the required ScalarUDF.

Although it is still not creating the ScalarUDF on demand.

As I described in the description, for queries which don't use most of these scalar UDFs, it seems unnecessary to create and hold all of these ScalarUDFs in the registry.

I think a better approach would be to create a ScalarUDF only once it is really needed. This is how Spark handles its function registry.

Omega359 commented 5 months ago

Ah, I misread the description, apologies. It's an interesting idea. I don't see an easy solution right now, especially since the UDFs are held in a singleton once called (see https://github.com/apache/arrow-datafusion/blob/d8d521ac8b90002fa0ba1f91456051a9775ae193/datafusion/functions/src/macros.rs#L66), so any memory savings from not using the UDF would evaporate after the first use.

I think it may be possible with a change to the make_udf_function macro to remove the singleton code; however, I'm not sure of the wisdom of that.

alamb commented 5 months ago

It is doable, though it means we need to maintain a long list of match arms, but I'm also wondering if there is a better built-in solution in DataFusion.

I think adding a function that does this matching to DataFusion seems like a good idea to me. You could probably write a test to ensure it is kept in sync with the list of functions pretty easily.

I think a better approach would be to create a ScalarUDF only once it is really needed. This is how Spark handles its function registry.

What is the concern about creating a bunch of ScalarUDFs that don't get used on process start? Is it to reduce process startup time / memory overhead?

viirya commented 5 months ago

What is the concern about creating a bunch of ScalarUDFs that don't get used on process start? Is it to reduce process startup time / memory overhead?

Yeah, I suppose these ScalarUDFs occupy additional resources, as they are not free to create and hold. The number of built-in scalar functions is fairly large and grows continuously. If we could create them lazily, it might be better.

alamb commented 5 months ago

BTW, while updating influxdb we ended up with a bunch of the following in our code. I think the API @viirya is describing on this ticket would help us too:

    let date_bin = datafusion::functions::datetime::functions()
            .iter()
            .find(|fun| fun.name().eq("date_bin"))
            .expect("should have date_bin UDF")
            .to_owned();
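
With the lookup being discussed here, that boilerplate could shrink to something like this (a sketch; get_udf is the hypothetical API under discussion, not something that exists yet):

    use std::sync::Arc;
    use datafusion_expr::ScalarUDF;

    // Sketch: the same lookup via the hypothetical `get_udf` API from this ticket.
    fn date_bin_udf() -> Arc<ScalarUDF> {
        datafusion_functions::get_udf("date_bin").expect("should have date_bin UDF")
    }
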
jayzhan211 commented 5 months ago

How about we introduce pub type ScalarFunctionImpl = Arc<dyn Fn() -> std::sync::Arc<datafusion_expr::ScalarUDF>>;, so we can register the closure first and create the UDF on demand?

If I understand correctly, this function (below) will only run at the point we call the closure:

            pub fn $SCALAR_UDF_FN() -> std::sync::Arc<datafusion_expr::ScalarUDF> {
                [< STATIC_ $UDF >]
                    .get_or_init(|| {
                        std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl(
                            <$UDF>::new(),
                        ))
                    })
                    .clone()
            }

https://users.rust-lang.org/t/cost-of-creating-closure-vs-cost-of-creating-struct/33101/5
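
A hedged sketch of how the proposed closure type could be stored and invoked (I've added Send + Sync bounds, which a shared registry would likely need):

    use std::sync::Arc;
    use datafusion_expr::ScalarUDF;

    // Proposed type: a registered entry is just a closure, so the ScalarUDF
    // behind it (and the macro's OnceLock singleton) is only initialized when
    // the closure is actually called.
    pub type ScalarFunctionImpl = Arc<dyn Fn() -> Arc<ScalarUDF> + Send + Sync>;

    fn ascii_entry() -> ScalarFunctionImpl {
        // Cheap to register; `ascii()` runs only when the closure is invoked.
        Arc::new(|| datafusion_functions::string::ascii())
    }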

Omega359 commented 5 months ago

Note that a name -> function mapping is already used internally within scalar_function.rs:

https://github.com/apache/arrow-datafusion/blob/2dad90425bacb98a3c2a4214faad53850c93104e/datafusion/physical-expr/src/scalar_function.rs#L149

Having a good name -> fn lookup mechanism would help here, I think.

alamb commented 5 months ago

How about we introduce pub type ScalarFunctionImpl = Arc<dyn Fn() -> std::sync::Arc<datafusion_expr::ScalarUDF>>;, so we can register the closure first and create the UDF on demand?

I was thinking maybe we can even avoid new types / macros with a function like this:

fn get_udf(name: &str) -> Option<Arc<ScalarUDF>> {
  // build hash table of deferred functions that create functions on demand
  // (note this hash table would probably be built in a static OnceLock or something)
  let functions: HashMap<&str, Box<dyn Fn() -> Arc<ScalarUDF>>> = [
    ("abs", Box::new(|| crate::math::abs()) as _),
    ("sin", Box::new(|| crate::math::sin()) as _),
    ("date_bin", Box::new(|| crate::datetime::date_bin()) as _),
    // ...
  ].into_iter().collect();

  functions.get(name).map(|factory| factory())
}

Then we could ensure this was kept in sync with a test, something like:

for expected_function in datafusion_functions::all_functions() {
  assert!(get_udf(expected_function.name()).is_some())
}
jayzhan211 commented 5 months ago

I can work on this. I plan to introduce get_udf to replace register_all. We can just call get_udf when we need one.

alamb commented 5 months ago

I can work on this. I plan to introduce get_udf to replace register_all. We can just call get_udf when we need one.

Thanks @jayzhan211

I think both patterns are needed -- one that gets all UDFs (what register_all currently does) as well as a "get me just a single udf"
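Side by side, the two entry points might look like this (a sketch; the names and the factory table are hypothetical):

    use std::collections::HashMap;
    use std::sync::{Arc, OnceLock};
    use datafusion_expr::ScalarUDF;

    type Factory = fn() -> Arc<ScalarUDF>;

    // Hypothetical static table of deferred factories keyed by function name.
    fn factories() -> &'static HashMap<&'static str, Factory> {
        static MAP: OnceLock<HashMap<&'static str, Factory>> = OnceLock::new();
        MAP.get_or_init(|| {
            let mut m: HashMap<&'static str, Factory> = HashMap::new();
            m.insert("ascii", datafusion_functions::string::ascii as Factory);
            // ... one entry per built-in function
            m
        })
    }

    /// "Get me just a single UDF": built lazily on first use.
    pub fn get_udf(name: &str) -> Option<Arc<ScalarUDF>> {
        factories().get(name).map(|f| f())
    }

    /// The bulk pattern that register_all covers today: materialize everything.
    pub fn all_udfs() -> Vec<Arc<ScalarUDF>> {
        factories().values().map(|f| f()).collect()
    }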

jayzhan211 commented 5 months ago

    fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
        self.state.scalar_functions().get(name).cloned()
    }

I was considering replacing this get, which is why I think register_all may not be needed. Let me try it out and see 👀.

jayzhan211 commented 5 months ago

I drafted the idea here. I think the old functions can be deprecated, but it would be an API design change. Should we deprecate them?

use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

use datafusion_expr::ScalarUDF;

pub type ScalarFactory = Box<dyn Fn() -> Arc<ScalarUDF> + Send + Sync>;

/// HashMap singleton for UDFs
///
/// Replaces register_all with our built-in functions
/// Replaces `scalar_functions: HashMap<String, Arc<ScalarUDF>>` in SessionState
pub fn scalar_functions() -> &'static Mutex<HashMap<String, ScalarFactory>> {
    static FUNCTIONS: OnceLock<Mutex<HashMap<String, ScalarFactory>>> = OnceLock::new();
    FUNCTIONS.get_or_init(|| {
        let mut functions = HashMap::new();
        functions.insert(
            String::from("array_to_string"),
            Box::new(string::array_to_string_udf) as _,
        );
        // TODO: Add more builtin functions here
        Mutex::new(functions)
    })
}

/// Get a UDF by name
///
/// Replaces `get_function_meta`:
///     fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
///         self.state.scalar_functions().get(name).cloned()
///     }
pub fn get_udf(name: &str) -> Option<Arc<ScalarUDF>> {
    scalar_functions().lock().unwrap().get(name).map(|f| f())
}

/// Register a single new UDF, so the user can register their own functions
///
/// Replaces the old register_udf
pub fn register_udf(name: &str, udf: ScalarFactory) -> Option<ScalarFactory> {
    scalar_functions().lock().unwrap().insert(name.to_string(), udf)
}
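
Usage of the draft above would then look something like this sketch:

    fn main() {
        // The ScalarUDF is only constructed at this point, not at registration.
        let udf = get_udf("array_to_string").expect("built-in should be present");
        assert_eq!(udf.name(), "array_to_string");
    }
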
alamb commented 5 months ago

/// Register a single new UDF, so the user can register their own functions

I am not sure about this -- I think it would be better if the data in the module was static / not mutable at runtime. If users want to register their own functions, they can do so in a FunctionRegistry

I drafted the idea here. I think the old functions can be deprecated, but it would be an API design change. Should we deprecate them?

How about we try out what having both APIs would look like? Maybe it is too much duplication, but I bet it would be pretty minimal.