dask-contrib / dask-sql

Distributed SQL Engine in Python using Dask
https://dask-sql.readthedocs.io/
MIT License

ETL in dask-SQL #209

Open rajagurunath opened 2 years ago

rajagurunath commented 2 years ago

Dask is used as a scheduler/compute engine in a number of ETL tools like Airflow, Prefect, Dagster, etc., thanks to the Dask distributed infrastructure.

Shall we try to make use of this advantage and provide a SQL-only ETL experience for the user?
TL;DR: shall we integrate dask-sql with ETL tools? (maybe as a custom Operator, Task, or Step)

I am thinking of two approaches to implement this:

  1. Tightly integrate dask-sql within the ETL framework, i.e. create the dask-sql context in the tasks/steps of the DAG and pass the context from one task to the next (seems like a hack, not a perfectly optimized approach; a rough sketch follows after this list).


  2. Start the dask-sql server when starting the ETL run, complete the extract, transform, and load steps, and then gracefully tear down the dask-sql server. We already have functions to create/spin up the uvicorn server thread (we would pass the required Dask configuration to that function from the ETL framework), and the ETL would be performed by sending requests to the server (or by using the Presto client). We would also implement a graceful-shutdown API in the FastAPI app (is that possible, or will it cause any problems? :) )
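
For illustration, here is a minimal sketch of approach 1 using Prefect's flow/task API. All table names, file paths, and queries are made up, and the exact Prefect and dask-sql APIs may differ between versions:

        # Rough sketch of approach 1: create a dask-sql Context in the first task
        # and hand it from task to task. All names and paths below are illustrative.
        import dask.dataframe as dd
        from dask_sql import Context
        from prefect import Flow, task

        @task
        def extract():
            c = Context()
            # Register the raw input data as a SQL table on the context
            df = dd.read_csv("s3://my-bucket/raw/*.csv")
            c.create_table("raw_events", df)
            return c

        @task
        def transform(c):
            # Run the SQL transformation and register the result as a new table
            aggregated = c.sql("SELECT user_id, COUNT(*) AS n FROM raw_events GROUP BY user_id")
            c.create_table("aggregated", aggregated)
            return c

        @task
        def load(c):
            # Materialize the result and write it out
            c.sql("SELECT * FROM aggregated").to_parquet("s3://my-bucket/curated/")

        with Flow("dask-sql-etl") as flow:
            load(transform(extract()))

        flow.run()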

This idea was inspired by the ETL-with-Dask example: https://examples.dask.org/applications/prefect-etl.html

I started with a very basic/rough implementation of approach 1 (using Prefect) here: https://github.com/rajagurunath/dask-sql-etl

@nils-braun, what do you think about this proposal? Do you think it will really add value for users? Or is there any other approach we could follow to achieve the same?

Some known issues so far:

Since most ETL tools spawn a thread or process to execute each task, the internal Java virtual machine has to spin up each time. I currently worked around this problem based on the discussion that happened here: https://github.com/nils-braun/dask-sql/issues/141 ❤️ Adding the following lines made it work fine :)

        import dask_sql
        from dask_sql import java

        # Importing dask_sql / dask_sql.java makes sure the JVM is up and the
        # DaskSQL.jar is loaded inside this worker thread/process.
        # Print where the compiler class comes from; that should be the DaskSQL.jar
        print(java.org.codehaus.commons.compiler.CompilerFactoryFactory.class_.getProtectionDomain().getCodeSource().getLocation())
        print(java.org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory())
nils-braun commented 2 years ago

> Shall we try to make use of this advantage and provide a SQL-only ETL experience for the user?

Yes! That would be very cool! Thank you for this idea - I think it will be a very valuable add-on and I can't wait to see it in action!

How to implement this feature is definitely not simple. You have two good ideas; however, I fear that they will not work once the number of (workflow engine) workers starts to grow. I only have experience with Airflow, but as you correctly pointed out, more workers mean more threads (or processes, it doesn't matter for my point), and we would need to decide whether we open a new connection to the Dask cluster and how we re-register the tables and functions on the context (as you also correctly pointed out).

So it is generally a good idea to just have a single context and pass it around. The problem is: how do we pass this context across worker borders? Pickling a context will be hard (so far, it does not work, e.g. due to the JVM references), but this is the only possibility to send a context between tasks across process or node borders. It would be possible to just send the schema definition (we would also need to find a way to send the plugins that a user can override, but let's not think about this now), but how can we make sure that the connection to the Dask cluster, which all of the dataframes in the schema hold, is the same on all workers? Things like credentials, connections, and firewalls will make things hard, and as this connection to the Dask cluster is pickled "implicitly", I think this will lead to problems that are hard to debug. So you are completely right: this possibility is maybe a bit "hacky".

Possibility two, on the other hand, removes many of these issues. All a single task needs to do is open a connection to a database (the task does not even care whether it is dask-sql or plain Presto). Basically all workflow engines already know how to do this, so that's cool! There is still one caveat: how do we make sure that the dask-sql server running on the machine which started the DAG is reachable by all worker nodes/processes? There might be complex network setups (I have worked in a particle physics experiment - things get complicated fast ;-)).

So I would propose another solution 2b, which builds on your possibility number 2: let the user start the dask-sql server "externally" (like they would start an Impala server, a PostgreSQL database, or similar) and just connect to it. In this case, users can decide where to run the dask-sql server themselves (they likely know the best place in their network that is reachable by all workers), and they only need to connect to the Dask cluster once (from within the dask-sql server). We already have convenience functions to start a dask-sql server (via CLI, Python, or Docker), and we do not need to think about pickling, multiple contexts, multiple Dask connections, or the JVM.
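
As a concrete illustration of 2b, something like the following could run as its own long-lived service outside the workflow engine. This is only a sketch: the scheduler address and table locations are placeholders, and the exact run_server signature should be checked against the dask-sql version in use.

        # Sketch of proposal 2b: start the dask-sql server as an external, long-lived
        # service that connects to the Dask cluster once. Addresses and paths are
        # placeholders; run_server's arguments may differ between dask-sql versions.
        from dask.distributed import Client
        from dask_sql import Context, run_server

        client = Client("tcp://dask-scheduler:8786")  # single connection to the cluster

        c = Context()
        # Register the source data once; workflow tasks then only talk SQL to the server
        c.create_table("raw_events", "s3://my-bucket/raw/*.csv")

        # Serve the Presto-compatible endpoint for the workflow tasks to connect to
        run_server(context=c, client=client, host="0.0.0.0", port=8080)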

What do you think? I think our next step would be to build a small example to try this out. We can re-use most of the code which you have written for Prefect. If you want, I can also build an example with Airflow (which I am more familiar with).

rajagurunath commented 2 years ago

Hi @nils-braun, thanks for the detailed explanation of the possible solutions, completely agreed! As you said, we can go ahead with solution 2b, which is straightforward, easy to set up, and non-opinionated.

Sure, I will try to refactor the code base with the proposed changes. Once the dask-sql server is up and running, are we going to use the Presto client for the connection, or is there another plan, like just using the simple requests package?

nils-braun commented 2 years ago

Good question! I would think the easiest is sqlalchemy with a presto library/protocol (e.g. https://github.com/dropbox/PyHive)
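
For reference, a task-side connection could then look roughly like this. The hostname, port, and query are placeholders, and the PyHive presto dialect URL format (catalog/schema part) as well as dask-sql's default port should be double-checked:

        # Hedged sketch of what a single workflow task could do: treat the running
        # dask-sql server as a normal Presto endpoint via SQLAlchemy + PyHive.
        # Hostname, port, and the query are placeholders.
        from sqlalchemy import create_engine

        engine = create_engine("presto://dask-sql-server:8080/")
        with engine.connect() as conn:
            result = conn.execute("SELECT user_id, COUNT(*) AS n FROM raw_events GROUP BY user_id")
            for row in result:
                print(row)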