apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
5.86k stars · 1.11k forks

Prototype implementing DataFusion functions / operators using `arrow-udf` library #11413

Open alamb opened 1 month ago

alamb commented 1 month ago

Is your feature request related to a problem or challenge?

Related to the discussion on https://github.com/apache/datafusion/discussions/11192 with @Xuanwo

RisingWave has a library for automatically creating vectorized implementations of functions (e.g. that operate on arrow arrays) from scalar implementations

The library is here: https://github.com/risingwavelabs/arrow-udf

A blog post describing it is here: https://risingwave.com/blog/simplifying-sql-function-implementation-with-rust-procedural-macro/

DataFusion uses macros to do something similar in binary.rs, but they are pretty hard to read / understand in my opinion: https://github.com/apache/datafusion/blob/7a23ea9bce32dc8ae195caa8ca052673031c06c9/datafusion/physical-expr/src/expressions/binary.rs#L118-L130

One main benefit I can see to switching to https://github.com/risingwavelabs/arrow-udf is that we could then extend arrow-udf to support Dictionary and StringView and maybe other types to generate fast kernels for multiple different array layouts.

Describe the solution you'd like

I think it would be great if someone could evaluate the feasibility of using the macros in https://github.com/risingwavelabs/arrow-udf to implement DataFusion's operations (and maybe eventually functions, etc.)

Describe alternatives you've considered

I suggest a POC that picks one or two functions (maybe string equality or regexp_match or something) and tries to use arrow-udf's `#[function]` macro instead.

Here is an example of how to use it: https://docs.rs/arrow-udf/0.3.0/arrow_udf/

Additional context

No response

xinlifoobar commented 1 month ago

take

xinlifoobar commented 1 month ago

Sorry, it is taking longer than I expected to make this work end-to-end. I plan to build a ScalarUDF with arrow-udf to complete the prototype work.

From my perspective (feel free to correct me if I'm wrong),

Good points:

Bad points:

Neutral:

I think we could use arrow-udf to replace some string functions that are not supported by arrow-string, to get rid of macros like `compute_utf8_op`.

https://github.com/apache/datafusion/blob/f204869ff55bb3e39cf23fc0a34ebd5021e6773f/datafusion/physical-expr/src/expressions/binary.rs#L616-L664

An example would be

// declare concat
#[function("concat(string, string) -> string")]
#[function("concat(largestring, largestring) -> largestring")]
fn concat(lhs: &str, rhs: &str) -> String {
    format!("{}{}", lhs, rhs)
}

// reference concat
apply_udf(
    &ColumnarValue::Array(left),
    &ColumnarValue::Array(right),
    &Field::new("", DataType::Utf8, true),
    "concat",
)

CC @alamb

xinlifoobar commented 1 month ago

Btw... ColumnarValue was introduced into DataFusion 2 years ago. Considering:

  • ColumnarValue is almost always used paired with a SchemaRef
  • A RecordBatch is one or more ColumnarValues plus a schema

Would it be better to use RecordBatch instead of ColumnarValue in the PhysicalExpr evaluate function? It would integrate more cleanly with the arrow-rs ecosystem.

alamb commented 1 month ago

Btw... ColumnarValue was introduced into DataFusion 2 years ago. Considering:

  • ColumnarValue is almost always used paired with a SchemaRef
  • A RecordBatch is one or more ColumnarValues plus a schema

Would it be better to use RecordBatch instead of ColumnarValue in the PhysicalExpr evaluate function? It would integrate more cleanly with the arrow-rs ecosystem.

One thing that ColumnarValue does well is represent single values efficiently (aka ScalarValue), which is a very important optimization for performance.

I don't see any fundamental reason we couldn't use RecordBatch if we figured out a better way to represent a single row.
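To make the scalar optimization concrete, here is a minimal, hypothetical sketch (deliberately simplified stand-ins, not DataFusion's actual types) of why a dedicated scalar variant matters: the scalar case stores one value instead of materializing a column with `num_rows` copies of it.

```rust
// Hypothetical, simplified stand-ins for DataFusion's ColumnarValue/ScalarValue,
// used only to illustrate the optimization discussed above.
#[derive(Debug, Clone, PartialEq)]
enum ColumnarValue {
    Array(Vec<i64>), // stand-in for an Arrow array
    Scalar(i64),     // stand-in for a ScalarValue
}

/// Adds two columnar values without ever expanding a scalar into an array.
fn add(lhs: &ColumnarValue, rhs: &ColumnarValue, num_rows: usize) -> ColumnarValue {
    use ColumnarValue::*;
    match (lhs, rhs) {
        // scalar + scalar: O(1), no per-row work at all
        (Scalar(a), Scalar(b)) => Scalar(a + b),
        // array + scalar (either order): one pass; the scalar stays a single value
        (Array(a), Scalar(s)) | (Scalar(s), Array(a)) => {
            Array(a.iter().map(|v| v + s).collect())
        }
        // array + array: element-wise
        (Array(a), Array(b)) => {
            assert_eq!(a.len(), num_rows);
            Array(a.iter().zip(b).map(|(x, y)| x + y).collect())
        }
    }
}

fn main() {
    let col = ColumnarValue::Array(vec![1, 2, 3]);
    let one = ColumnarValue::Scalar(1);
    // The scalar is applied per row without allocating a 3-element array for it.
    assert_eq!(add(&col, &one, 3), ColumnarValue::Array(vec![2, 3, 4]));
    // Scalar-only expressions stay O(1) regardless of batch size.
    assert_eq!(add(&one, &one, 1_000_000), ColumnarValue::Scalar(2));
}
```

If the `evaluate` API only accepted RecordBatches, a literal like `1` in `col + 1` would have to be widened into a full-length column first, which is the cost the current design avoids.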

alamb commented 1 month ago

Sorry, it is taking longer than I expected to make this work end-to-end. I plan to build a ScalarUDF with arrow-udf to complete the prototype work.

Thank you so much @xinlifoobar -- this is really helpful and a great analysis (I think the pros/cons you identified make a lot of sense to me)

From what I can see, if we wanted to proceed with using arrow-udf in DataFusion we would need to address the shortcomings you identified above.

Here are some additional discussions

By default all UDFs are private; there is no way to reference the UDF so that it could be used in, e.g., ExprPlanner.

I think this is part of the same concept as discussed on https://lists.apache.org/thread/x8wvlkfr0osl15o52rw85wom0p4v05x6 -- basically the arrow-udf library's scope is large enough to encompass things like a function registry that DataFusion already has

Lack of support for operations between arrays and scalars.

I do think being able to special-case scalar values is a critical requirement for performance.

I will post your findings on the mailing list and let's see what the authors of arrow-udf have to say.

cc @wangrunji0408 @Xuanwo

alamb commented 1 month ago

An example would be

FWIW this implementation of concat would likely perform pretty poorly compared to a hand-written one, as it will both create and allocate a new temporary String for each row (and then presumably copy that value into a final StringArray/LargeStringArray).

// declare concat
#[function("concat(string, string) -> string")]
#[function("concat(largestring, largestring) -> largestring")]
fn concat(lhs: &str, rhs: &str) -> String {
    format!("{}{}", lhs, rhs)
}

wangrunji0408 commented 1 month ago

An example would be

FWIW this implementation of concat would likely perform pretty poorly compared to a hand-written one, as it will both create and allocate a new temporary String for each row (and then presumably copy that value into a final StringArray/LargeStringArray).

// declare concat
#[function("concat(string, string) -> string")]
#[function("concat(largestring, largestring) -> largestring")]
fn concat(lhs: &str, rhs: &str) -> String {
    format!("{}{}", lhs, rhs)
}

In this case, a writer-style return value is supported to avoid the overhead.

#[function("concat(string, string) -> string")]
#[function("concat(largestring, largestring) -> largestring")] // will be supported soon
fn concat(lhs: &str, rhs: &str, output: &mut impl std::fmt::Write) {
    write!(output, "{}{}", lhs, rhs).unwrap();
}

The same technique can be applied to binary and largebinary. However, it is not yet implemented due to the lack of an `impl std::io::Write for (Large)BinaryBuilder` in the arrow crate.

// to be supported
#[function("concat(binary, binary) -> binary")]
#[function("concat(largebinary, largebinary) -> largebinary")]
fn concat(lhs: &[u8], rhs: &[u8], output: &mut impl std::io::Write) {
    output.write_all(lhs).unwrap();
    output.write_all(rhs).unwrap();
}

xxchan commented 1 month ago

To clarify, the arrow-udf project provides several things. (I just tried to explain it here https://github.com/risingwavelabs/arrow-udf/pull/55) For DataFusion, we are mainly interested in the #[function] macro (in the arrow-udf crate), right?

alamb commented 1 month ago

To clarify, the arrow-udf project provides several things. (I just tried to explain it here risingwavelabs/arrow-udf#55) For DataFusion, we are mainly interested in the #[function] macro (in the arrow-udf crate), right?

the #[function] macro seemed the obvious thing that would be easiest to apply to DataFusion. The rest of the system (for loading user defined functions at runtime with WASM, and a function registry) might be interesting, but DataFusion already has similar features, so switching would likely be a large API change for users for unclear benefits at the moment