apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

[Epic] A Collection of Sort Based Optimizations #10313

Open alamb opened 2 months ago

alamb commented 2 months ago

Usecase

Many analytic systems store their data in a particular sort order, and the query engine can often take advantage of this sort order to both reduce memory usage and improve performance.

Specific examples in DataFusion include:

  1. Emitting from GroupBy early with partially sorted stream
  2. SortMergeJoin
  3. Sort removal via EnforceSorting and replace_with_order_preserving_variants

This information is currently encoded in ExecutionPlan::maintains_input_order, ExecutionPlan::required_input_ordering, and PlanProperties.

The same underlying analysis is often required for streaming (where determining what to emit is modeled as a sorted stream, for example on date_trunc(ts) of a stream sorted by timestamp).
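To make the sortedness point concrete, here is a minimal, self-contained sketch in plain Rust (not DataFusion code). It assumes rows of `(unix_seconds, value)` arriving sorted by timestamp; `trunc_hour` and `sum_by_hour_sorted` are hypothetical names standing in for `date_trunc` and a streaming GroupBy. Because truncation is monotonic, the truncated key is also sorted, so a group is complete as soon as the key changes and only one accumulator needs to be kept.

```rust
/// Truncate a Unix timestamp (seconds) to the start of its hour.
/// Like `date_trunc`, this is monotonic: a <= b implies trunc(a) <= trunc(b),
/// so a stream sorted by timestamp is also sorted by the truncated key.
fn trunc_hour(ts: i64) -> i64 {
    ts - ts.rem_euclid(3600)
}

/// Sum `value` per hour over rows that arrive sorted by timestamp.
/// A finished group is pushed to `out` as soon as the key changes; a real
/// operator would emit it downstream at that point instead of collecting.
fn sum_by_hour_sorted<I>(rows: I) -> Vec<(i64, f64)>
where
    I: IntoIterator<Item = (i64, f64)>,
{
    let mut out = Vec::new();
    // The single in-flight group: (hour key, running sum).
    let mut current: Option<(i64, f64)> = None;

    for (ts, value) in rows {
        let key = trunc_hour(ts);
        if let Some((k, acc)) = current.as_mut() {
            if *k == key {
                *acc += value;
                continue;
            }
        }
        // Key changed (or first row): the previous group can never grow again.
        if let Some(done) = current.take() {
            out.push(done);
        }
        current = Some((key, value));
    }
    if let Some(done) = current {
        out.push(done);
    }
    out
}
```

Without the sort order, the same aggregation would need a hash table holding every group until the input is exhausted.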

Describe the solution you'd like

This epic has a list of optimizations / improvements that further take sortedness into account. Here are some related issues:

phillipleblanc commented 2 months ago

Would this ticket be an appropriate place to add tickets related to pushing down sorts to federated query engines? I know that this was discussed previously (i.e. #7871) and it seems that writing a custom optimizer is the current way to handle that.

I will need to do this soon (federated sort pushdown) and it initially wasn't clear to me how to make this work in DataFusion. I can volunteer to write some docs on how to do this once I have an implementation that works.

alamb commented 2 months ago

Update here: we merged https://github.com/apache/datafusion/pull/9593 and now will work on increasing the test coverage to enable it by default (tracked in https://github.com/apache/datafusion/issues/10336)

alamb commented 2 months ago

> Would this ticket be an appropriate place to add tickets related to pushing down sorts to federated query engines? I know that this was discussed previously (i.e. #7871) and it seems that writing a custom optimizer is the current way to handle that.

I added #7871 to the list above -- thank you.

Yes I think this would be a good place to discuss

> I will need to do this soon (federated sort pushdown) and it initially wasn't clear to me how to make this work in DataFusion. I can volunteer to write some docs on how to do this once I have an implementation that works.

That would be great, thanks @phillipleblanc

Right now, once TableProvider::scan gets called, the returned ExecutionPlan can report how it is already sorted.

What we don't have is any way for the optimizer to tell an ExecutionPlan that it could reduce the work required in the DataFusion plan if the data were already sorted.

I wonder if we could add something to the ExecutionPlan trait, similar to ExecutionPlan::repartitioned, like:

trait ExecutionPlan {
  ...
  /// Return other possible orders that this ExecutionPlan could return
  /// (the DataFusion optimizer will use this information to potentially
  /// push Sorts into the node).
  fn pushable_sorts(&self) -> Result<Option<PotentialSortOrders>> {
    Ok(None)
  }

  /// Return a node like this one, except that its output is sorted
  /// according to `exprs`.
  fn resorted(&self, exprs: &[PhysicalSortExpr]) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    Ok(None)
  }
}

And then add a new optimizer pass that tries to push sorts into the plan nodes that report they can provide sorted data 🤔
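As a rough illustration of what such a pass could look like, here is a toy sketch in plain Rust. Everything in it is hypothetical: `Plan` is a stand-in for a physical plan rather than DataFusion's `ExecutionPlan`, orderings are reduced to lists of column names, and `Filter` is assumed to preserve its input order. The pass replaces a `Sort` with a re-sorted child whenever the child says it can produce that order itself.

```rust
/// Toy stand-in for a physical plan; NOT DataFusion's `ExecutionPlan`.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Leaf scan. `pushable` lists orderings the source could produce itself
    /// (the analogue of `pushable_sorts`); `ordering` is what it emits now.
    Scan {
        table: String,
        pushable: Vec<Vec<String>>,
        ordering: Option<Vec<String>>,
    },
    /// Explicit sort on the given columns.
    Sort { by: Vec<String>, input: Box<Plan> },
    /// Order-preserving operator.
    Filter { input: Box<Plan> },
}

impl Plan {
    /// Analogue of the proposed `resorted`: a copy of this node whose output
    /// is sorted by `by`, or `None` if the node cannot provide that order.
    fn resorted(&self, by: &[String]) -> Option<Plan> {
        match self {
            Plan::Scan { table, pushable, .. }
                if pushable.iter().any(|o| o.as_slice() == by) =>
            {
                Some(Plan::Scan {
                    table: table.clone(),
                    pushable: pushable.clone(),
                    ordering: Some(by.to_vec()),
                })
            }
            // A filter keeps row order, so ask its input instead.
            Plan::Filter { input } => input
                .resorted(by)
                .map(|i| Plan::Filter { input: Box::new(i) }),
            _ => None,
        }
    }
}

/// Toy optimizer pass: drop a `Sort` when its input can be re-sorted in place.
fn push_down_sorts(plan: Plan) -> Plan {
    match plan {
        Plan::Sort { by, input } => {
            let input = push_down_sorts(*input);
            match input.resorted(&by) {
                Some(sorted) => sorted,
                None => Plan::Sort { by, input: Box::new(input) },
            }
        }
        Plan::Filter { input } => Plan::Filter {
            input: Box::new(push_down_sorts(*input)),
        },
        scan => scan,
    }
}
```

For example, `Sort[ts] -> Filter -> Scan(pushable: [ts])` becomes `Filter -> Scan(ordering: ts)`, while a sort on a column the scan does not advertise is left untouched.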

phillipleblanc commented 1 month ago

After digging into and understanding how the datafusion-federation crate works, I don't think we need anything additional for sort pushdown. I basically came to the same realization that @backkem had in https://github.com/apache/datafusion/issues/7871#issuecomment-1833540670.

My realization essentially comes down to (please correct me if this is incorrect):

DataFusion is a library that provides both query planning (LogicalPlan) and query execution (ExecutionPlan). When we are connecting a set of tables from a remote query engine into DataFusion, what we really want is the ability to get an optimized logical plan and send that plan to be executed by the remote query engine - in its entirety, bypassing the query execution of DataFusion as much as possible. (In reality we still want the query execution DataFusion provides for more complex queries that involve custom UDFs, joins between two different remote query engines, etc).

The TableProvider construct is part of the query execution (ExecutionPlan level) machinery of DataFusion, so trying to teach it to be smarter for the query federation case is an anti-pattern in my mind. But we still need a TableProvider to be registered so we can take advantage of the logical planning (via the auto-transformation of a TableProvider to a TableSource in said planning). The datafusion-federation repo solves this by using a thin wrapper around a TableProvider called a FederatedTableProviderAdaptor, whose entire job is to provide a TableSource during logical planning. A custom FederationQueryPlanner then recognizes TableScans of a FederatedTableProviderAdaptor and delegates query execution for the largest LogicalPlan sub-tree that includes only TableScans from the same source to that source (by deparsing the sub-tree back to SQL).
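The "largest sub-tree with a single source" idea can be sketched with a toy plan tree. This is only an illustration of the concept as I understand it, not how datafusion-federation is actually implemented: `LogicalNode`, `single_source`, and `federated_roots` are all hypothetical names, and sources are identified by plain strings.

```rust
/// Toy logical plan; NOT DataFusion's `LogicalPlan`.
#[derive(Debug)]
enum LogicalNode {
    /// Scan of a table that lives in the remote engine named `source`.
    TableScan { table: String, source: String },
    /// Any single-input operator (Filter, Sort, Projection, ...).
    Unary { op: String, input: Box<LogicalNode> },
    Join { left: Box<LogicalNode>, right: Box<LogicalNode> },
}

/// `Some(source)` if every TableScan below `node` reads from the same source.
fn single_source(node: &LogicalNode) -> Option<&str> {
    match node {
        LogicalNode::TableScan { source, .. } => Some(source.as_str()),
        LogicalNode::Unary { input, .. } => single_source(input),
        LogicalNode::Join { left, right } => {
            let l = single_source(left)?;
            let r = single_source(right)?;
            if l == r { Some(l) } else { None }
        }
    }
}

/// Collect the roots of the largest sub-trees that could be handed to one
/// source in their entirety (for example by deparsing them back to SQL).
fn federated_roots<'a>(
    node: &'a LogicalNode,
    out: &mut Vec<(&'a str, &'a LogicalNode)>,
) {
    if let Some(source) = single_source(node) {
        // The whole sub-tree belongs to one source: stop descending here.
        out.push((source, node));
        return;
    }
    match node {
        LogicalNode::Unary { input, .. } => federated_roots(input, out),
        LogicalNode::Join { left, right } => {
            federated_roots(left, out);
            federated_roots(right, out);
        }
        // A lone scan always has a single source, so it was handled above.
        LogicalNode::TableScan { .. } => {}
    }
}
```

In this model a `Sort` sitting directly above scans from one source falls inside the delegated sub-tree, which is why no separate sort pushdown hook is needed for that case; a join across two sources stays in the local plan, with one delegated sub-tree on each side.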

alamb commented 1 month ago

FYI @NGA-TRAN is working on porting ProgressiveEval to DataFusion: https://github.com/apache/datafusion/issues/10488