apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Execute LogicalPlan on DBMS directly #970

Closed: hu6360567 closed this issue 5 months ago

hu6360567 commented 3 years ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do. I'm implementing a distributed query system, similar to Ballista, but the data sources change over time and are located on MySQL/PostgreSQL servers. A query is parsed and optimized into a LogicalPlan at the scheduler, which distributes sub-plans to different executors that have access to the specific data sources. The key problem is that it is not acceptable to load an entire table from the DBMS into memory and then execute the plan when the table is huge. Pushing the LogicalPlan down to the DBMS directly can effectively reduce the size of in-memory data from the bottom of the whole computation process.

Describe the solution you'd like One possible solution is to define a new DBExecuteContext associated with a SQL connection, which can convert a LogicalPlan back to plain SQL inside the ExecutionPlan and execute it directly on the associated connection.

libpg_parser provides the ability to convert an AST back to a plain SQL query, but sqlparser-rs doesn't. A first step would be for DataFusion to convert a LogicalPlan back to Statements; a second step would be for sqlparser to convert Statements back to SQL.
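To make the idea concrete, here is a minimal sketch of a direct LogicalPlan-to-SQL converter. The `plan_to_sql` helper is hypothetical, only a few node types are covered, and the exact variant and field names differ across DataFusion versions; a real implementation would instead target sqlparser-rs Statements as described above.

```rust
use datafusion::logical_expr::LogicalPlan; // module path varies by version

// Hypothetical sketch: walk the LogicalPlan bottom-up and emit a SQL string.
fn plan_to_sql(plan: &LogicalPlan) -> Result<String, String> {
    match plan {
        LogicalPlan::TableScan(scan) => {
            Ok(format!("SELECT * FROM {}", scan.table_name))
        }
        LogicalPlan::Filter(filter) => {
            // Expr implements Display, which is close enough to SQL for a sketch.
            let input = plan_to_sql(&filter.input)?;
            Ok(format!("SELECT * FROM ({}) AS t WHERE {}", input, filter.predicate))
        }
        LogicalPlan::Limit(limit) => {
            let input = plan_to_sql(&limit.input)?;
            match limit.fetch {
                Some(n) => Ok(format!("{} LIMIT {}", input, n)),
                None => Ok(input),
            }
        }
        other => Err(format!("cannot yet convert this node to SQL: {:?}", other)),
    }
}
```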

Describe alternatives you've considered Not yet.


alamb commented 2 years ago

I think implementing a custom TableProvider (and implementing predicate pushdown) might be what you are looking for

Specifically, you could push filters down here into the underlying DBMS and avoid materializing the entire table:

https://github.com/apache/arrow-datafusion/blob/dd9cd15d840dfafdf85a36bbf58008a12d86d1af/datafusion/src/datasource/datasource.rs#L74-L84
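As a rough illustration of what that looks like inside scan, here is a hedged sketch that renders the pushed-down projection, filters, and limit into a SQL string for the DBMS. The helper is hypothetical, and Expr's Display impl stands in for a real dialect-aware renderer.

```rust
use datafusion::logical_expr::Expr;

// Turn the pushed-down projection, filters, and limit into a SQL query that
// the remote DBMS executes, so only matching rows are materialized.
fn build_remote_query(
    table: &str,
    columns: &[String],
    filters: &[Expr],
    limit: Option<usize>,
) -> String {
    let cols = if columns.is_empty() {
        "*".to_string()
    } else {
        columns.join(", ")
    };
    let mut sql = format!("SELECT {} FROM {}", cols, table);
    if !filters.is_empty() {
        let preds: Vec<String> = filters.iter().map(|f| f.to_string()).collect();
        sql.push_str(&format!(" WHERE {}", preds.join(" AND ")));
    }
    if let Some(n) = limit {
        sql.push_str(&format!(" LIMIT {}", n));
    }
    sql
}
```

If supports_filter_pushdown reports Exact for the predicates the DBMS can evaluate, DataFusion will not re-apply them on the returned batches.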

hu6360567 commented 2 years ago

Hi @alamb, the goal of converting the LogicalPlan back to SQL is to push as much of the query as possible down to the DBMS, so that the size of the result returned from the DBMS is reduced significantly.

As in your proposal, projection/filters/limit can be converted back to SQL and executed directly on the DBMS in the scan implementation. But for more complex cases, such as recursive queries on a DBMS data source (e.g., a query over the join of two tables in the same database), TableProvider may not be sufficient.

My expected workflow:

  1. At the planner node, the logical plan is generated from SQL and optimized (pushdown, etc.).
  2. Split the optimized plan into sub plans, depending on where their inputs are located (a data source or the output of another sub plan).
  3. Convert the sub plans to SQL, and send the SQL to each execution node.
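A compact sketch of these three steps on the planner side, where `split_by_location`, `plan_to_sql`, `Location`, and `Executor` are all hypothetical; only `create_logical_plan` and `optimize` correspond to real (version-dependent) DataFusion APIs.

```rust
use std::collections::HashMap;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;

async fn plan_and_dispatch(
    ctx: &mut ExecutionContext,
    query: &str,
    executors: &HashMap<Location, Executor>,
) -> Result<()> {
    let plan = ctx.create_logical_plan(query)?; // 1. SQL -> LogicalPlan,
    let plan = ctx.optimize(&plan)?;            //    then optimize (pushdown etc.)
    for (sub_plan, location) in split_by_location(&plan) { // 2. split by input location
        let sql = plan_to_sql(&sub_plan)?;      // 3. LogicalPlan -> SQL
        executors[&location].submit(&sql).await?; // send to the execution node
    }
    Ok(())
}
```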
nevi-me commented 2 years ago

Specifically, you could push filters down here into the underlying DBMS and avoid materializing the entire table:

I also lean towards this approach. @hu6360567 see the write_filters function in https://github.com/TheDataEngine/datafusion-mongo-connector/blob/main/src/lib.rs#L288-L396. However, as you mention, they won't get us very far.

With SQL, we still need a way for the source to declare what pushdown it supports (and for DataFusion to support pushing aggregates and joins to the source).

Split the optimized plan into sub plans, depending on where their inputs are located (a data source or the output of another sub plan).

I believe we already have this, as each LogicalPlan node is an enum variant describing the type of plan (TableScan, Aggregate, Projection, etc.).

The async fn scan() function allows us to push the filters and projection by converting them into a SQL query based on the dialect of the source. This is based on LogicalPlan::TableScan.

Convert the sub plans to SQL, and send the SQL to each execution node.

I haven't thought about it much lately (I attempted an arrow-datafusion-sql thingy early last year), but the biggest change would be allowing datasources to declare if they support certain plans. For example, if there's a window function and an aggregation, and say MySQL supports them, there should be a TableProvider::supports_filter_pushdown equivalent, which we can call to determine which plans to push down, and which to execute on the resulting data.
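For instance, a hedged sketch of such an equivalent. Neither `supports_plan_pushdown` nor `PlanPushDown` exists in DataFusion, and the right granularity (per node vs. per subtree) is an open question.

```rust
use datafusion::logical_expr::LogicalPlan;

/// Hypothetical counterpart to TableProvider::supports_filter_pushdown,
/// asked once per candidate plan node (Aggregate, Join, Window, ...).
pub enum PlanPushDown {
    Unsupported, // execute this node inside datafusion
    Exact,       // the source can evaluate this node itself
}

pub trait PlanPushDownSupport {
    fn supports_plan_pushdown(&self, _node: &LogicalPlan) -> PlanPushDown {
        // Conservative default: nothing beyond plain scans is pushed down.
        PlanPushDown::Unsupported
    }
}
```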

But for more complex cases, such as recursive queries on a DBMS data source (e.g., a query over the join of two tables in the same database), TableProvider may not be sufficient.

It could be a good exercise to see how Spark implements this. Many years ago when I was working with a Spark + Impala data source, I saw that certain joins could be pushed to the source via JDBC. It was a multi-source virtualised setup (SAP HANA via Spark; don't ask why), but if two or more joined tables came from the same catalog/schema in the same data source, they could be pushed down as a joined SQL query instead of only filtering the input tables.


So perhaps a good process could be:

While converting a whole logical plan to SQL might be the easier approach, I don't know if it would be generally suitable, as there are going to be some queries that datafusion executes differently from other SQL engines, and it's better for data sources to have more control over what gets pushed down.

hu6360567 commented 2 years ago

but the biggest change would be allowing datasources to declare if they support certain plans.

Hi @nevi-me, that's why I convert the sub plan back to SQL (logical plans are distributed rather than physical plans). At the planner node, the general planner does not need to know the capability of the underlying data source. At the execution node, the executor can construct a logical plan from the SQL to check whether the DBMS can execute the query; otherwise, it splits the plan into smaller sub plans that can execute directly on the DBMS, while aggregation operations are done inside datafusion.

In short, the plan can be optimized as much as possible while the capability is unknown; the capability limits are resolved once they are known.

For example, with a recursive architecture, an execution node can itself act as the planner of an execution group. The underlying data source capability does not need to be exposed to the top planner, but only to the node directly connected to it. The physical plan can change at runtime, as long as the logical plan is preserved. In any case, converting a logical plan back to SQL is a general way to do this.

alamb commented 2 years ago

But for more complex cases, such as recursive queries on a DBMS data source (e.g., a query over the join of two tables in the same database), TableProvider may not be sufficient.

This is a good point @hu6360567 -- I think what you are describing is called "query federation" and is the main use case for systems like https://trino.io/ (formerly PrestoSQL). It might help to look at that project for inspiration.

DataFusion could certainly be used for this use case, but as you point out a non-trivial amount of additional work is required (like group-by and join pushdown).

It would be interesting to see what you have in mind for LogicalPlan -> SQL and how that would work. I haven't contemplated using DataFusion as a frontend to push computation down -- I have been thinking in terms of using DataFusion as that engine.

xudong963 commented 2 years ago

The key problem is that it is not acceptable to load an entire table from the DBMS into memory and then execute the plan when the table is huge. Pushing the LogicalPlan down to the DBMS directly can effectively reduce the size of in-memory data from the bottom of the whole computation process.

One idea is to add a cache between the SQL engine and the disk store, but this proposal requires a high cache hit ratio. In other words, it depends on your business scenario.

houqp commented 2 years ago

I think with some extensions to our existing table provider abstraction, this kind of cross-table compute pushdown could be achieved within our logical or physical plan optimizer?

Following @hu6360567's logic of splitting logical plans into sub plans, we could perform a rewrite in the plan optimizer that groups the query plan tree into sub trees by the database instance referenced in the table scans. If a sub tree only reads tables from the same database, then we can safely convert that sub tree into a SQL query and push the query down to that database directly. This can be done by rewriting the sub tree into a single plan node that represents a remote SQL query execution.

To achieve this, we need to extend the table providers to supply the following info:

- Database name/identifier and type: helps the planner decide how to group the plan into sub trees by database instance.
- Database compute capability: helps the planner further narrow which subset of a sub tree can be pushed down.

A trimmed-down sub plan gets passed from the planner to the table provider's native query compiler to produce the final query. It doesn't need to be SQL and can be any native query supported by the corresponding database type.

Lastly, the planner replaces the trimmed-down sub plan with a single remote query execution node carrying the compiled native query.

All of the above can be handled during the planning stage.
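A sketch of what those extensions might look like as a trait next to TableProvider; every name here is hypothetical.

```rust
use datafusion::logical_expr::LogicalPlan;

pub struct DatabaseInstance {
    pub id: String,      // e.g. hash of the connection info; groups scans by instance
    pub db_type: String, // e.g. "postgres", "mysql"; selects the capability set
}

pub trait RemoteCompute {
    /// Which database instance this table lives in, for grouping sub trees.
    fn database(&self) -> DatabaseInstance;
    /// Whether this database type can evaluate the given plan node natively.
    fn supports_node(&self, node: &LogicalPlan) -> bool;
    /// Compile a fully pushable sub plan into a native query; it need not be
    /// SQL, just whatever the database speaks.
    fn compile_native_query(&self, sub_plan: &LogicalPlan) -> Option<String>;
}
```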

but the biggest change would be allowing datasources to declare if they support certain plans.

That's why I convert the sub plan back to SQL (logical plans are distributed rather than physical plans). At the planner node, the general planner does not need to know the capability of the underlying data source.

Do we really need to detect data source capability at runtime? Similar to how we define filter pushdown capability using TableProvider::supports_filter_pushdown, we could define these compute capabilities statically in the source code per database type, right?

alamb commented 2 years ago

@houqp, the idea of using an optimizer pass is a great one. We might even be able to do it without any changes to the TableProvider as of now.

For example, if the input plan was like this:

           ┌──────────────────┐                        
           │      Limit       │                        
           └──────────────────┘                        
                     ▲                                 
                     │                                 
           ┌──────────────────┐                        
           │    Aggregate     │                        
           └──────────────────┘                        
                     ▲                                 
                     │                                 
           ┌──────────────────┐                        
           │       Join       │                        
           └──────────────────┘                        
                     ▲                                 
          ┌──────────┴───────────┐                     
          │                      │                     
┌──────────────────┐   ┌──────────────────┐            
│      Scan A      │   │       Join       │            
└──────────────────┘   └──────────────────┘            
                                 ▲                     
                      ┌──────────┴───────────┐         
                      │                      │         
            ┌──────────────────┐   ┌──────────────────┐
            │      Scan B      │   │      Scan C      │
            └──────────────────┘   └──────────────────┘

I could imagine a custom optimizer pass that recognized that B and C were available in some external database and could rewrite the plan as you suggest:

                     ┌──────────────────┐                           
                     │      Limit       │                           
                     └──────────────────┘                           
                               ▲                                    
                               │                                    
                     ┌──────────────────┐                           
                     │    Aggregate     │                           
                     └──────────────────┘                           
                               ▲                                    
                               │                                    
                     ┌──────────────────┐         Optimizer pass    
                     │       Join       │       recognizes B and C  
                     └──────────────────┘        are available in   
                               ▲                 external database  
          ┌────────────────────┴───────────┐                        
          │          ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  
┌──────────────────┐             ┌──────────────────┐             │ 
│      Scan A      │ │           │       Join       │               
└──────────────────┘             └──────────────────┘             │ 
                     │                     ▲                        
                                ┌──────────┴───────────┐          │ 
                     │          │                      │            
                      ┌──────────────────┐   ┌──────────────────┐ │ 
                     ││      Scan B      │   │      Scan C      │   
                      └──────────────────┘   └──────────────────┘ │ 
                     │                                              
                      ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ 

            ┌──────────────────┐                           
            │      Limit       │                           
            └──────────────────┘                           
                      ▲                                    
                      │                                    
            ┌──────────────────┐                           
            │    Aggregate     │                           
            └──────────────────┘                           
                      ▲                                    
                      │                                    
            ┌──────────────────┐                           
            │       Join       │         Rewritten Plan    
            └──────────────────┘                           
                      ▲                                    
          ┌───────────┴───────────┐                        
          │                       │                        
┌──────────────────┐    ┏━━━━━━━━━━━━━━━━━━┓               
│      Scan A      │    ┃     Scan B+C     ┃               
└──────────────────┘    ┃  (TableProvider  ┃               
                        ┃ that runs a SQL  ┃               
                        ┃   query in an    ┃               
                        ┃external database)┃               
                        ┗━━━━━━━━━━━━━━━━━━┛               

The specifics of what types of subplans could be converted are probably specific to the database being pushed to, so it isn't clear that logic belongs in the main datafusion crate.
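A skeleton of such a pass, assuming three hypothetical helpers: `same_external_db` (checks that every TableScan in a subtree resolves to one external database), `plan_to_sql`, and `remote_scan_node` (wraps the generated SQL in a scan backed by such a TableProvider). The traversal methods shown (`inputs`, `with_new_inputs`) exist on LogicalPlan but their names have shifted across versions.

```rust
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlan;

fn push_down_subtrees(plan: &LogicalPlan) -> Result<LogicalPlan> {
    if same_external_db(plan) {
        // The whole subtree (e.g. Join(Scan B, Scan C)) becomes one remote scan.
        return Ok(remote_scan_node(plan_to_sql(plan)?));
    }
    // Otherwise keep this node and recurse into its inputs.
    let new_inputs = plan
        .inputs()
        .into_iter()
        .map(push_down_subtrees)
        .collect::<Result<Vec<_>>>()?;
    plan.with_new_inputs(&new_inputs)
}
```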

houqp commented 2 years ago

Thanks @alamb for adding the diagrams, really helps to visualize the idea :)

We might even be able to do it without any changes to the TableProvider as of now.

The question is: without the table providers telling the planner that Scan B and Scan C belong to the same external database instance, how would it be able to decide whether these two scans can be combined into a single push-down query? Imagine a case where Scan B is for a postgres instance managed by AWS RDS, while Scan C is for a postgres instance managed in GCP; the planner needs to know they are two different database instances so it can decide whether the join should happen within datafusion or get pushed down.

On top of that, we also need a way to let the planner know what compute plan nodes are supported by a particular database type. I think the table provider could be a good abstraction to provide this info.

The specifics of what types of subplans could be converted are probably specific to the database being pushed to, so it isn't clear that logic belongs in the main datafusion crate.

I feel like this should be defined inside the table provider implementation, which should be maintained as plugins outside of datafusion core. Datafusion core should just maintain the current memory and listing table providers. Or maybe those two can be moved out of the core one day :)

alamb commented 2 years ago

On top of that, we also need a way to let the planner know what compute plan nodes are supported by a particular database type. I think the table provider could be a good abstraction to provide this info.

That is a good point.

I guess I was saying that trying to encode the wonderful complexity of many different external databases into TableProvider seems like it will be challenging.

I was wondering if perhaps mapping information like "table B" --> "AWS postgres" and "table C" --> "GCP postgres" could also be maintained outside of the TableProvider.

My concern with making additional changes to TableProvider is that it is fairly complicated already, so trying to encapsulate complexity elsewhere in the system might be a good idea.

xudong963 commented 2 years ago
[quoted: the ASCII plan diagram (Limit / Aggregate / Join / Scan A, Join(Scan B, Scan C)) from @alamb's comment above]

Off-topic, what was the tool used to draw this picture? @alamb

alamb commented 2 years ago

@xudong963 I used: https://monodraw.helftone.com/

houqp commented 2 years ago

My concern with making additional changes to TableProvider is that it is fairly complicated already, so trying to encapsulate complexity elsewhere in the system might be a good idea.

I agree. Perhaps something along the lines of DatabaseProvider that can be referenced by a table provider.
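A minimal sketch of that factoring, with both types hypothetical: the per-database complexity lives behind `DatabaseProvider`, and the table provider only holds a reference to it.

```rust
use std::sync::Arc;
use datafusion::logical_expr::LogicalPlan;

trait DatabaseProvider {
    fn instance_id(&self) -> &str;                  // groups tables by instance
    fn supports(&self, node: &LogicalPlan) -> bool; // per-database capabilities
}

struct FederatedTable {
    name: String,
    database: Arc<dyn DatabaseProvider>, // referenced by the table provider
}
```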

backkem commented 11 months ago

I took a look at how Presto handles this. Presto uses the concept of a Connector to represent remote data sources; DF's TableProviderFactory is similar. Their TableHandle (similar to DF's TableReference) has a connectorID, which allows reasoning about which connector can handle which parts of a query.

Aside from allowing table scans, the Connector can provide logical and physical ConnectorPlanOptimizers. During plan optimization, Presto looks for the largest chunks of the plan that can be handled by a single connector and passes them to the respective ConnectorPlanOptimizer. Some connectors take these plan chunks, turn them into a query for their data source, and hand back an opaque TableScanNode. Examples: JdbcComputePushdown or ClickHouseComputePushdown. This effectively results in query federation.

I expect it will likely be hard to find a good abstraction to express remote compute capabilities, due to their high variety. Presto's approach seems a reasonable way to allow granular federation without having to explicitly express federation capabilities, and it limits the number of new concepts that need to be added. DF can choose to ship basic remote providers (e.g., for ADBC) and others can be provided by third parties.

Would this be a sensible approach to take?

alamb commented 11 months ago

Would this be a sensible approach to take?

@backkem The basic idea seems reasonable to me.

Given your description, it seems like the same thing could be accomplished today in DataFusion by implementing and registering an appropriate logical OptimizerRule or physical PhysicalOptimizerRule that would walk the plans, identify the subtrees it knows how to push down, and rewrite them to use specialized ExecutionPlans.

DataFusion is already set up to run arbitrary ExecutionPlans / sources / passes. I wonder if there is anything specific we need to add 🤔 Perhaps an example and improved docs would suffice.
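For reference, registering such a rule might look like the following. `PushDownToRemoteDb` is a hypothetical OptimizerRule implementation, and the registration hook has moved between DataFusion versions (newer ones use a SessionState/SessionStateBuilder method instead).

```rust
use std::sync::Arc;
use datafusion::optimizer::OptimizerRule;
use datafusion::prelude::SessionContext;

fn register_federation_rule(ctx: &SessionContext, rule: Arc<dyn OptimizerRule + Send + Sync>) {
    // After this, every query planned through `ctx` runs the custom pass.
    ctx.add_optimizer_rule(rule);
}
```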

backkem commented 11 months ago

Ok, I can see how that would work. Thank you for the input.

I wonder if any of the following are worth upstreaming as a canonical way to handle this 'query federation' case:

- A trait to determine if TableSources belong to the same remote execution engine. This could be part of TableSource or a separate optional trait next to it.
- The FederationOptimizer that hands off plan chunks to the respective remote execution optimizers.
- Some in-ecosystem federation plugins/examples such as Flight SQL or ADBC.

alamb commented 11 months ago

A trait to determine if TableSources belong to the same remote execution engine. This could be part of TableSource or a separate optional trait next to it.

I wonder if TableSource::as_any would be sufficient
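A sketch of how that could work, assuming a concrete `FederatedSource` type with a `provider` field (both hypothetical):

```rust
use std::sync::Arc;
use datafusion::logical_expr::TableSource;

// Downcast both sources and compare the provider they point at.
fn same_engine(a: &dyn TableSource, b: &dyn TableSource) -> bool {
    match (
        a.as_any().downcast_ref::<FederatedSource>(),
        b.as_any().downcast_ref::<FederatedSource>(),
    ) {
        (Some(x), Some(y)) => Arc::ptr_eq(&x.provider, &y.provider),
        _ => false,
    }
}
```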

The FederationOptimizer that hands off plan chunks to the respective remote execution optimizers.

I am not sure how much benefit this would have over just a normal optimizer pass (I probably don't fully understand the proposal -- maybe some code would help explain it in more detail if you would like more specific feedback)

Some in-ecosystem federation plugins/examples such as Flight SQL or ADBC.

An example that shows how to replace some part of the plan with a FlightSQL or ADBC implementation would be awesome.

backkem commented 9 months ago

Based on the discussion above and in #7871 and #8699, I wanted to further explore the idea of the TableProvider 'consuming' part of a plan. I wrote up the following pseudo-code to help illustrate this:

```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::{LogicalPlan, TableSource, TableType};
use datafusion::optimizer::{Optimizer, OptimizerConfig, OptimizerRule};
use datafusion::physical_plan::ExecutionPlan;

// Optimizer rule that facilitates federation.
struct FederationOptimizerRule {}

impl OptimizerRule for FederationOptimizerRule {
    fn name(&self) -> &str {
        "federation"
    }

    fn try_optimize(
        &self,
        plan: &LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Option<LogicalPlan>> {
        // Walk over the plan, look for the largest subtrees with only
        // FederatedTableSource's all using the same FederationProvider.
        // Pass each subtree to its respective FederationProvider.optimizer.
        // The job of that optimizer is to 'absorb' part of the plan and replace it with
        // one or more virtual TableScan's that wrap the piece of the plan it will federate.
        todo!()
    }
}

trait FederationProvider {
    fn optimizer(&self) -> Option<Arc<Optimizer>>;
    // Add Analyzer & PhysicalOptimizer if needed.
}

// FederatedTableSource that helps the FederationOptimizerRule find the 'largest subtrees'
// in a plan and run the corresponding FederationProvider.optimizer
trait FederatedTableSource: TableSource {
    fn federation_provider(&self) -> Arc<dyn FederationProvider>;

    fn table_type(&self) -> TableType {
        TableType::Temporary
    }
}

// TableProvider (or a new trait) is simplified since the virtual TableScan
// injected into the plan by the FederationProvider Optimizer already knows what to do.
#[async_trait]
trait TableProvider {
    async fn scan(&self, state: &SessionState) -> Result<Arc<dyn ExecutionPlan>>;
}
```

It should be feasible to create a wrapper between the old and the new TableProvider traits to port all existing implementations forward.
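For example, a sketch of such a wrapper, under the assumption that the federation optimizer records the projection/filters/limit it absorbed from the plan (the inner `scan` signature matches recent DataFusion versions but has changed over time):

```rust
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;

struct LegacyTable {
    inner: Arc<dyn datafusion::datasource::TableProvider>,
    projection: Option<Vec<usize>>,
    filters: Vec<Expr>,
    limit: Option<usize>,
}

#[async_trait]
impl TableProvider for LegacyTable { // the simplified trait from the pseudo-code
    async fn scan(&self, state: &SessionState) -> Result<Arc<dyn ExecutionPlan>> {
        // Replay the recorded pushdown against the existing provider API.
        self.inner
            .scan(state, self.projection.as_ref(), &self.filters, self.limit)
            .await
    }
}
```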

alamb commented 8 months ago

Based on the discussion above and in #7871 and #8699, I wanted to further explore the idea of the TableProvider 'consuming' part of a plan. I wrote up the following pseudo-code to help illustrate this:

This code makes sense to me

It also seems like the same effect can be achieved by implementing an OptimizerRule directly to rewrite portions of the tree for specific TableProviders. So I wonder what benefit this new API would provide?

backkem commented 8 months ago

Technically, I agree it can already be done. However, having a canonical approach creates a focal point for creators of FederatedTableSources to target, making it easy to plug different sources into the engine, just as TableProvider enables today. It could also settle the individual 'xyz pushdown' discussions that keep popping up. This latter point makes me lean towards addressing this in datafusion itself rather than in a separate project. More advanced providers, such as SQLite or MySQL ones, can live in contrib or their own crates.

backkem commented 8 months ago

Maybe one more thought in line with your comment: the API suggestion allows one to register the providers in one go, as is done today, without having to separately wire up the additional optimizers. I see it as a quality of life improvement.

alamb commented 8 months ago

Maybe one more thought in line with your comment: the API suggestion allows one to register the providers in one go, as is done today, without having to separately wire up the additional optimizers. I see it as a quality of life improvement.

I agree it would be better for people creating federated engines, but I think for people using the most common limit/filter/projection pushdown it might be more complicated.

What if we made an example like example/federated_query.rs that explained / showed how to do this as a way to lower the barrier to doing this 🤔

backkem commented 8 months ago

Yea, I agree there is a trade-off between ease-of-use and flexibility/capability. It's not obvious to me where to draw the line as mentioned in the sort pushdown discussion.

If anyone is interested in DBMS-level federation, let me know. I'd be open to spinning up a project/crate to be the library equivalent of Trino-type connectors for DataFusion.

devinjdangelo commented 8 months ago

@backkem Thanks for the thoughts/example code above. I would be interested in contributing to / using such a sub-project to support Trino-style connectors.

backkem commented 8 months ago

I put together a POC based on the concept above (example). It's by no means complete but helps determine viability.

alamb commented 7 months ago

The update here is that I think there is ongoing work in https://github.com/datafusion-contrib/datafusion-federation/ to support this use case.

alamb commented 5 months ago

I think https://github.com/datafusion-contrib/datafusion-federation/ is proceeding nicely -- let's continue the conversation there