apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

Selective reading of rows for parquet file #29172

Open asfimport opened 3 years ago

asfimport commented 3 years ago

The current interface for selective reading is to use filters: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html

The approach works well when the filters are simple (e.g., field in (v1, v2, v3, …)) and when the number of columns is small. It does not work well under the following conditions, which currently require reading the complete data set into (Python) memory:

- when the condition is complex (e.g., a condition between attributes: field1 + field2 > field3)
- when the file has many columns (making it costly to create Python structures)
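
For reference, a minimal sketch of the simple-filter interface described above, using the filters argument in its list-of-tuples form; the file path, column name, and values are placeholders:

import pyarrow.parquet as pq

# Simple predicate pushdown using the filters argument in its
# list-of-tuples (DNF) form; path and column name are hypothetical.
table = pq.read_table('/tmp/data.parquet',
                      filters=[('field', 'in', ['v1', 'v2', 'v3'])])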

Reporter: Yair Lenga

Note: This issue was originally created as ARROW-13517. Please see the migration documentation for further details.

asfimport commented 3 years ago

Weston Pace / @westonpace:

when the condition is complex (e.g., a condition between attributes: field1 + field2 > field3)

This is currently possible using the new datasets API and expressions:


import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as pads

table = pa.Table.from_pydict({'x': [1, 2, 3, 4, 5, 6], 'y': [-2, -2, -2, -2, -2, -2], 'z': [-10, 0, 0, -10, 0, 0]})
pq.write_table(table, '/tmp/foo.parquet')

ds = pads.dataset('/tmp/foo.parquet')
# Expression filter that compares columns to each other
table2 = ds.to_table(filter=(pads.field('x') + pads.field('y')) > pads.field('z'))
print(table2.to_pydict())

When the file has many columns (making it costly to create Python structures).

It's not clear to me what is expensive here. You mention there are 200 columns. Both the old and new approaches should be pretty workable when manipulating metadata for 200 columns in Python.

I believe it is possible to achieve something similar using the C++ stream_reader.

You might be able to achieve some benefits in some situations by skipping data. In general this is a difficult problem with Parquet. For example, compressed pages typically need to be fully read into memory and can't support any kind of skipping. Furthermore, many pages are going to use run length encoding, which makes it impossible to skip to a particular value. As a general rule of thumb you cannot do a partial read of a parquet page. So if a page has 100k rows in it and you only want 10 rows out of it then, even if you have the indices, you typically need to read the page into memory first.
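
To make that granularity concrete, here is a small sketch (reusing the file path from the snippet above) that inspects the row-group and column-chunk metadata pyarrow exposes; these are the units at which data can be skipped, while individual pages are read and decoded whole:

import pyarrow.parquet as pq

pfile = pq.ParquetFile('/tmp/foo.parquet')
meta = pfile.metadata

# Row groups (and the column chunks inside them) are the unit of skipping.
for i in range(meta.num_row_groups):
    rg = meta.row_group(i)
    print(f'row group {i}: {rg.num_rows} rows, {rg.total_byte_size} bytes')
    for j in range(rg.num_columns):
        col = rg.column(j)
        print(f'  column {col.path_in_schema}: '
              f'{col.compression}, {col.total_compressed_size} compressed bytes')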

asfimport commented 3 years ago

Yair Lenga: Thanks for pointing to the new dataset API. For my situation (reading a small number of rows from a large data set), I believe it would be beneficial if the above were implemented. In particular, two benefits:

- Query results (e.g., a list of rows matching a condition) can be cached and reused to re-load data without having to perform linear scans over the complete data set.
- The C++ Stream API seems to support both skipping over row groups and skipping over column chunks. This can potentially reduce reading by a significant factor when recalling data for queries that have been processed in the past.

asfimport commented 3 years ago

Weston Pace / @westonpace:

Query results (e.g., a list of rows matching a condition) can be cached and reused to re-load data without having to perform linear scans over the complete data set.

I don't think you'd be able to reduce the amount of I/O actually required since you'd need to load whole row-groups / column-chunks either way. I do agree that you'd reduce the computational load somewhat as handling indices should be simpler than applying a query. However, if you're doing actual I/O this is likely to be a fraction of the total cost and if you're working on in-memory data you'd be better off memory mapping an IPC file.
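
For the in-memory case mentioned above, a minimal sketch (paths and data are just examples) of memory-mapping an Arrow IPC file so that reads reference the mapped region instead of copying it into process memory up front:

import pyarrow as pa

table = pa.Table.from_pydict({'x': [1, 2, 3], 'y': [4.0, 5.0, 6.0]})

# Write an Arrow IPC file...
with pa.OSFile('/tmp/foo.arrow', 'wb') as sink:
    with pa.ipc.new_file(sink, table.schema) as writer:
        writer.write_table(table)

# ...then memory-map it; buffers point into the mapped region.
with pa.memory_map('/tmp/foo.arrow', 'r') as source:
    mapped = pa.ipc.open_file(source).read_all()
    print(mapped.num_rows, mapped.column_names)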

The C++ Stream API seems to support both skipping over row groups and skipping over column chunks. This can potentially reduce reading by a significant factor when recalling data for queries that have been processed in the past.

The datasets API supports both of these types of skips.
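
As a small illustration (reusing the example file written earlier in this thread), combining a column projection with a filter lets the scanner skip the column chunks that were not requested and skip row groups whose statistics rule out any matches:

import pyarrow.dataset as pads

ds = pads.dataset('/tmp/foo.parquet', format='parquet')

# Only the 'x' and 'z' column chunks are read; row groups whose min/max
# statistics show the filter cannot match can be skipped entirely.
table = ds.to_table(columns=['x', 'z'], filter=pads.field('z') < 0)
print(table.to_pydict())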

I believe that AWS S3 Select has similar capabilities.

From https://docs.aws.amazon.com/AmazonS3/latest/userguide/selecting-content-from-objects.html I see "For Parquet objects, all of the row groups that start within the scan range requested are processed." So I believe it is doing the same column-chunk and row-group skipping that the dataset API currently supports. There will be an advantage with S3 Select since the Parquet metadata is read and processed in the data center, although I am not sure how much of a difference that will make.

I get much faster performance than what I see with my desktop Python.

Can you describe what you are trying on your desktop Python and what S3 Select query you are performing to get similar results?

How hard would it be to build this logic into Python to realize the above savings? While it might not be trivial to implement, for certain cases it would be extremely valuable.

Given that you are going to have to load the entire column chunk into memory either way, you could probably do this in pure Python using the compute module with something like...


import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

table = pa.Table.from_pydict({'x': [1, 2, 3], 'y': ['x', 'y', 'z']})
pq.write_table(table, '/tmp/foo.parquet')

pfile = pq.ParquetFile('/tmp/foo.parquet')
row_group = pfile.read_row_group(0)
# Assuming you want indices 0,2.  This is just an example
row_group.take([0, 2])
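
As a follow-up sketch, the compute module imported above can also derive the selection instead of hard-coding indices (the predicate here is just an example):

import pyarrow.compute as pc
import pyarrow.parquet as pq

pfile = pq.ParquetFile('/tmp/foo.parquet')
row_group = pfile.read_row_group(0)

# Build a boolean mask with the compute module and keep the matching rows;
# the whole row group still has to be read and decoded first.
mask = pc.greater(row_group['x'], 1)
print(row_group.filter(mask).to_pydict())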

asfimport commented 3 years ago

Yair Lenga: Thanks for taking the time to provide good feedback.

You are correct that there is something "wrong" with my local box. I suspect that I am running out of actual memory (low-end free AWS instance), resulting in actual I/O and swapping, whereas S3 Select is not short on resources.

Running on a "fresh" instance made the processing noticeably better.

Thanks again for your patience. Yair