apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Specialized / Pre-compiled / Prepared ScalarUDFs #8051

Open alamb opened 1 year ago

alamb commented 1 year ago

Is your feature request related to a problem or challenge?

Currently, scalar UDF functions cannot be "specialized":

SELECT * FROM t where my_matcher(t.column, '[a-z].*');

What happens is that the regexp string '[a-z].*' is passed as a literal expression to each invocation of the function, so the regex has to be compiled again and again for every batch processed through the UDF.

Since compiling these regular expressions is expensive, it would be nice if there were some way to compile the regexp once per plan rather than once per batch.
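To illustrate the desired behavior, here is a minimal std-only sketch (all names are hypothetical, and a lowercased needle stands in for a compiled `regex::Regex`) where the expensive "compile" step runs once at construction instead of once per batch:

```rust
// Hypothetical sketch (not DataFusion API). `CompiledMatcher` stands in for a
// compiled `Regex`; the point is that `MyMatcherUdf::new` pays the compilation
// cost once, while `invoke_batch` only runs the cheap match loop per batch.
struct CompiledMatcher {
    // In a real UDF this would be a `regex::Regex`; here it is a plain
    // lowercased needle so the example stays std-only.
    needle: String,
}

impl CompiledMatcher {
    fn compile(pattern: &str) -> Self {
        // Stand-in for the expensive `Regex::new(pattern)` step.
        CompiledMatcher { needle: pattern.to_lowercase() }
    }
    fn is_match(&self, value: &str) -> bool {
        value.to_lowercase().contains(&self.needle)
    }
}

struct MyMatcherUdf {
    matcher: CompiledMatcher, // built once per plan, not once per batch
}

impl MyMatcherUdf {
    fn new(pattern: &str) -> Self {
        MyMatcherUdf { matcher: CompiledMatcher::compile(pattern) }
    }
    // Called once per batch of values; reuses the precompiled matcher.
    fn invoke_batch(&self, values: &[&str]) -> Vec<bool> {
        values.iter().map(|v| self.matcher.is_match(v)).collect()
    }
}
```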

Describe the solution you'd like

No response

Describe alternatives you've considered

See comments below

Additional context

Suggested by @thinkharderdev on https://github.com/apache/arrow-datafusion/issues/8045#issuecomment-1793267211

alamb commented 1 year ago

@thinkharderdev suggests

it would be useful to be able to serialize constant parameters into the user-defined scalar function itself rather than pass them in as expressions. For instance, suppose you have to create a UDF to do something with a regex that you have as a static constant. Currently the way to do that is to pass it as a literal expression, but then you have to compile the regex again for every batch you process through the UDF. Ideally you could have something like:

struct MyRegexUdf {
  regex: Regex
}

impl ScalarFunction for MyRegexUdf {
  // use regex on each value somehow
}

The regex would only need to be compiled once during deserialization (or construction) instead of once for each batch.

alamb commented 1 year ago

One way to achieve this might be a PhysicalOptimizerRule that replaces relevant instances of ScalarFunctionExpr.

However, this would likely be somewhat awkward to write, and it is not clear that these expressions would serialize well (as serialization matches a name to an expr). It is probably possible to do this during deserialization with the PhysicalExtensionCodec

alamb commented 1 year ago

@2010YOUY01 noted that in order to use the same APIs for built-in functions and ScalarUDFs, we will need some way to handle:

Figure out the interface to let core pass information to function definitions (e.g. now() requires to be passed query start time from core)

This sounds very similar to the mechanism needed for this feature

Maybe we could add a prepare(ctx: &TaskContext) type method on ScalarUDF 🤔 that can potentially return a new version of the function to invoke

alamb commented 11 months ago

I think the trick here will be to ensure we can still serialize such "precompiled" functions

What I was thinking was maybe we can make a new PhysicalExpr that is like

/// A function that is "precompiled" in some way.
/// For example, for a regular expression function with a constant pattern
/// argument, the constant can be pre-compiled into a Regex instance once
/// per query rather than once per batch.
///
/// Sometimes precompiling makes it hard/impossible to serialize the function
/// again (e.g. the prepared regular expressions), so this structure also keeps
/// the original PhysicalExpr that can be used to serialize the function
struct PrecompiledExpr {
  precompiled: Arc<dyn PhysicalExpr>,
  original: Arc<dyn PhysicalExpr>,
}

sadboy commented 11 months ago

That seems a bit heavy handed? Why not simply augment the scalar function expr node with an interior mutable "cache" cell, that is invisible to serialization? The cell can be automatically populated as needed by the function implementation upon node instantiation/deserialization.

alamb commented 11 months ago

That seems a bit heavy handed? Why not simply augment the scalar function expr node with an interior mutable "cache" cell, that is invisible to serialization? The cell can be automatically populated as needed by the function implementation upon node instantiation/deserialization.

@sadboy that is a (really) good idea

alamb commented 11 months ago

BTW I thought more about this, and one challenge is that the scalar function implementations that are passed around may be shared -- so using interior mutability could get confusing, as every invocation of the function would go through the same shared instance

sadboy commented 11 months ago

the scalar function implementations that are passed around may be shared -- so using interior mutability would get confusing as each function invocation would be from the same instance of the function

Hmm, not sure I'm following? I was thinking of the ScalarFunctionExpr physical node (https://docs.rs/datafusion-physical-expr/34.0.0/src/datafusion_physical_expr/scalar_function.rs.html#51), e.g. adding something like

pub struct ScalarFunctionExpr {
[...]
    args: Vec<Arc<dyn PhysicalExpr>>,
    prepared_args: OnceCell<Vec<Box<dyn PreparedArgs>>>,
[...]
}

where prepared_args is populated by the return value of calling ScalarFunction::prepare on the set of physical args. The function implementation objects themselves do not need to mutate -- they only need to provide an optional implementation of ScalarFunction::prepare method if they wish to pre-process the arguments. prepared_args is purely an optimization construct, it should have no effect on the semantics of ScalarFunctionExpr -- the node should compute the exact same result whether or not prepared_args is populated. This way, any part of the system (e.g. Serialize, Clone, etc.) is always free to simply drop it without affecting correctness.
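A minimal std-only sketch of this cache-cell idea (hypothetical names; a `String` arg stands in for a physical expression, and a lowercased needle stands in for a compiled regex):

```rust
use std::cell::OnceCell;

// Hypothetical sketch of the proposal: the expr node owns a lazily populated
// cache that is invisible to serialization. `PreparedArgs` and the node shape
// are assumptions, not DataFusion's real API.
struct PreparedArgs {
    compiled: String, // stand-in for a compiled regex
}

struct ScalarFunctionExprSketch {
    args: Vec<String>, // stand-in for Vec<Arc<dyn PhysicalExpr>>
    prepared_args: OnceCell<PreparedArgs>,
}

impl ScalarFunctionExprSketch {
    fn new(args: Vec<String>) -> Self {
        Self { args, prepared_args: OnceCell::new() }
    }

    // Dropping `prepared_args` (e.g. on Clone or Serialize) never changes the
    // result: the cell is simply repopulated on the next evaluation, so the
    // cache is purely an optimization construct.
    fn evaluate(&self, value: &str) -> bool {
        let prepared = self.prepared_args.get_or_init(|| PreparedArgs {
            // The "expensive" compile step; runs at most once per node.
            compiled: self.args[0].to_lowercase(),
        });
        value.to_lowercase().contains(&prepared.compiled)
    }
}
```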

alamb commented 11 months ago

BTW my hope is to prototype how this would work (as a ScalarUDF) by building on top of https://github.com/apache/arrow-datafusion/pull/8578

It would be pretty rad

alamb commented 8 months ago

BTW now that @jayzhan211 and I have implemented ScalarUDF::simplify in https://github.com/apache/arrow-datafusion/pull/9298, and we have ported the regular expression functions to use ScalarUDF, I think we could actually use that API to implement precompiled functions

Not sure if that would meet your requirements @thinkharderdev

For example, to implement "precompiled regexp functions" we could do something like this (would be sweet if someone wanted to prototype this):

/// A new UDF that holds a precompiled pattern
struct PrecompiledRegexpReplace {
  precompiled_match: Arc<Pattern>,
}

impl ScalarUDFImpl for PrecompiledRegexpReplace {
   // the invoke function uses `self.precompiled_match` directly
   ...
}

// Update the existing RegexpReplace function to implement `simplify`
impl ScalarUDFImpl for RegexpReplace {

  /// if the pattern argument is a scalar, rewrite the function to a new scalar UDF that
  /// contains a pre-compiled regular expression
  fn simplify(&self, args: Vec<Expr>, ..) -> .. {
    match (&args[1], &args[2]) {
      (ScalarValue::Utf8(pattern), ScalarValue::Utf8(flags)) => {
        let precompiled = // create regexp match from pattern and flags
        Simplified::Rewritten(
          ScalarUDF::new(PrecompiledRegexpReplace { precompiled_match: precompiled })
            .call(args),
        )
      }
      _ => Simplified::Original(args),
    }
  }
}

We could then run some gnarly regular expression cases, such as those found in https://github.com/apache/arrow-datafusion/issues/8492, and see if it helps or not.

If it doesn't help performance, then the extra complexity isn't worth it for regexp_replace

thinkharderdev commented 8 months ago

Not sure if that would meet your requirements @thinkharderdev

Yeah, although I was mainly using regexes as an example originally. Our particular issues are with some UDFs we have implemented that require some hilarious hacks to pass state as expressions.

But the idea does seem good to me.

alamb commented 4 months ago

FYI there is another discussion about this here: https://github.com/apache/datafusion/issues/11146

findepi commented 2 months ago

After https://github.com/apache/datafusion/issues/9289, it would be good to recap what remains to be done in this issue.

milenkovicm commented 2 months ago

It looks like somebody needs to prototype what @alamb suggested in

https://github.com/apache/datafusion/issues/8051#issuecomment-1979747257

I believe there is one small change: some args will be used to create the pattern, so they should not be passed down to the resulting expression

match (&args[1], &args[2]) {
  (ScalarValue::Utf8(pattern), ScalarValue::Utf8(flags)) => {
    let precompiled = // create regexp match, probably from args[1] and args[2]
    let new_args = vec![args[0]];
    Simplified::Rewritten(
      ScalarUDF::new(PrecompiledRegexpMatch { precompiled })
        .call(new_args),
    )
  }
  ...
}

The only downside I see is that the "state" is not going to be serialised if it has to be distributed in systems like ballista. It would make sense to have a generic "serialise to physical plan" at some point; it would help with distributing something like Python UDFs, but that's probably a different discussion.

also note: https://github.com/apache/datafusion/pull/12270

findepi commented 2 months ago

The only downside I see is that the "state" is not going to be serialised if it has to be distributed in systems like ballista.

That's because we don't use expressions. What about using simplify all the way down? The PrecompiledRegexpMatch could be dumped as a byte buffer (varbinary) into an expression and then cast back to a PrecompiledRegexpMatch. This will work as long as it's a flat structure; it won't work when it's something that has internal pointers and requires actual serialization.

Alternatively, we can avoid all this complexity -- at the cost of different complexity, but conceptually simpler. Let's imagine ScalarUDF invoke gets an option to create a thread local scratch space that it can reuse on all invocations. That would make reusing compiled pattern easier without having to serialize it in the plan. The downside would be that the implementation would need to explicitly check whether the pattern is the same on every invocation (equality check once per batch).
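A minimal std-only sketch of this alternative (hypothetical names; a lowercased needle stands in for the compiled pattern), showing the once-per-batch equality check:

```rust
// Hypothetical sketch of the "scratch space" alternative: the function keeps
// a reusable slot and checks on every batch whether the pattern argument
// changed, recompiling only when it did. None of these names are DataFusion's.
struct ScratchSpace {
    pattern: String,  // the pattern the scratch space was built from
    compiled: String, // stand-in for a compiled regex
}

struct MatchFn {
    scratch: Option<ScratchSpace>,
}

impl MatchFn {
    fn new() -> Self {
        MatchFn { scratch: None }
    }

    // Called once per batch; `pattern` is usually the same literal every time.
    fn invoke(&mut self, pattern: &str, values: &[&str]) -> Vec<bool> {
        let stale = match &self.scratch {
            Some(s) => s.pattern != pattern, // cheap equality check, once per batch
            None => true,
        };
        if stale {
            // Stand-in for the expensive compile step; only runs when the
            // pattern actually changes.
            self.scratch = Some(ScratchSpace {
                pattern: pattern.to_string(),
                compiled: pattern.to_lowercase(),
            });
        }
        let compiled = &self.scratch.as_ref().unwrap().compiled;
        values.iter().map(|v| v.to_lowercase().contains(compiled)).collect()
    }
}
```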

milenkovicm commented 2 months ago

Sorry, but I'm not sure I understand why you need thread-local storage.

What @alamb proposed will work with DataFusion execution with no need for any other state handling: the state lives in the struct's field, which is re-used for each and every batch this function participates in, and PrecompiledRegexpMatch will be "specialised" for a given logical plan.

Or to put it a different way: if there is no need to serialise your plan to be distributed, as with ballista, I believe the proposed solution totally makes sense.

Do you have a use-case where you need to distribute function expressions to different executors, as in ballista?

findepi commented 2 months ago

I am not using ballista currently. I realized the plan serialization concern is easy to address if we separate simplify into phases: first an Expr-constraint simplify (e.g. pruning args known to be null, etc.) that would run during the plan optimization phase, and then a local-execution simplify which allows a function to "compile itself" into its most optimal form, without any need for serialization anymore. Ballista would need to serialize and distribute the plans in between these phases.

BTW, we have focused so far on compiling regular expressions, but we haven't thought about the memory needs for their execution. Internally, regex::Regex::is_match uses a synchronized pool of "caches" (regex execution scratch space) underneath. I don't know if this is a perf problem (probably not!), but let me use it as an example. It would probably be good if, at runtime, a scalar function could have its own "scratch space" / "local buffer", ideally without having to use thread locals, which aren't great if DataFusion is embedded and doesn't control thread creation.

Why am I mentioning this? I thought that maybe if we had "scratch space" / "local buffer" support, we wouldn't need to "compile functions" during planning.

alamb commented 18 hours ago

Perhaps the ScalarFunctionArgs added in

alamb commented 15 hours ago

Copying a comment from https://github.com/apache/datafusion/pull/13290

Yes, I was thinking something like

trait ScalarUDFImpl {
  /// Prepares to run the function, returning any state (such as
  /// a precompiled regex). Called once per instance of the function in the query
  fn prepare(&self, args: &ScalarFunctionArgs) -> Option<Box<dyn Any>> { None }

  /// The `prepared` field in `ScalarFunctionArgs` has the result of calling `prepare`
  fn invoke_with_args(&self, args: &ScalarFunctionArgs) -> ...
}

pub struct ScalarFunctionArgs<'a> {
  ...
  /// The result from a call to `prepare` for this function instance
  prepared: Option<Box<dyn Any>>,
}