apache / datafusion-ballista

Apache DataFusion Ballista Distributed Query Engine
https://datafusion.apache.org/ballista
Apache License 2.0
1.39k stars 181 forks source link

Add support for Python UDFs in distributed queries #173

Open andygrove opened 1 year ago

andygrove commented 1 year ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do. I would like to be able to execute queries containing custom Python code. This is already supported in DataFusion but we need to add the serde aspect in Ballista.

Describe the solution you'd like It looks like we can serialize Python functions with https://docs.rs/pyo3/0.7.0/pyo3/marshal/index.html

We then need to store the serialied Python functions in protobuf and deserialize them in the executors.

Describe alternatives you've considered None

Additional context None

avantgardnerio commented 1 year ago

Fancy!

adriangb commented 1 year ago

Would the plan be to focus on Python UDFs that don't have dependencies? I don't think I've ever seen a good story for taking Python code from one place and executing it in another if there are 3rd party dependencies involved. Would you folks consider some sort of HTTP UDF as well? BigQuery supports a similar feature. It could probably be done better here, e.g. by transmitting over the wire in a more efficient format and providing tooling to deserialize on the other end (maybe DataFusion?).

andygrove commented 1 year ago

Unfortunately, I do not have the Python skills to answer the question. I think we can learn from how Dask does this? I wonder if @jdye64 can offer any high-level guidance.

andygrove commented 1 year ago

Another approach here is to implement a custom executor process that wraps Ballista and takes care of registering custom Python code as UDFs so that we don't have to worry about trying to send code over the network.

jdye64 commented 1 year ago

Yes, @adriangb is right. Much pain comes in trying to serialize and execute python code on remote nodes that have dependencies. This has been the case since even for Hive UDFs back years ago.

The python ecosystem as a whole is one that relies heavily on existing dependencies. Therefore I think if we can come up with a straightforward method for ensuring all of the executors have a valid virtual environment with all the dependencies required by the UDF installed we should be good. This is the approach we take in some parts of Dask for example.

So maybe as part of the Python UDF registration we require a "list" of dependencies that are required by the UDF. When the executor server starts up it could create that virtual env, through pip or conda or whatever is chosen, and installed those dependencies. Think of it like a executor server bootstrapping process. Then when any sql queries are submitted the UDF can be serialized and sent to the executor, once there the UDF can be executed in that virtual environment.

Couple of thoughts

adriangb commented 1 year ago

I think being able to run Python UDFs is a must, almost not even worth having Python UDF support if dependencies can't be used. This is just my opinion and not a fact.

Agreed that if you want to allow custom Python code to run you need to allow 3rd party dependencies. But 3rd party dependencies are a whole big can of worms in Python, to the point where I would avoid opening it if you can. Hence the suggestion for HTTP UDFs which also have other benefits/use cases.

I think a key aspect of this is to allow users to stick to the workflows they know instead of having to build a new one. For example as a data engineer/backend dev I manage a large project with multiple deployable artifacts that get bundled into containers and deployed on k8s. I have the knowledge and infrastructure in place to handle all of the complexities involved in this (e.g. locking dependencies across deployable alá Cargo workspaces). Any sort of new dependency management paradigm that does not fit in with this is extra work and possibly a source of bugs. That includes the SSH into a node model and Airflow's same dependency everywhere model 🙃. I think a good model would be a sort of "UDF executor web framework" that does some hand-holding but ultimately leaves packaging and dependencies up to users:

from ballista import UDFExecutorApp, udf
from pyarrow import RecordBatchReader

@udf.aggregator(name="override_the_name")
def some_aggregation_function(reader: RecordBatchReader) -> Iterable[RecordBatch]:
    ...

def main() -> None:
    app = UDFExecutorApp([some_aggregation_function])
    app.serve(scheduler_host=...)

I'm just making something up here but the point is to keep a relatively familiar Flask-style API but abstract away the fiddly bits.

This would register itself with the scheduler as being able to execute "override_the_name".

jdye64 commented 1 year ago

I have no hands-on experience with the HTTP UDFs model but am intrigued by the approach. Thank you for laying out your thoughts. Had some thoughts and questions.

Things I really like:

Concerns/Questions (mostly because of my lack of experience here)

adriangb commented 1 year ago

Where would the HTTP server be hosted? Scheduler? Single Executor process? Multiple Executor processes? An entirely new process?

I was thinking "entirely new process".

Wouldn't it make more sense to have that communication channel be Arrow flight instead of HTTP?

With this example above I'd say it can be Arrow flight or any other communication protocol, I'm not constraining it to HTTP. It would be really cool though if as a user I could deploy one thing that can talk over Arrow Flight for use as a UDF but also serve an HTTP API. And it might still be good to support an HTTP UDF like BigQuery does as an escape hatch for arbitrary languages and such.

Seems like this would introduce a large performance hit having to make an external (data movement) remote invocation for each RecordBatch since the data would need to be moved to the HTTP UDF service running on another host.

Yeah this I don't know about. Would the data be currently residing in the Executor? If so the only way to do everything in memory would be to run these UDFs on the Executor itself. Which gets into the complication of dependency management. Could users wrap the executor itself? Maybe when UDFExecutorApp registers itself with the scheduler it becomes an executor which execute regular queries but also the UDFs that it is running locally? It might get confusing if different executors have different UDFs available...