apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[Python] Performance degradation when reading wide dataframes from Parquet #27353

Open asfimport opened 3 years ago

asfimport commented 3 years ago

I noticed a relatively big performance degradation in pyarrow 1.0.0 and later when loading wide dataframes.

For example, you should be able to reproduce it with:


import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame(np.random.rand(100, 10000))
table = pa.Table.from_pandas(df)
pq.write_table(table, "temp.parquet")

%timeit pd.read_parquet("temp.parquet")

In version 0.17.0, this takes about 300-400 ms; for anything at or above 1.0.0, it suddenly takes around 2 seconds.

 

Thanks for looking into this.

Reporter: Axel G

Note: This issue was originally created as ARROW-11469. Please see the migration documentation for further details.

asfimport commented 3 years ago

Joris Van den Bossche / @jorisvandenbossche: [~Axelg1] Thanks for the report

We have had similar issues in the past (e.g. ARROW-9924, ARROW-9827), but it seems that some things have deteriorated again.

So as a temporary workaround, you can specify use_legacy_dataset=True to use the old code path (another alternative is the single-file pq.ParquetFile interface, which never has the overhead of dealing with potentially more complicated datasets).
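
For concreteness, a minimal sketch of both workarounds, reusing the temp.parquet file from the snippet above:

import pyarrow.parquet as pq

# Workaround 1: force the pre-1.0 (legacy) read path.
table = pq.read_table("temp.parquet", use_legacy_dataset=True)

# Workaround 2: the single-file ParquetFile interface, which bypasses
# the dataset/projection machinery entirely.
table = pq.ParquetFile("temp.parquet").read()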

cc @bkietz There seems to be a lot of overhead spent in the projection (RecordBatchProjector, specifically SetInputSchema, CheckProjectable, and FieldRef lookup; see the attached profile profile_wide300.svg), while in this case there is actually no projection happening.

asfimport commented 3 years ago

Elena Henderson / @ElenaHenderson: Performance of reading wide dataframes degraded further in pyarrow 4.0.0:

https://conbench.ursa.dev/compare/runs/afbbfc387b314fb6886168225c29d3af...c30d6d188bc24aa2b319808446b8d0aa/

https://conbench.ursa.dev/compare/runs/87781efdd9f940098f91cc8b50a32ca1...a54fde45cfa2406bab2ff0f5080f274d/

 

Baseline = pyarrow 3.0.0

Contender = pyarrow 4.0.0

 

(attached chart: image-2021-05-03-14-39-59-485.png)

 

We tracked the regression down to this commit: https://github.com/apache/arrow/commit/c5c583b5d4290563332c653a0084c666ef232f0c

https://conbench.ursa.dev/compare/runs/d731c9100c6190566354a83dee2239ddb7be21d6...c5c583b5d4290563332c653a0084c666ef232f0c/

 

Baseline = https://github.com/apache/arrow/commit/d731c9100c6190566354a83dee2239ddb7be21d6

Contender = https://github.com/apache/arrow/commit/c5c583b5d4290563332c653a0084c666ef232f0c 

 

(attached chart: image-2021-05-03-14-40-09-520.png)

 

cc @bkietz @nealrichardson @dianaclarke @westonpace

asfimport commented 3 years ago

David Li / @lidavidm: I took a quick stab at this and looked at it with a profiler.

TL;DR we're doing two things quadratically (copying shared_ptrs and looking up fields). This is the bulk of the runtime - not any actual computation!

I tested a Parquet file with 10000 columns, 10000 rows, using the read options use_threads=True, memory_map=True.

PyArrow 0.17, conda-forge: consistently ~550 ms
master (5.0.0): consistently ~5000-5500 ms

Profiler: https://share.firefox.dev/3bffpI3

The time spent falls into 3 buckets:

  1. ScanTask::SafeExecute takes ~500ms. This is actually reading the data - everything seems fine here.
  2. SetProjection takes ~1000ms.

    SetProjection takes each column and looks it up in the schema with FieldRef::GetOne. The lookup guards against duplicate field names, so it iterates every column on each lookup - a quadratic operation. This accounts for ~500ms of the runtime. Then, when we bind the projection, the same thing happens again - that is the other half. Most of the leaf time is spent in std::operator==, i.e. in comparing column names. (A conceptual sketch of this pattern and the lookup-table fix follows this list.)

  3. ProjectSingleBatch takes ~3300ms.

    Most of the time ends up in GetDatumField, which is yet again a lookup. Of the time herein, FieldRef::FindOneOrNone takes 730ms - this is the same problem as before (quadratic lookup). The method is sufficiently generic that we can't necessarily assume the field indices are the same (or can we?).

    The bulk of the remaining time - 2530ms - is in FieldPathGetDatumImpl which boils down to FieldPath::Get. This boils down to shared_ptr operations in SimpleRecordBatch::column_data (which is just a getter returning a vector<shared_ptr>) - likely, since we're doing this once per field per record batch, we're creating a copy of 10000 shared_ptrs on every iteration!

    I tried two optimizations - if we make a simple lookup table in SetProjection, we can shave off those 500ms. (This will be harder if/when we support nested field refs.) And if we add a SimpleRecordBatch::column_data_ref which returns a const ArrayDataVector&, then we can shave about 2500ms off the runtime. Profile after making these changes: https://share.firefox.dev/3tHsT5U

    The rest of the time is in Bind/FindAll. We could probably eliminate or lessen this. We could add index-based field refs; this would make things much less flexible though. Or we could add careful use of lookup tables, e.g. lazily construct one on Schema. Since presumably all batches from a fragment share the same underlying schema, this would help a lot. (However, once we get beyond simple projections, preserving that property will be difficult.)

    The change to column_data is a very easy win and shouldn't pessimize any existing path (the implementation already operates on a vector& - so it was making a pointless copy), so I'll submit that as a PR while we decide what to do about the rest.
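
To make the quadratic pattern in buckets 2 and 3 concrete, here is a conceptual Python model of the per-field name lookup and of the lookup-table fix; the names (find_one, build_lookup) are illustrative, not Arrow APIs.

names = [str(i) for i in range(10_000)]

# Current behavior: each FieldRef lookup scans the entire schema to
# guard against duplicate field names -> O(n) per field, so resolving
# all n fields of a projection is O(n^2) overall.
def find_one(schema_names, target):
    matches = [i for i, n in enumerate(schema_names) if n == target]
    if len(matches) != 1:
        raise KeyError(target)  # missing or duplicate field name
    return matches[0]

# Fix: build a name -> index table once per schema (i.e. once per
# fragment), after which each lookup is O(1).
def build_lookup(schema_names):
    seen = {}
    for i, n in enumerate(schema_names):
        seen.setdefault(n, []).append(i)
    # Keep only unambiguous names, mirroring the duplicate-name guard.
    return {n: ixs[0] for n, ixs in seen.items() if len(ixs) == 1}

lookup = build_lookup(names)
assert all(lookup[n] == find_one(names, n) for n in names[:100])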

asfimport commented 3 years ago

David Li / @lidavidm: And just to confirm, adding a cache of FieldRef->FieldPath lookups to projection trims another ~400ms off the runtime; at that point, the remaining overhead is in Expression::Bind (for which a similar cache would suffice), and in little things like (apparently) creating/moving/destroying futures that could then be tackled.

Profile: https://share.firefox.dev/3hhGBKj (You can see the aforementioned Future overhead in parquet::arrow::(anonymous namespace)::FileReaderImpl::GetRecordBatchReader)

asfimport commented 3 years ago

David Li / @lidavidm: Finally, passing a FieldRef-to-FieldPath cache to Bind() knocks out most of the rest of the slowdown, with the final result being 0.6s for a scan using Arrow 0.17 and 0.8s for the patched version here. (Note that master with use_legacy_dataset=True also takes about 0.8s, so the difference may be in the Parquet reader itself.) Profile: https://share.firefox.dev/33CGu3N

@bkietz what do you think about optionally passing an unordered_map<FieldRef, FieldPath> cache to ExecuteScalarExpression and Expression::Bind? The invariant would have to be that the cache is not used with a schema that has the same fields but in a different order. However, I think we can maintain this easily enough, so long as we assume that all batches from the same fragment have the same schema. We can construct the cache when we start scanning a given fragment and isolate it to that fragment only. We could then visit the schema once to build the cache instead of visiting every field on every fieldref lookup.
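
In Python terms, the proposal might look roughly like the following (purely illustrative; the real cache would live in the C++ scanner, and batches here are just lists of columns):

# Build the field-lookup cache once per fragment and reuse it for every
# batch, relying on the invariant that all batches of a fragment share
# the fragment's schema (same names, same order).
def scan_fragment(fragment_schema_names, batches, projected_names):
    cache = {name: i for i, name in enumerate(fragment_schema_names)}
    for batch in batches:
        # Safe only under the invariant above; a batch whose fields were
        # reordered relative to the fragment schema would break this.
        yield [batch[cache[name]] for name in projected_names]

# Example: one fragment, two batches, projecting two of three columns.
batches = [["a0", "b0", "c0"], ["a1", "b1", "c1"]]
for out in scan_fragment(["a", "b", "c"], batches, ["c", "a"]):
    print(out)  # ['c0', 'a0'] then ['c1', 'a1']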

(Also note that filtering is untested here and may benefit from this optimization too, and I didn't try the async scanner.)

asfimport commented 3 years ago

David Li / @lidavidm: Conbench does validate that the fix already committed improves runtime (it takes about 1/2 the time it did before): https://conbench.ursa.dev/compare/batches/a74b919eba6a41288169b7637cd37ba2...4f26dd7a80004373b3afaff5853b0718/

asfimport commented 3 years ago

Joris Van den Bossche / @jorisvandenbossche: Thanks for those analyses!

Something else I am wondering: in this specific case, there is actually no projection to be done. Would it be worth also adding a special case for this, assuming that checking exact schema equality is faster than reprojecting the batch to the same schema (although with many columns, checking schema equality might also be slow)?
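
At the Python level, the special case being suggested might amount to something like this (a sketch; project() is a hypothetical stand-in for the existing projection path):

import pyarrow as pa

def maybe_project(batch: pa.RecordBatch, projected_schema: pa.Schema):
    # Special case: if the batch already has exactly the requested
    # schema, skip projection entirely.
    if batch.schema.equals(projected_schema):
        return batch
    return project(batch, projected_schema)  # hypothetical slow path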

asfimport commented 3 years ago

David Li / @lidavidm: I also thought about that and am of two minds. This case (no projection, or perhaps only column selection) is presumably a very common case and could warrant some special treatment to ensure it's consistently fast, and of course doing no work is much better than doing work quickly. But if some relatively simple optimizations would help in all cases, then I think that's worth pursuing over a special case.

Maybe it'd be worth benchmarking to ensure the optimizations here are enough of a speedup and don't slow down other cases (narrow schemas, selecting only a few columns, actual projections, etc.). That would both let us know about potential future regressions and help us decide if it's worth it.

As for schema equality, if we do special-case things: as with the optimizations described here, I think if we can assume that within a fragment all batches will have the same schema, then that should reduce the overhead of checking schemas considerably, since it'll be only O(fragments) (and could be pipelined with other work).

asfimport commented 3 years ago

Neal Richardson / @nealrichardson: Forgive me if I'm missing what the actual problem is, but would a solution be to handle this case (no projection, or perhaps only column selection) in the higher layers (Python, R, etc.)? ScannerBuilder->Project can take either a vector of Expressions or a vector of string column names. Python/R could choose to Project with strings if it is only selecting columns and not modifying/deriving anything (and not Project at all if selecting everything).
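
For illustration, this is roughly what pure column selection looks like through pyarrow.dataset (a sketch; note that Table.from_pandas stringifies the integer column names in the repro, hence "0", "1", ...):

import pyarrow.dataset as ds

dataset = ds.dataset("temp.parquet", format="parquet")

# Pure column selection: plain string names, no Expression objects.
table = dataset.to_table(columns=["0", "1", "2"])

# Selecting everything: omit columns entirely rather than projecting
# all columns explicitly.
table = dataset.to_table()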

asfimport commented 3 years ago

David Li / @lidavidm: Unfortunately no, projection is always 'active'; even if you don't set any projection, the Scanner sets a projection (of all columns, using string names), which all ultimately flows through the same paths in C++. Of course, it could special-case that, but it'd be in C++.