apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
5.47k stars 1.01k forks source link

Async User Defined Functions (UDF) #6518

Open marshauf opened 1 year ago

marshauf commented 1 year ago

Is your feature request related to a problem or challenge?

I would like to use async code in an UDF. I couldn't find an example or API documentation on how to do that. It would be nice if it would be possible/documented.

Describe the solution you'd like

datafusion::physical_plan::functions::make_scalar_function() accepts functions which return a Future.

Describe alternatives you've considered

Creating another tokio runtime and offloading the async function onto it. The main runtime waits in the UDF till async function is done with execution.

Additional context

No response

alamb commented 1 year ago

I agree there is currently no good way to make a scalar function async.

You could potentially use a table provider and write to your function like INSERT INTO your_table SELECT ... 🤔

That might not work for your usecase however

marshauf commented 1 year ago

My idea was to do something likes this:

SELECT call('localhost:3000', num, letter FROM (SELECT * FROM (VALUES (1, 'one'), (2, 'two'), (3, 'three')) AS t (num,letter))

It would probably work with your example.

CREATE EXTERNAL TABLE your_table STORED AS CSV WITH HEADER ROW LOCATION 'localhost:3000';
INSERT INTO your_table SELECT * FROM (VALUES (1, 'one'), (2, 'two'), (3, 'three')) AS t (num,letter);
SELECT * FROM your_table;

And on insert call endpoint and store returned value. Seems cumbersome to use and implement.

marshauf commented 1 year ago

@alamb wouldn't it be possible to turn the SQL statement

SELECT call('localhost:3000', num, letter FROM (SELECT * FROM (VALUES (1, 'one'), (2, 'two'), (3, 'three')) AS t (num,letter))

into a LogicalPlan. Rewrite the Expr::ScalarUDF to a Expr::SubQuery with an LogicalPlan::Extension. The Extension would point to a custom ExecutionPlan, in which I could run async code. Similar to what is done in datafusion/core/tests/user_defined_plan.rs.

alamb commented 1 year ago

@alamb wouldn't it be possible to turn the SQL statement

Yes, that sounds like it would work (I am sorry I didn't suggest that)

If you get it to work, I think it would be a great example to include in DataFusion to show both the power of the existing extension APIs and custom table functions)

marshauf commented 1 year ago

I got an example working which replaces an UDF with a user defined Extension. The extension processes RecordBatches in an async function.

It works great with VALUES as inputs but not with a csv file.

I will clean it up and create a PullRequest. I hope you can help me figure out why inputs from VALUES is processed different to a csv files.

alamb commented 8 months ago

I filed https://github.com/apache/arrow-datafusion/issues/7926 to track user defined table functions