apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[Python] OOM with Dataset.scanner but ok with ParquetFile.iter_batches #44799

Open theogaraj opened 1 day ago

theogaraj commented 1 day ago

Describe the bug, including details regarding any error messages, version, and platform.

Setup

I am using pyarrow version 18.0.0.

I am running my tests on an AWS r6g.large instance running Amazon Linux. (I also tried instances with more memory, in case some baseline amount of memory is needed regardless of small batch sizes and readahead, but this didn't help.)

My data consists of parquet files in S3, ranging in size from a few hundred kB to ~1 GB, for a total of 3.4 GB. This is a sample subset of my actual dataset, which is ~50 GB.

Problem description

I have a set of parquet files with very small row-groups, and I am attempting to use the pyarrow.dataset API to transform this into a set of files with larger row-groups. My basic approach is dataset -> scanner -> write_dataset. After running into OOM problems with default parameters, I ratcheted down the read and write batch sizes and concurrent readahead:

from pyarrow import dataset as ds

data = ds.dataset(INPATH, format='parquet')

# note the small batch size and minimal values for readahead
scanner = data.scanner(
    batch_size=50,
    batch_readahead=1,
    fragment_readahead=1
)

# again, note extremely small values for output batch sizes
ds.write_dataset(
    scanner,
    base_dir=str(OUTPATH),
    format='parquet',
    min_rows_per_group=1000,
    max_rows_per_group=1000
)

Running this results in increasing memory consumption (monitored using top) until the process maxes out available memory and is finally killed.

What worked to keep memory use under control was to replace the dataset scanner with ParquetFile.iter_batches as below:

from pyarrow import dataset as ds
import pyarrow.parquet as pq

def batcherator(filepath, batch_size):
    for f in filepath.glob('*.parquet'):
        with pq.ParquetFile(f) as pf:
            yield from pf.iter_batches(batch_size=batch_size)

scanner = batcherator(INPATH, 2000)   # it's fine with higher batch size than previous

ds.write_dataset(
    scanner,
    base_dir=str(OUTPATH),
    format='parquet',
    min_rows_per_group=10_000,   # again, higher values of write batch sizes
    max_rows_per_group=10_000
)

Since nothing's really changing on the dataset.write_dataset side, it seems like there's some issue with runaway memory use on the scanner side of things?

The closest I could find online was this DuckDB issue https://github.com/duckdb/duckdb/issues/7856 which in turn pointed to this arrow issue https://github.com/apache/arrow/issues/31486 but this seems to hint more at a problem with write_dataset, which for me seemed ok once I replaced how I am reading in the data.

Component(s)

Python

raulcd commented 22 hours ago

@pitrou @jorisvandenbossche FYI, I've added this to my list of things to investigate and try to reproduce.