apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0
13.96k stars 3.41k forks source link

[Python] Dataset Scanner is single-threaded only #25531

Closed asfimport closed 4 years ago

asfimport commented 4 years ago

I'm not sure this is a misunderstanding, or a compilation issue (flags?) or an issue in the C++ layer.

I have 1000 parquet files with a total of 1 billion rows (1 million rows each file, ~20 columns). I wanted to see if I could go through all rows 1 of 2 columns efficiently (vaex use case).

 


import pyarrow.parquet
import pyarrow as pa
import pyarrow.dataset as ds
import glob
ds = pa.dataset.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))
scanned = 0
for scan_task in ds.scan(batch_size=1_000_000, columns=['passenger_count'], use_threads=True):
    for record_batch in scan_task.execute():
        scanned += record_batch.num_rows
scanned

This only seems to use 1 cpu.

Using a threadpool from Python:


# %%timeit
import concurrent.futures
pool = concurrent.futures.ThreadPoolExecutor()
ds = pa.dataset.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))
def process(scan_task):
    scan_count = 0
    for record_batch in scan_task.execute():
        scan_count += len(record_batch)
    return scan_count
sum(pool.map(process, ds.scan(batch_size=1_000_000, columns=['passenger_count'], use_threads=False)))

Gives me a similar performance, again, only 100% cpu usage (=1 core/cpu).

py-spy (profiler for Python) shows no GIL, so this might be something at the C++ layer.

Am I 'holding it wrong' or could this be a bug? Note that IO speed is not a problem on this system (it actually all comes from OS cache, no disk read observed)

 

Reporter: Maarten Breddels / @maartenbreddels Assignee: Maarten Breddels / @maartenbreddels

Original Issue Attachments:

Note: This issue was originally created as ARROW-9458. Please see the migration documentation for further details.

asfimport commented 4 years ago

Joris Van den Bossche / @jorisvandenbossche: Regarding your first example, that it doesn't do this in parallel automatically is expected (in principle reading a row group could also be done in parallel, but since in the datasets project different scan tasks can be run in parallel, I suppose reading a single row group is limited to one thread to avoid oversubscription, not fully sure about this).

To quote the Dataset.scan() docstring:

It produces a stream of ScanTasks which is meant to be a unit of work to be dispatched. The tasks are not executed automatically, the user is responsible to execute and dispatch the individual tasks, so custom local task scheduling can be implemented.

The Scanner.scan() (which is used under the hood) is a bit more explicit:

The caller is responsible to dispatch/schedule said tasks. Tasks should be safe to run in a concurrent fashion and outlive the iterator.

But the use_threads argument to Dataset.scan() is thus certainly confusing. I need to check to be sure, but AFAIK this keyword is only used when using the to_table method (which does automatic execution of the scan tasks).

But, so since the scan method is meant to do this task execution yourself, your second example should be a target use case. So if that is not working, that's something to further investigate.

asfimport commented 4 years ago

Joris Van den Bossche / @jorisvandenbossche: Ah, and forgot to note: we are experimenting with plugging pyarrow.dataset into dask's dd.read_parquet, and there it is certainly achieving parallelism (so probably not a fundamental issue at the C++ layer). We are not using the scanner interface, but it's executing fragment.to_table(...., use_threads=False) for all fragments on the different cores/workers.

asfimport commented 4 years ago

Joris Van den Bossche / @jorisvandenbossche: @maartenbreddels how big are the row groups in your parquet files?

I am experimenting a bit with a local version of the taxi data as well, and I see that each scan task maps to a single row group, and not a single parquet file. In my case, this gave a lot of tasks, where each task doesn't take much time (only around 2-3 ms). Possibly too small to overcome python overhead from running them in parallel? (but I would need to rewrite the parquet files to test this, and that it is not an issue with the scanner).

When using fragments manually (which map to single parquet files), I do see parallelization.

asfimport commented 4 years ago

Maarten Breddels / @maartenbreddels:  

Running this (now with all columns)


import pyarrow as pa
import pyarrow.dataset as ds
import concurrent.futures
import glob
pool = concurrent.futures.ThreadPoolExecutor()
ds = pa.dataset.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))
def process(scan_task):
    scan_count = 0
    for record_batch in scan_task.execute():
        scan_count += len(record_batch)
    return scan_count
sum(pool.map(process, ds.scan(batch_size=1_000_000, use_threads=False)))

The output of py-spy is:

 

image-2020-07-14-14-31-29-943.png

And it takes 5 minutes.

Indeed, if I run similarly with to_table()

 


import pyarrow as pa
import pyarrow.dataset as ds
import concurrent.futures
import glob
pool = concurrent.futures.ThreadPoolExecutor()
ds = pa.dataset.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))
def process(f):
    scan_count = 0
    return len(f.to_table(use_threads=False))
sum(pool.map(process, ds.get_fragments()))

I see much better cpu usage (although it seems to have difficulty getting started):

image-2020-07-14-14-38-16-767.png

 

And it takes 30 seconds (sometimes 16... seem very irregular).

changing the last part to use_threads=True, i.e.:


def process(f):
    scan_count = 0
    return len(f.to_table(use_threads=True))
sum(pool.map(process, ds.get_fragments()))

speeds it up to 9 seconds.

Note that I have 1000 files/fragments.

I hope this info gives some clues.

 

My best guess is that the path using the scanner/execute path has a lock/mutex at the c++ layer avoiding the effective use of multithreading.

 

asfimport commented 4 years ago

Maarten Breddels / @maartenbreddels: Did you set ? batch_size=1_000_000

asfimport commented 4 years ago

Joris Van den Bossche / @jorisvandenbossche:

Did you set ? batch_size=1_000_000

The batch size is a max size, not a minimum size. So if your row groups are smaller, it doesn't change anything, I presume.

Now, in the meantime made a second version of the dataset with much larger row groups: and that confirms what you see as well, no parallelization with scan tasks, while it works when using fragment.to_table.

asfimport commented 4 years ago

Joris Van den Bossche / @jorisvandenbossche: It might be we are not releasing the GIL in the ScanTask's execute method: https://github.com/apache/arrow/blob/6d7e4ecbaf4c9c90e2df326ad46233dc5dcfcc7a/python/pyarrow/_dataset.pyx#L1817-L1818

asfimport commented 4 years ago

Maarten Breddels / @maartenbreddels: Yes, in my case, the row groups are 1_000_000 rows, 1 rowgroup per file, setting the batch_size higher does not affect the batch size indeed.

 

 

asfimport commented 4 years ago

Maarten Breddels / @maartenbreddels: Indeed, seeing a massive speedup. Too bad py-spy didn't notice :(

asfimport commented 4 years ago

Maarten Breddels / @maartenbreddels: let me know if you want to do the honors yourself, otherwise, I can open a PR tonight

asfimport commented 4 years ago

Joris Van den Bossche / @jorisvandenbossche: How do you release the GIL with a yielding for loop?

(since we are about to cut a release (probably tomorrow), the sooner the better ;))

asfimport commented 4 years ago

Ben Kietzman / @bkietz: This looks like it might be a systemic problem with using a CIterator as the range of a for loop (which we do elsewhere in _dataset.pyx). (In a follow up) Ideally we could provide a wrapper to do lock acquisition and release transparently whenever we need to interact with a CIterator:


for maybe_batch in Iterate(self.task.Execute()):
    yield pyarrow_wrap_batch(GetResultValue(move(maybe_batch))) 
asfimport commented 4 years ago

Wes McKinney / @wesm: Issue resolved by pull request 7756 https://github.com/apache/arrow/pull/7756