apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
6.25k stars 1.18k forks source link

Optimized version of `SortPreservingMerge` that doesn't actually compare sort keys of the key ranges are ordered #10316

Open alamb opened 6 months ago

alamb commented 6 months ago

Is your feature request related to a problem or challenge?

When merging a large number of pre-sorted streams (e.g. in our case, a large number of pre-sorted parquet files) the actual work in SortPreservingMerge to keep them sorted is often substantial (as the sort key of each row in each stream must be compared the other potential candidates)

Here is the sort preserving merge https://github.com/apache/datafusion/blob/4edbdd7d09d97f361748c086afbd7b3dda972f76/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs#L39-L67

However, in some cases (such as @suremarc has identified in https://github.com/apache/datafusion/issues/6672) we can use information about how the values of the sort key columns are distributed to avoid needing a sort

For example, if we have three files that are each sorted by time and have the following ranges

We can produce the output sorted stream by first reading file1.parquet entirely then file2.parquet, then file3.parquet

Not only will this be faster than using SortPreservingMerge it will require less intermediate memory as we don't need to read a batch from each input stream to begin producing output. For cases where there may be 100s of files, this can minimize the amount of concurrently outstanding requests substantially

Also, for a query that will not read the entire dataset (e.g. only wants the most recent values) it can be especially beneficial:

SELECT * FROM data ORDER BY time limit 10

In this case our example above would only ever read file1.parquet (wouldn't even open the others) if it had more than 10 rows

Describe the solution you'd like

I would like an operator that does not actually merge if not needed

Describe alternatives you've considered

@NGA-TRAN implemented the following operator in InfluxDB IOx ProgressiveEval which we have found works pretty well and has offered to contribute it back upstream

We wrote about using this operator here: https://www.influxdata.com/blog/making-recent-value-queries-hundreds-times-faster/

/// ProgressiveEval return a stream of record batches in the order of its inputs.
/// It will stop when the number of output rows reach the given limit.
///
/// This takes an input execution plan and a number n, and provided each partition of
/// the input plan is in an expected order, this operator will return top record batches that covers the top n rows
/// in the order of the input plan.
///
/// ```text
/// ┌─────────────────────────┐
/// │ ┌───┬───┬───┬───┐       │
/// │ │ A │ B │ C │ D │       │──┐
/// │ └───┴───┴───┴───┘       │  │
/// └─────────────────────────┘  │  ┌───────────────────┐    ┌───────────────────────────────┐
///   Stream 1                   │  │                   │    │ ┌───┬───╦═══╦───┬───╦═══╗     │
///                              ├─▶│  ProgressiveEval  │───▶│ │ A │ B ║ C ║ D │ M ║ N ║ ... │
///                              │  │                   │    │ └───┴─▲─╩═══╩───┴───╩═══╝     │
/// ┌─────────────────────────┐  │  └───────────────────┘    └─┬─────┴───────────────────────┘
/// │ ╔═══╦═══╗               │  │
/// │ ║ M ║ N ║               │──┘                             │
/// │ ╚═══╩═══╝               │                Output only include top record batches that cover top N rows
/// └─────────────────────────┘                
///   Stream 2
///
///
///  Input Streams                                             Output stream
///  (in some order)                                           (in same order)
/// ```

Additional context

The original inspiration for this operator came from @pauldix (who I think mentioned it was inspired by ElasticSearch)

alamb commented 6 months ago

Here is the entire implementation in case anyone wants this: https://github.com/influxdata/influxdb3_core/blob/b546e7f86ee9adbff0dd3c5e687140848397604a/iox_query/src/provider/progressive_eval.rs

pauldix commented 6 months ago

Oh yeah, we're going to be hitting this all the time. Definitely want to see this! 😁

alamb commented 6 months ago

I probably should have mentioned that in order to use this operator, one needs to be able to determine if the input streams to SortPreservingMerge are non overlapping in value. Conveniently, that is basically the analysis that @suremarc has implemented in https://github.com/apache/datafusion/pull/9593

alamb commented 6 months ago

Oh yeah, we're going to be hitting this all the time. Definitely want to see this! 😁

Also to be clear to everyone else -- we have this code in InfluxDB already but it would be great to have other people be able to use it (and help maintain it)

alamb commented 6 months ago

FYI @ozankabak @mustafasrepo @matthewmturner

NGA-TRAN commented 6 months ago

Thanks @alamb. I am happy to port ProgressiveEval from InfluxDB to DB

alamb commented 6 months ago

I think we should both port ProgressiveEval as well as hook it up in the optimizer so it is is used (likely based on the analysis in https://github.com/apache/datafusion/pull/9593)

NGA-TRAN commented 6 months ago

If the hooking work is not that tricky (which I think the case), I am happy to do that, too

suremarc commented 6 months ago

@alamb Just to be absolutely clear, if the plan consists entirely of Parquet files from a single table, then the SortPreservingMerge will be eliminated. As you can see in this sqllogictest, a single file group is produced if none of the files overlap, and DataFusion correctly understands a SortPreservingMergeExec is not required. In particular, after #9593, #6672 should be fixed.

That said, it's worth pointing out that ProgressiveEval is more general (it can work for arbitrary plans e.g. heterogeneous unions).

berkaysynnada commented 6 months ago

What comes to my mind is that if we can successfully bring the statistics to SortPreservingMerge, it could handle this work without a separate operator (This will probably need a change in statistics API to return statistics for each partition).

alamb commented 6 months ago

What comes to my mind is that if we can successfully bring the statistics to SortPreservingMerge, it could handle this work without a separate operator (This will probably need a change in statistics API to return statistics for each partition).

I think this approach would work as well -- or maybe just a flag on the SortPreservingMerge and a different stream rather than an entirely ExecutionPlan 🤔

Per partition statistics is an interesting idea 🤔 -- it certainly makes sense for things like data sources. I wonder how generally useful it could be

NGA-TRAN commented 5 months ago

@alamb : I am starting porting ProgresssiveEval from IOx. Do you want me to use this ticket or https://github.com/apache/datafusion/issues/10433 for that work or you want me to open a new ticket for the porting?

alamb commented 5 months ago

@alamb : I am starting porting ProgresssiveEval from IOx. Do you want me to use this ticket or #10433 for that work or you want me to open a new ticket for the porting?

How about we open a new ticketfor porting and link it to this epic?

NGA-TRAN commented 5 months ago

This is the ticket to port ProgressiveEval from InfluxDB https://github.com/apache/datafusion/issues/10488

alamb commented 5 months ago

Update here:

  1. @NGA-TRAN has a PR open to port this operator into DataFusion: https://github.com/apache/datafusion/pull/10490
  2. We don't want to merge it into DataFusion until at least one other user could get value from it.

So in my mind, what is required to move on with this ticket is:

  1. Update one of the existing optimizer passes to detect when the sort keys of a SortPreservingMerge don't overlap in value and replace with ProgressiveEval
  2. Tests showing it working

For the "do key ranges overlap" detection code I think we can use what @suremarc added in https://github.com/apache/datafusion/pull/9593

wj-stack commented 4 weeks ago

This is the ticket to port ProgressiveEval from InfluxDB #10488

hello,Is there still a chance for this operator to be added to DataFusion? If it doesn't get merged, are there any other ways to use it? thanks @NGA-TRAN

alamb commented 4 weeks ago

This is the ticket to port ProgressiveEval from InfluxDB #10488

hello,Is there still a chance for this operator to be added to DataFusion? If it doesn't get merged, are there any other ways to use it? thanks @NGA-TRAN

I think the status is still as in https://github.com/apache/datafusion/issues/10316#issuecomment-2113301756

I believe @suremarc said he may have some ideas of how to add the "do key ranges overlap" code / tests.

@wj-stack perhaps you would be willing to write some tests (that should be able to use SortPreservingMerge) to help the project along?

You should be able to make a sqllogictest -- see docs here https://github.com/apache/datafusion/tree/main/datafusion/sqllogictest#readme

Here is an example of creating an existing table with order: https://github.com/apache/datafusion/blob/e00af2c199c0669faa25f41de57db4568e1b92f9/datafusion/sqllogictest/test_files/create_external_table.slt#L236

Here are examples of creating parquet files as part of tests: https://github.com/apache/datafusion/blob/e00af2c199c0669faa25f41de57db4568e1b92f9/datafusion/sqllogictest/test_files/copy.slt#L24

suremarc commented 1 week ago

Hey @alamb, I am interested in using ProgressiveEval (or having the functionality merged into SortPreservingMergeStream) as my use case is optimizing unions where the children have non-overlapping ranges. For instance:

SELECT * FROM t1 
WHERE "timestamp" < cutoff
UNION ALL
SELECT * FROM t2
WHERE "timestamp" >= cutoff
ORDER BY "timestamp"

In principle it should be possible to read results from t1 first.

I do think we can reuse the analysis in #9593, but ideally we would have statistics per partition as @ozankabak mentioned. This would allow us to implement the operator generically, without really having to inspect its children.