risingwavelabs / risingwave

SQL stream processing, analytics, and management. We decouple storage and compute to offer instant failover, dynamic scaling, speedy bootstrapping, and efficient joins.
https://www.risingwave.com/slack
Apache License 2.0

Tracking: User-Defined Function(UDF) Support #7405

Open wangrunji0408 opened 1 year ago

wangrunji0408 commented 1 year ago

This issue tracks the progress of UDF support in the RisingWave kernel.

We plan to support UDFs in two ways:

  1. External processes that implement the Apache Arrow Flight protocol (external Python or Java processes)
  2. Embedded execution in a WASM runtime or Python interpreter (inline Python, JavaScript)
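Both strategies could sit behind one interface in the compute node, so that execution code does not need to care where a function actually runs. The Rust sketch below is illustrative only: `UdfBackend`, `ExternalFlightUdf`, `EmbeddedUdf`, and the `Column` alias are hypothetical names, not RisingWave's API. A `Vec<Option<f64>>` stands in for an Arrow column, and the Arrow Flight round trip is merely simulated by a local function.

```rust
/// Stand-in for an Arrow column: one nullable f64 per row.
/// (Assumption: the real exchange format would be Arrow record batches.)
type Column = Vec<Option<f64>>;

/// Hypothetical interface shared by both UDF execution strategies.
trait UdfBackend {
    /// Evaluates the function over a whole batch of argument columns.
    fn eval_batch(&self, args: &[Column]) -> Result<Column, String>;
}

/// Strategy 1 (hypothetical): an external process reached over Arrow Flight.
struct ExternalFlightUdf {
    endpoint: String,
    /// Simulates the remote call; a real client would encode `args` as Arrow,
    /// send them to `endpoint`, and decode the returned batch.
    remote: fn(&[Column]) -> Column,
}

impl UdfBackend for ExternalFlightUdf {
    fn eval_batch(&self, args: &[Column]) -> Result<Column, String> {
        if self.endpoint.is_empty() {
            return Err("no Arrow Flight endpoint configured".to_string());
        }
        Ok((self.remote)(args))
    }
}

/// Strategy 2 (hypothetical): code run inside the compute node
/// (WASM or an embedded interpreter), modelled as a per-row function.
struct EmbeddedUdf {
    func: fn(f64) -> f64,
}

impl UdfBackend for EmbeddedUdf {
    fn eval_batch(&self, args: &[Column]) -> Result<Column, String> {
        let arg = args.first().ok_or("expected one argument column")?;
        // NULL input yields NULL output, like a strict SQL function.
        Ok(arg.iter().copied().map(|v| v.map(self.func)).collect())
    }
}

fn double(x: f64) -> f64 {
    x * 2.0
}

/// Pretend remote server: adds one to every non-NULL value of the first column.
fn fake_remote_add_one(args: &[Column]) -> Column {
    args[0].iter().copied().map(|v| v.map(|x| x + 1.0)).collect()
}

fn main() {
    let batch = vec![vec![Some(1.0), None, Some(3.0)]];
    let backends: Vec<Box<dyn UdfBackend>> = vec![
        Box::new(EmbeddedUdf { func: double }),
        Box::new(ExternalFlightUdf {
            endpoint: "localhost:8815".to_string(),
            remote: fake_remote_add_one,
        }),
    ];
    for backend in &backends {
        println!("{:?}", backend.eval_batch(&batch));
    }
}
```

Evaluating whole batches rather than single rows matches the columnar Arrow exchange format and amortizes the per-call overhead, which matters most for the external strategy.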

Any ideas and suggestions are welcome!

Related RFC: https://github.com/risingwavelabs/rfcs/pull/12

Progress

Frontend:

Aggregate function:

SQL UDF:

Python UDF:

Java UDF:

WASM UDF:

Problems

jon-chuang commented 1 year ago

What is the interface for WASM? :) We can probably leverage arrow SDK too?

wangrunji0408 commented 1 year ago

> What is the interface for WASM? :) We can probably leverage arrow SDK too?

Yeah, the data exchange format will be Arrow too. But the specific user interface has not been decided yet. For example, we may allow users to upload a compiled WASM object file, or we could help users compile their Python/JS code to WASM. The latter sounds more convenient.

jon-chuang commented 1 year ago

https://github.com/risingwavelabs/risingwave/issues/7410 seems like a good use case.

My concerns with a WASM kernel are:

  1. whether we will embed the VM into the compute node
  2. how we will handle errors, including VM errors such as out-of-bounds access

I'm not sure how user-defined errors can be propagated, however. Should we implement a wrapper type for such errors, so that responses from both external UDFs and WASM UDFs are wrapped in this error type?

It seems that if we do not do 1., a WASM UDF is effectively just an external function. If we do 1., we get errors directly from the WASM VM, we can control the fault-tolerance/scaling story more meaningfully within the RisingWave compute node than with an external UDF, and the user does not need to host the UDF elsewhere.
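A wrapper error type along these lines could carry every error source (VM traps, external-process failures, and errors raised by the user's own code) up to the user unchanged. This is a hypothetical Rust sketch: `UdfError`, `WasmTrap`, and `call_embedded` are illustrative names, not RisingWave code, and the "embedded call" only mimics a WASM out-of-bounds trap by indexing a byte slice.

```rust
use std::error::Error;
use std::fmt;

/// Faults raised by the embedded WASM VM itself (illustrative subset).
#[derive(Debug, Clone, PartialEq)]
enum WasmTrap {
    MemoryOutOfBounds,
    /// e.g. the guest executed the `unreachable` instruction
    Unreachable,
}

/// Hypothetical wrapper for every error a UDF call can produce,
/// regardless of where the function runs.
#[derive(Debug, Clone, PartialEq)]
enum UdfError {
    /// The WASM VM trapped (only observable if the VM is embedded).
    Wasm { function: String, trap: WasmTrap },
    /// The external Arrow Flight process failed or was unreachable.
    External { endpoint: String, message: String },
    /// The user's own code raised an error (e.g. a Python exception).
    User { function: String, message: String },
}

impl fmt::Display for UdfError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            UdfError::Wasm { function, trap } => {
                write!(f, "UDF `{function}` trapped in WASM VM: {trap:?}")
            }
            UdfError::External { endpoint, message } => {
                write!(f, "external UDF server at {endpoint} failed: {message}")
            }
            UdfError::User { function, message } => {
                write!(f, "UDF `{function}` raised an error: {message}")
            }
        }
    }
}

impl Error for UdfError {}

/// Simulated embedded call: reading past the end of linear memory traps.
fn call_embedded(function: &str, memory: &[u8], offset: usize) -> Result<u8, UdfError> {
    memory.get(offset).copied().ok_or_else(|| UdfError::Wasm {
        function: function.to_string(),
        trap: WasmTrap::MemoryOutOfBounds,
    })
}

fn main() {
    let memory = [1u8, 2, 3];
    match call_embedded("my_udf", &memory, 10) {
        Ok(v) => println!("result: {v}"),
        // Whatever its source, the error is reported to the user as-is.
        Err(e) => println!("error: {e}"),
    }
}
```

Keeping the source as an enum variant, rather than flattening everything into a string, would still let the compute node treat VM traps differently from transient external failures (for example, retrying only the latter) if that ever became useful.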

wangrunji0408 commented 1 year ago

> Not sure how the user-defined errors can be propagated, however. I believe we should implement a wrapper type for such errors? So the response from external UDFs and response for WASM UDFs will be wrapped in this error type.

Sure. Whether we embed the VM or run it as an external function, there will be many kinds of errors to deal with. I don't think there is anything we can do other than report them directly to users.

For question 1, I think the VM should be embedded into the compute node, because embedding lets us avoid communication latency between processes, and even between nodes.

xiangjinwu commented 1 year ago

Do we intend to support implicit casts of arguments, or require explicit casts? For example, given a builtin sind(double precision) -> double precision, we can write sind(30). But if it were a UDF, we would currently have to write sind_udf(30::double precision).

wangrunji0408 commented 1 year ago

Do we intend to support implicit cast of arguments or require explicit cast?

Yes, we should support implicit casts for UDFs, as Postgres does. This is not implemented yet; I'll create an issue to track it.
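PostgreSQL-style resolution could work roughly as follows: prefer an exact signature match; otherwise accept a candidate whose parameter types are all reachable through implicit casts, and record the casts the planner must insert. This is a simplified, hypothetical sketch. `DataType`, `UdfSignature`, and `resolve` are illustrative names, and the cast table is a small assumed subset. Real PostgreSQL applies further tie-breaking rules (such as preferred types) that are omitted here, so two equally castable candidates are simply reported as ambiguous.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DataType {
    Int32,
    Int64,
    Float64, // double precision
    Varchar,
}

/// Implicit cast rules: a small assumed subset of PostgreSQL's
/// (integer -> double precision is implicit there, for instance).
fn can_implicit_cast(from: DataType, to: DataType) -> bool {
    use DataType::*;
    from == to || matches!((from, to), (Int32, Int64) | (Int32, Float64) | (Int64, Float64))
}

#[derive(Debug, Clone)]
struct UdfSignature {
    name: String,
    arg_types: Vec<DataType>,
}

/// Picks the UDF to call and, per argument, the cast to insert (None = no cast).
fn resolve<'a>(
    catalog: &'a [UdfSignature],
    name: &str,
    args: &[DataType],
) -> Result<(&'a UdfSignature, Vec<Option<DataType>>), String> {
    let candidates: Vec<&'a UdfSignature> = catalog
        .iter()
        .filter(|s| s.name == name && s.arg_types.len() == args.len())
        .collect();

    // 1. An exact match always wins.
    if let Some(exact) = candidates.iter().copied().find(|s| s.arg_types == args) {
        return Ok((exact, vec![None; args.len()]));
    }

    // 2. Otherwise keep only candidates reachable through implicit casts.
    let castable: Vec<&'a UdfSignature> = candidates
        .into_iter()
        .filter(|s| s.arg_types.iter().zip(args).all(|(&to, &from)| can_implicit_cast(from, to)))
        .collect();

    match castable.as_slice() {
        [only] => {
            let casts: Vec<Option<DataType>> = only
                .arg_types
                .iter()
                .zip(args)
                .map(|(&to, &from)| if to == from { None } else { Some(to) })
                .collect();
            Ok((*only, casts))
        }
        [] => Err(format!("function {name}({args:?}) does not exist")),
        _ => Err(format!("function {name}({args:?}) is not unique")),
    }
}

fn main() {
    let catalog = vec![UdfSignature {
        name: "sind_udf".to_string(),
        arg_types: vec![DataType::Float64],
    }];
    // `sind_udf(30)`: the literal 30 is an integer, so a cast to double precision is inserted.
    let (sig, casts) = resolve(&catalog, "sind_udf", &[DataType::Int32]).unwrap();
    println!("{} casts: {:?}", sig.name, casts);
}
```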