datafusion-contrib / datafusion-federation

Allow DataFusion to resolve queries across remote query engines while pushing down as much compute as possible.
Apache License 2.0

Support sort pushdowns (i.e. efficient TopK queries) #32

Closed: phillipleblanc closed this issue 6 months ago

phillipleblanc commented 6 months ago

This is a follow-on issue from https://github.com/apache/datafusion/issues/7871 and https://github.com/apache/datafusion/issues/10313#issuecomment-2089129451

My use-case is that I want to be able to efficiently support TopK queries (i.e. SELECT * FROM very_large_table ORDER BY foo LIMIT 10). Currently this requires a full table scan of the source followed by sorting the entire dataset in DataFusion. Being able to push down the ORDER BY foo LIMIT 10 clause to the remote engine would make this query far more efficient (it's currently unusable for larger datasets in our use-case).
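To make the cost difference concrete, here is a toy model in Rust (the language DataFusion is written in). `RemoteTable`, `topk_local` and `topk_pushed_down` are hypothetical names made up for illustration, not DataFusion or datafusion-federation APIs; the sketch only counts how many rows cross the network with and without pushing `ORDER BY foo LIMIT 10` to the remote.

```rust
/// Hypothetical remote table: a single column `foo` held as a Vec<i64>.
struct RemoteTable {
    rows: Vec<i64>,
}

impl RemoteTable {
    /// Full scan: every row is sent back to DataFusion.
    fn scan(&self) -> Vec<i64> {
        self.rows.clone()
    }

    /// Pushed-down `ORDER BY foo LIMIT k`: the remote sorts and returns only k rows.
    fn scan_sorted_limit(&self, k: usize) -> Vec<i64> {
        let mut rows = self.rows.clone();
        rows.sort();
        rows.truncate(k);
        rows
    }
}

/// No pushdown: fetch everything, then sort and limit locally.
/// Returns (result, number of rows transferred).
fn topk_local(table: &RemoteTable, k: usize) -> (Vec<i64>, usize) {
    let mut fetched = table.scan();
    let transferred = fetched.len();
    fetched.sort();
    fetched.truncate(k);
    (fetched, transferred)
}

/// With pushdown: the remote does the sort/limit, only k rows are transferred.
fn topk_pushed_down(table: &RemoteTable, k: usize) -> (Vec<i64>, usize) {
    let fetched = table.scan_sorted_limit(k);
    let transferred = fetched.len();
    (fetched, transferred)
}

fn main() {
    // One million rows, stored in descending order.
    let table = RemoteTable { rows: (0..1_000_000).rev().collect() };
    let (local, n_local) = topk_local(&table, 10);
    let (pushed, n_pushed) = topk_pushed_down(&table, 10);
    assert_eq!(local, pushed);
    // prints "local: 1000000 rows transferred, pushed down: 10"
    println!("local: {n_local} rows transferred, pushed down: {n_pushed}");
}
```

Both paths return the same 10 rows; the difference is that the local path ships the whole table first. A real remote engine may also be able to answer the pushed-down query from an index or a bounded heap rather than a full sort.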

It seems that there are roughly two ways to approach pushing down a sort to a federated query engine in DataFusion. The first, which can be done today, is to add a custom optimizer pass (OptimizerRule and PhysicalOptimizerRule) that propagates the sort/limit information down into the ExecutionPlan. The second, as pointed out in apache/datafusion#10313, is to change the ExecutionPlan trait so that a node can report that it is able to re-sort its output to match what DataFusion needs (via something like pushable_sorts) and then return a new ExecutionPlan sorted in the correct way (via resorted).
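A minimal sketch of the second approach, under heavy assumptions: `pushable_sorts` and `resorted` are only the names floated in the upstream discussion, and their signatures here, along with the `PushdownScan` trait, `RemoteSqlScan`, the toy `Plan` enum and the `push_down_sort` rewrite, are hypothetical stand-ins (DataFusion's real `ExecutionPlan` trait has no such methods). The rewrite function plays the role an optimizer rule would: it asks the scan under a local TopK whether it can sort on the requested column and, if so, replaces the TopK with a scan that sends `ORDER BY ... LIMIT` to the remote.

```rust
use std::fmt::Debug;

/// Hypothetical sort key: one column plus a direction.
#[derive(Debug, Clone, PartialEq)]
struct SortKey {
    column: String,
    ascending: bool,
}

/// Hypothetical stand-in for the proposed ExecutionPlan extension.
trait PushdownScan: Debug {
    /// Columns the remote source is able to sort on.
    fn pushable_sorts(&self) -> Vec<String>;
    /// A new scan whose output is already sorted (and optionally limited),
    /// or None if the source cannot honour the request.
    fn resorted(&self, key: &SortKey, limit: Option<usize>) -> Option<Box<dyn PushdownScan>>;
    /// The query that would be sent to the remote engine.
    fn remote_query(&self) -> String;
}

/// Hypothetical scan over a remote SQL table.
#[derive(Debug, Clone)]
struct RemoteSqlScan {
    table: String,
    sortable: Vec<String>,
    order_by: Option<SortKey>,
    limit: Option<usize>,
}

impl PushdownScan for RemoteSqlScan {
    fn pushable_sorts(&self) -> Vec<String> {
        self.sortable.clone()
    }

    fn resorted(&self, key: &SortKey, limit: Option<usize>) -> Option<Box<dyn PushdownScan>> {
        if !self.sortable.contains(&key.column) {
            return None;
        }
        Some(Box::new(RemoteSqlScan { order_by: Some(key.clone()), limit, ..self.clone() }))
    }

    fn remote_query(&self) -> String {
        let mut sql = format!("SELECT * FROM {}", self.table);
        if let Some(k) = &self.order_by {
            let dir = if k.ascending { "ASC" } else { "DESC" };
            sql.push_str(&format!(" ORDER BY {} {}", k.column, dir));
        }
        if let Some(n) = self.limit {
            sql.push_str(&format!(" LIMIT {}", n));
        }
        sql
    }
}

/// Toy plan: a local TopK (sort + optional limit) over some input, or a remote scan.
#[derive(Debug)]
enum Plan {
    TopK { key: SortKey, limit: Option<usize>, input: Box<Plan> },
    Scan(Box<dyn PushdownScan>),
}

/// Rewrite: TopK directly over a scan that can sort on the key becomes a resorted scan.
fn push_down_sort(plan: Plan) -> Plan {
    match plan {
        Plan::TopK { key, limit, input } => match *input {
            Plan::Scan(scan) if scan.pushable_sorts().contains(&key.column) => {
                match scan.resorted(&key, limit) {
                    Some(new_scan) => Plan::Scan(new_scan),
                    None => Plan::TopK { key, limit, input: Box::new(Plan::Scan(scan)) },
                }
            }
            other => Plan::TopK { key, limit, input: Box::new(push_down_sort(other)) },
        },
        scan => scan,
    }
}

fn main() {
    let scan = RemoteSqlScan {
        table: "very_large_table".to_string(),
        sortable: vec!["foo".to_string()],
        order_by: None,
        limit: None,
    };
    let plan = Plan::TopK {
        key: SortKey { column: "foo".to_string(), ascending: true },
        limit: Some(10),
        input: Box::new(Plan::Scan(Box::new(scan))),
    };
    match push_down_sort(plan) {
        // prints "SELECT * FROM very_large_table ORDER BY foo ASC LIMIT 10"
        Plan::Scan(s) => println!("{}", s.remote_query()),
        other => println!("not pushed down: {:?}", other),
    }
}
```

A real version would also need to handle multi-column sort keys, NULLS FIRST/LAST, and sorts separated from the scan by projections or filters; the sketch only matches a TopK sitting directly above a scan.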

I thought it would be useful to discuss/implement the options in this repo first to get a sense of how they would work, and we can take a proposal back to the main repo if we want to make the trait change.

My instinct is that the custom optimizer pass is more approachable in the short-term, but building it natively into DataFusion is the correct long-term approach.

I'm not planning to work on this immediately, but I suspect it will become something I work on within the next few months and wanted to start the conversation early.

backkem commented 6 months ago

Hi @phillipleblanc, I had the same use-case. It's part of the motivation for starting this repo. The federation repo can already do what you described in simple cases. For example, if the entire query can be sent to one remote, things work quite well. In more complex queries, cutting up the plan becomes harder, resulting in sub-optimal execution (and there are also a handful of known non-working scenarios, such as subqueries in expressions). There is also room for improving joins across remotes in the TopK scenario.

I intend to keep chipping away at these scenarios as I find the time.

phillipleblanc commented 6 months ago

Oh nice, for some reason I thought this repo was focused on pushing down joins between two sources. I'll take another look with the sort optimization in mind.

backkem commented 6 months ago

The repo is about re-serialising the largest possible subplans that can be executed by a single remote query engine into a query format that engine supports (e.g. back to SQL, Substrait, etc.), executing those queries remotely, and integrating the results back into the overall DataFusion query execution.
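A rough sketch of that idea: work out which single remote (if any) can run each subtree, and cut the plan at the highest subtrees that belong to one remote. The `Node` enum, `single_remote`, `federate` and `render` are hypothetical simplifications, not datafusion-federation's actual types, and the `postgres`/`mysql` sources are made-up examples. A real implementation would operate on DataFusion plans, check that every operator is supported by the remote, and unparse each cut-out subtree to SQL, Substrait, etc.

```rust
/// Hypothetical plan node: a scan of a table on a named remote,
/// or an operator (join, filter, sort, ...) over child plans.
#[derive(Debug)]
enum Node {
    Scan { remote: String, table: String },
    Op { name: String, children: Vec<Node> },
}

fn scan(remote: &str, table: &str) -> Node {
    Node::Scan { remote: remote.to_string(), table: table.to_string() }
}

fn op(name: &str, children: Vec<Node>) -> Node {
    Node::Op { name: name.to_string(), children }
}

/// The single remote that can execute this whole subtree, if any.
/// An operator with no children, or with children on different remotes, yields None.
fn single_remote(node: &Node) -> Option<&str> {
    match node {
        Node::Scan { remote, .. } => Some(remote.as_str()),
        Node::Op { children, .. } => {
            let mut remotes = children.iter().map(single_remote);
            let first = remotes.next()??;
            if remotes.all(|r| r == Some(first)) { Some(first) } else { None }
        }
    }
}

/// Collect the largest subtrees that one remote can run on its own.
/// Everything above them stays in DataFusion.
fn federate<'a>(node: &'a Node, out: &mut Vec<(&'a str, &'a Node)>) {
    if let Some(remote) = single_remote(node) {
        out.push((remote, node));
        return;
    }
    if let Node::Op { children, .. } = node {
        for child in children {
            federate(child, out);
        }
    }
}

/// Compact textual form of a subtree, standing in for "unparse to SQL".
fn render(node: &Node) -> String {
    match node {
        Node::Scan { table, .. } => table.clone(),
        Node::Op { name, children } => {
            let inner: Vec<String> = children.iter().map(render).collect();
            format!("{}({})", name, inner.join(", "))
        }
    }
}

fn main() {
    let plan = op("join", vec![
        op("join", vec![scan("postgres", "orders"), scan("postgres", "customers")]),
        scan("mysql", "products"),
    ]);
    let mut subplans = Vec::new();
    federate(&plan, &mut subplans);
    for (remote, node) in &subplans {
        println!("{remote}: {}", render(node));
    }
}
```

Here the join of `orders` and `customers` would be shipped to one remote as a single query, `products` fetched from the other, and only the outer join runs in DataFusion. Recomputing `single_remote` at every level is quadratic in the worst case; caching per-node results in one bottom-up pass would avoid that, at the cost of a longer sketch.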

backkem commented 6 months ago

That being said, if you'd prefer to experiment with more complex pushdown options in the TableProvider trait, I'm happy to accommodate. I tried a more generic approach first because it can also push down entire joins, etc., to a single remote.

phillipleblanc commented 6 months ago

I finally got some time to dig into how this works - it's very clever! I actually agree now that this approach of re-serialising as-large-as-possible subplans is a better approach than trying to teach a TableProvider to be more clever in accepting more pushdown options.