apache / arrow-rs

Official Rust implementation of Apache Arrow
https://arrow.apache.org/
Apache License 2.0

`parquet::column::reader::GenericColumnReader::skip_records` still decompresses most data #6454

Open samuelcolvin opened 2 days ago

samuelcolvin commented 2 days ago

Describe the bug

I noticed this while investigating https://github.com/apache/datafusion/issues/7845#issuecomment-2370455772.

The suggestion from @jayzhan211 and @alamb was that setting `datafusion.execution.parquet.pushdown_filters` to `true` should improve the performance of queries like this, but it seems to make them slower.

I think the reason is that data is being decompressed twice (or data is being decompressed that shouldn't be). Here's a screenshot from samply profiling this code:

[flamegraph screenshot]

(You can view this flamegraph properly here)

You can see that there are two blocks of decompression work. The second one is associated with `parquet::column::reader::GenericColumnReader::skip_records` and happens after the first chunk of decompression, once the query itself has finished running.

In particular you can see that there's a `read_new_page()` call in `parquet::column::reader::GenericColumnReader::skip_records` (line 335) that's taking a lot of time:

[profiler screenshot of read_new_page() within skip_records]

My question is: could this second round of decompression be avoided?

To Reproduce

Clone https://github.com/samuelcolvin/batson-perf, comment out one of the modes, compile with profiling enabled (`cargo build --profile profiling`), then run under samply (`samply record ./target/profiling/batson-perf`).

Expected behavior

I would expect `datafusion.execution.parquet.pushdown_filters = true` to be faster; I think the reason it isn't is that the data is decompressed twice.

Additional context

https://github.com/apache/datafusion/issues/7845#issuecomment-2370455772

tustvold commented 2 days ago

Have you enabled the page index?

etseidl commented 2 days ago

> Have you enabled the page index?

Indeed. Or enabled V2 page headers? The issue seems to be that when skipping rows (`skip_records` defines a record as `rep_level == 0`, so a row), the number of rows per page is not known in advance, so to figure out the number of levels to skip, the repetition levels need to be decoded for every page. For V1 pages, unfortunately, the level information is compressed along with the page data, so the entire page needs decompressing just to calculate the number of rows. If either V2 page headers or the page index were enabled, then the number of rows per page is known without any decompression, so entire pages can be skipped with very little effort (the `continue` at L330 above).
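For reference, here's a minimal sketch (the file name is made up, and it assumes the file was written with the offset/column indexes present) of enabling the page index on the reader side via `ParquetRecordBatchReaderBuilder`:

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?;

    // Ask the reader to load the offset/column indexes from the footer, so
    // skip_records knows each page's row count and can skip whole pages
    // without decompressing them.
    let options = ArrowReaderOptions::new().with_page_index(true);
    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?.build()?;

    for batch in reader {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}
```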

I don't think pages are decompressed twice... it's just a result of the two paths through `ParquetRecordBatchReader::next` (call `skip_records` until enough rows have been skipped, then switch over to `read_records`).

alamb commented 2 days ago

I think the documentation on https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.RowFilter.html is also instructive. Even if all the decoding is working properly, I think the arrow reader may well decode certain pages twice. It is one of my theories about why pushing filters down doesn't always make things faster, but I have not had time to look into it in more detail.
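For context, here's a hedged sketch (the column index, types, and predicate are made up) of how a `RowFilter` gets attached to the reader builder: the filter columns are decoded to evaluate the predicate, and the selected rows are then decoded for the output, which is where pages can end up being processed more than once.

```rust
use std::fs::File;

use arrow::array::{Array, BooleanArray, Int64Array};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
use parquet::arrow::ProjectionMask;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Evaluate the predicate against the first leaf column only (hypothetical
    // schema: column 0 is an Int64 column we filter on).
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let predicate = ArrowPredicateFn::new(mask, |batch: RecordBatch| {
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| ArrowError::CastError("expected Int64".into()))?;
        // Keep rows where the value exceeds an arbitrary threshold.
        Ok(BooleanArray::from_iter(col.iter().map(|v| v.map(|v| v > 100))))
    });

    // Rows that fail the predicate are skipped via skip_records; the rows
    // that pass are then decoded by read_records.
    let reader = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;

    for batch in reader {
        println!("{} rows passed the filter", batch?.num_rows());
    }
    Ok(())
}
```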

tustvold commented 2 days ago

See also https://github.com/apache/arrow-rs/issues/5523

Although I suspect in this case the issue is a lack of page index information for whatever reason
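On the writer side, a minimal sketch (the schema and values are made up) of producing a file with V2 data pages, whose page headers carry the per-page row count, so readers can skip pages without decompressing them:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{WriterProperties, WriterVersion};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from_iter_values(0..1_000))],
    )?;

    // PARQUET_2_0 selects V2 data pages, whose page headers record the number
    // of rows in each page.
    let props = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .build();

    let file = File::create("data.parquet")?;
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```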