apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
6.3k stars 1.19k forks source link

Row groups are read out of order or with completely different values #10572

Open twitu opened 6 months ago

twitu commented 6 months ago

Describe the bug

Datafusion is reading row groups out of order and sometimes with completely different values for the row groups. The data is verified by reading the same files using the Python pyarrow.parquet library.

The pyarrrow and datafusion reader read the same values when the file has 126 row groups. But give complete different values when the file has 127 row groups.

To Reproduce

https://github.com/nautechsystems/nautilus_experiments/tree/datafusion-bug

The steps, data and results are documented in this repo and branch. The README is shared here again.


Use the python script to extract row group information from the parquet files using pyarrow.

pip install -r requirements.txt
python extract_ts_init.py 126-groups.parquet 126-groups-python.csv
python extract_ts_init.py 127-groups.parquet 127-groups-python.csv

Run the rust executable to extract row group information from the parquet files using datafusion.

cargo run 126-groups.parquet > 126-groups-rust.csv
cargo run 127-groups.parquet > 127-groups-rust.csv

Ideally there should be no difference between the csv files for the row groups. However, 126 works properly. But 127 gives different results for Python and Rust.

This shows that indeed there's no difference with 126 groups.

diff 126-groups-rust.csv 126-groups-python.csv # no diff
diff 126-groups-rust.csv 126-groups-python.csv # big diff, things crazy

We can also make sure that these are in fact from the same data source with just one extra row group with this command which shows 127 groups python has only one extra entry at the end.

diff 126-groups-python.csv 127-groups-python.csv

Expected behavior

Datafusion reader should

Additional context

No response

alamb commented 6 months ago

Thank you for the report and the reproducer ❤️

read row groups in order they were written

This is not my expectation.

DataFusion reads row groups in parallel, potentially out of order, with multiple threads as an optimization. To preserve the order of the data you can either set the configuration datafusion.optimizer.repartition_file_scans to false or else communicate the order of the data in the files using the CREATE EXTERNAL TABLE .. WITH ORDER clause and then explicitly ask for that order in your query.

read the same values for the same row group even when the file increases in size read the same values as the python pyarrow parquet reader

Yes I agree these are also my expectation

Maybe you can try setting datafusion.optimizer.repartition_file_scans to false and see if that makes the data consistent

twitu commented 6 months ago

Setting datafusion.optimizer.repartition_file_scans to false like this fixes things. :heavy_check_mark:

    let session_cfg =
        SessionConfig::new().set_str("datafusion.optimizer.repartition_file_scans", "false");
    let session_ctx = SessionContext::new_with_config(session_cfg);

However, it's unclear how it interacts with other options and affects memory and performance. So here's what I have -

It is a given that the data will be sorted based on timestamp like this

    let parquet_options = ParquetReadOptions::<'_> {
        skip_metadata: Some(false),
        file_sort_order: vec![vec![Expr::Sort(Sort {
            expr: Box::new(col("ts_init")),
            asc: true,
            nulls_first: true,
        })]],
        ..Default::default()
    };

Then there are two approaches to get row groups/data in order -

It's not clear to me how each option affects the performance and memory usage. Do you have any guidance around it?

alamb commented 6 months ago

Setting datafusion.optimizer.repartition_file_scans to false like this fixes things. ✔️

That is great news

Using an order by clause in the query session_ctx.sql("SELECT * FROM data ORDER BY ts_init"). From our https://github.com/apache/datafusion/discussions/7675#discussioncomment-7146896, doing an order by on an already sorted column does not incur an additional overhead.

This is the approach I would recommend (we do it in InfluxDB 3.0).

You can verify that there are no additional sorts, etc in the plan by using EXPLAIN to look at the query plan

If you use this approach, you should also be able to avoid setting datafusion.optimizer.repartition_file_scans to false as the optimizer will take care of it automatically

twitu commented 6 months ago

Thanks for sharing and helping resolve this issue so quickly :grin:

twitu commented 5 months ago

So I tried out some experiments with different combinations of using ORDER BY and setting the repartition_file_scans configuration value. And the results are a bit un-intuitive. The full results, instructions and data is hosted in this repo ^1.

The script reads a file and prints the number of rows in the first row group. Ideally it should only load the first row group.

However, As you can see, even after specifying the sort order of the file the ORDER BY query still loads the whole file and does some kind of sort operation. You can see that for a 100 MB file it loads up to 900 MB. Turning on re-partitioning helps a bit in improving perf and memory footprint.

    let parquet_options = ParquetReadOptions::<'_> {
        skip_metadata: Some(false),
        file_sort_order: vec![vec![Expr::Sort(Sort {
            expr: Box::new(col("ts_init")),
            asc: true,
            nulls_first: true,
        })]],
        ..Default::default()
    };

But the best result is by removing sorting and turning of re-partitioning. I believe it only loads the one row group required. It'll be very helpful to document this interaction.

order repartition wall time (s) memory (mb) read sorted order
true true 0.84 654
true false 1.19 944
false false 0.21 45
false true 0.33 151

I'll share more results for reading the whole file.

twitu commented 5 months ago

Here's the result for the script that reads the whole file and counts the total number of rows also included in the repo. ^1

order repartition wall time (s) memory (mb) read sorted order
true true 1.24 654
true false 1.65 944
false false 0.55 45
false true 0.41 151

It seems like order=false and repartition=false seems to be the holy grail of performant, low-memory footprint streaming in sorted order.

What do you think?

alamb commented 5 months ago

Hi @twitu -- thanks for this

Some comments:

I took a quick peek at https://github.com/nautechsystems/nautilus_experiments/tree/efficient-query and: it looks like it reads only a single batch out https://github.com/nautechsystems/nautilus_experiments/blob/a4ceb950de3b4bbc43ec82b64ee1495d077f5116/src/bin/single_row_group.rs#L45

This means you are running the equivalent of SELECT ... LIMIT 4000 or something similar

It seems like order=false and repartition=false seems to be the holy grail of performant, low-memory footprint streaming in sorted order.

I would expect those settings will be the lowest latency (time to first batch)

However, As you can see, even after specifying the sort order of the file the ORDER BY query still loads the whole file and does some kind of sort operation.

I would expect that it loads the first row group of each file and begins merging them together (e.g. if you did an EXPLAIN ... on your SQL you would see a SortPreservingMerge in the physical plan

@suremarc and @matthewmturner and others have been working on optimizing a similar case -- see https://github.com/apache/datafusion/issues/7490 for example. We have other items tracked https://github.com/apache/datafusion/issues/10313 (notably we have a way to avoid opening all the files if we know the data is already sorted and doesn't overlap: https://github.com/apache/datafusion/issues/10316)

cc @NGA-TRAN

twitu commented 5 months ago

I also added a query explanation and here're the results.

    plan: Analyze
      Sort: data.ts_init ASC NULLS LAST
        Projection: data.bid, data.ask, data.bid_size, data.ask_size, data.ts_event, data.ts_init
          TableScan: data,
    plan: Analyze
      Projection: data.bid, data.ask, data.bid_size, data.ask_size, data.ts_event, data.ts_init
        TableScan: data,

You'll see that the queries with ORDER BY have a Sort expression in the plan. It's not clear to me why despite specifying the sort order in the configuration the plan still has a sort. I hope the optimizations you've mentioned will take this into account.

        file_sort_order: vec![vec![Expr::Sort(Sort {
            expr: Box::new(col("ts_init")),
            asc: true,
            nulls_first: true,
        })]],

You'll see in the experiments repo that I've implement binaries for both reading a single row group and reading the whole file. The queries are the same the behaviour is changed in how the resulting stream is consumed. I don't think this is equivalent to adding a LIMITclause because for the purpose of the query I'm reading the whole file. It is only that the consumer decides to stop after reading one row group.

It seems like order=false and repartition=false seems to be the holy grail of performant, low-memory footprint streaming in sorted order.

I would expect those settings will be the lowest latency (time to first batch)

Surprisingly, these settings also give lowest latency and memory foot print when reading the full file as shown in the above table^1.

If you need an additional contributor in any of the above mentioned issues, I'm happy to help :smile:

alamb commented 5 months ago

Hi @twitu -- I am very sorry for the delay in responding -- I have been traveling for sever

You'll see that the queries with ORDER BY have a Sort expression in the plan. It's not clear to me why despite specifying the sort order in the configuration the plan still has a sort. I hope the optimizations you've mentioned will take this into account.

One thing that might be going on is that the NULLS FIRST doesn't seem to match

In your plan the sort is putting nulls last

      Sort: data.ts_init ASC NULLS LAST

but in your code you specify NULLS first

        file_sort_order: vec![vec![Expr::Sort(Sort {
            expr: Box::new(col("ts_init")),
            asc: true,
            nulls_first: true,
        })]],

I don't think this is equivalent to adding a LIMIT clause because for the purpose of the query I'm reading the whole file. It is only that the consumer decides to stop after reading one row group.

DataFusion is a streaming engine, so if you open a parquet file and read one batch and stop then the entire file will not be opened read (the batches are basically created on demand)

There are certain "pipeline breaking" operators that do require reading the entire input, such as Sort and GroupHashAggregate which is why I think you are seeing the entire file read when your query has a sprt

If you need an additional contributor in any of the above mentioned issues, I'm happy to help 😄

We are always looking for contributors -- anything you can do to help others would be most appreciated. For example, perhaps you can add an example to datafusion-examples https://github.com/apache/datafusion/tree/main/datafusion-examples showing how to use a pre-sorted input file to avoid sorting during query (assuming that you can actually get that working)

twitu commented 1 month ago

Sorted row groups read OUT-OF-ORDER when FILTER clause is used

I'm seeing another issue with datafusion streaming sorted records from disk with a filter query in the SQL. The code and instructions to reproduce the issue are here^1.To briefly recount the context.

I have a parquet file is sorted on the ts_init column. We want to stream data from the file in ascending order of ts_init. However, we do not want to sort the data in-memory since it is already sorted.

To achieve this I use the following datafusion configuration.

    let session_cfg =
        SessionConfig::new().set_str("datafusion.optimizer.repartition_file_scans", "false");
    let session_ctx = SessionContext::new_with_config(session_cfg);
    let parquet_options = ParquetReadOptions::<'_> {
        skip_metadata: Some(false),
        file_sort_order: vec![vec![datafusion_expr::SortExpr {
            expr: col("ts_init"),
            asc: true,
            nulls_first: false,
        }]],
        ..Default::default()
    };

This works well when there is no filter clause in the query. The below commands will pass. SELECT * FROM data

cargo run 127-groups.parquet > 127-groups-rust.csv
python check_invariant.py 127-groups-rust.csv

However, when there is a filter clause in the query. The row groups are not read in-order causing the ascending order check to FAIL. The row groups are read out of order. SELECT * FROM data where ts_init >= 1701388832486000000 AND ts_init <= 1701392194001000000

cargo run 127-groups.parquet filter > 127-groups-rust.csv
python check_invariant.py 127-groups-rust.csv #WILL FAIL
alamb commented 1 month ago

Don't use SORT BY clause in query

I would expect to get this to work correctly, you need to use an ORDER BY in your query

If you don't include an ORDER BY the datafusion optimizer will not attempt to maintain any ordering (with a filter it likely repartitions the input stream to parallelize the filster)

With an ORDER BY the optimizer will remove the actual SortExec when appropriate (and thus will not do the sort)

You can see that it does so by running EXPLAIN ... on your querty

You may also have to set datafusion.optimizer.prefer_existing_sort instead of datafusion.optimizer.repartition_file_scans

twitu commented 1 month ago

I added the datafusion.optimizer.prefer_existing_sort = true and datafusion.optimizer.repartition_file_scans = false and compared with previous results. All of the below configurations read the full file in 0.08 s and 60 mb of memory. However, without the ORDER BY clause filtering becomes out of order.

order filter read sorted order
true true 0.08 60
true false 0.8 60
false false 0.55 60
false true 0.41 60

It seems like the best combination for efficient, low-memory streaming, queries while supporting filtering is achievable by

However, a point to note is that there does not seem to be a verify if prefer_existing_sort prevents an additional sort. I printed the optimized plan with the above settings.

The optimized plan for a query with ORDER BY clause has a sort.

Sort(
    Sort {
        expr: ,
        input: Filter(
            Filter {
                predicate: BinaryExpr,
                input: TableScan,
                having: false,
            },
        ),

The optimized plan for a query WITHOUT ORDER BY clause did NOT have a sort.

Filter(
    Filter {
        predicate: BinaryExpr,
        input: TableScan,
        having: false,
    },
)

Shouldn't the optimized plan for both be the same, since I have set prefer_existing_sort to true?

twitu commented 1 month ago

@alamb Is there anyway to verify that setting datafusion.optimizer.prefer_existing_sort = true eliminates the sort during actual execution?

As I've shared above, the optimized query plan does not reflect this. It still has the Sort expression. This makes it hard to verify the behaviour of the query without actually running it.

alamb commented 1 month ago

@alamb Is there anyway to verify that setting datafusion.optimizer.prefer_existing_sort = true eliminates the sort during actual execution?

The only way I know is to use the Explain plan.

The physical plan reflects the actual plan that will be run -- so if it has a SortExec then there will be some sort of sort during execuction

Many of the operators have specializations, however, like TopK for sort with limit, so interpreting exactly what will happen from a plan is somewhat of an art form.

Here is some documentation about how to read explain plans (thanks @devanbenz !): https://datafusion.apache.org/user-guide/explain-usage.html