apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
6.37k stars 1.2k forks source link

Optimize "per partition" top-k : `ROW_NUMBER < 5` / TopK #6899

Open alamb opened 1 year ago

alamb commented 1 year ago

Is your feature request related to a problem or challenge?

DataFusion optimizes queries like ... ORDER BY value LIMIT 10 by only keeping the top 10 ("limit") rows when sorting which is great!

Another common pattern (that we also have in IOx) (https://github.com/influxdata/influxdb_iox/pull/8187/files#r1257834347) is queries like the following to select the top N values "per partition"

SELECT ...
  ROW_NUMBER() OVER (PARTITION BY value1, ORDER BY value2) as rn
WHERE
  rn < 10

Currently the plan will be something like:

Filter(rn < 10)
  WindowExec(ROW_NUMBER...)
    Sort(value1, value2)

The problem with this plan is that it will sort (and copy) the ENTIRE input even when the query only needs the first 10 rows of each partition

Describe the solution you'd like

It would be awesome to optimize this case somehow so that it did not need to sort the entire input (and somehow could only keep the top N values per partition). I am not sure how easy this would be to do for sorting

Describe alternatives you've considered

Maybe we could at least teach the window operator to only emit the top N values per partition if there was a row number predicate at at least save some of that work -- the sort would still be required, but at least the window operator would do less work

Additional context

No response

ozankabak commented 1 year ago

I think we need an optimization step that transforms the plan you gave to one that uses a fetching sort and does away with the filter. It seems to me the window operator would still be used as is.

alamb commented 1 year ago

I think we need an optimization step that transforms the plan you gave to one that uses a fetching sort and does away with the filter. It seems to me the window operator would still be used as is.

I agree the window operator probably should remain as is

Maybe we could use a specialized sort operator like

Filter(rn < 10)
  WindowExec(ROW_NUMBER...)
    PartitionedSort(order_by={value1, value2}, prefix={value1}, fetch = 10)

Where the PartitionedSort semantics are to only output the top 10 values for some prefix of the sort key (in this case, each distinct value of value1)

🤔

comphead commented 1 year ago

Spark does the similar way: it sorts and limits data per partition then sends the output to single partition where final sort/limit performed. Spark has the logic encapsulated in separate operator and looks like https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L311

it also contains optimization like if tuples already ordered it will skip the excessive ordering, the same happens for projection

yyy1000 commented 9 months ago

I'd like to help this. (Looks not so difficult I think) Currently, my thoughts are:

  1. Implement a PartitionedSort PhysicalPlan, which could sort in each partition, and merge them after each partition is sorted.
  2. When there's a PARTITION BY, I think it can be known by input.output_partitioning().partition_count(), match the LogicalPlan::Sort to PartitionedSort in https://github.com/apache/arrow-datafusion/blob/a6ef1bec480872f15f83628a7fb8c9bb2722cd49/datafusion/core/src/physical_planner.rs#L938-L950

A question is whether the fetch in LogicalPlan is what we need in PartitionedSort, (seems not), I could try it.

alamb commented 9 months ago

This is one where I would recommend you try hacking up a prototype that works enough to show some performance results, and then get feedback on it before spending too much time polishing. I think this one could easily turn into a large project

yyy1000 commented 9 months ago

@alamb Thanks for your reply! Also I'd like to do it after I got enough knowledge. 😎