apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[C++][Parquet] splitting and saving big datasets consumes all available RAM and fails #38691

Open vkhodygo opened 11 months ago

vkhodygo commented 11 months ago

Describe the bug, including details regarding any error messages, version, and platform.

I have a dataset stored as .csv files: 1000 files with 1,000,000 records each. I'd like to convert it to .parquet and split it into logical partitions to save some space and ease access to it. However, when I try to do this, my code consumes all available memory and fails spectacularly.

The code to generate mock data is provided below. The actual values have somewhat different distributions, but not by orders of magnitude.

import numpy as np
import pyarrow as pa

SAMPLE_SIZE = 1_000_000_000
_rng = np.random.default_rng(123)

# 10 float32 columns, 37 uint8 columns, 40 boolean columns
_schema = pa.schema([pa.field(f'f{i}', pa.float32()) for i in range(10)] +
                    [pa.field(f'i{i}', pa.uint8()) for i in range(37)] +
                    [pa.field(f'b{i}', pa.bool_()) for i in range(40)])

data = pa.table({f'f{i}': _rng.standard_normal(SAMPLE_SIZE, dtype=np.float32) for i in range(10)} |
                {f'i{i}': _rng.integers(1, 13, SAMPLE_SIZE, dtype=np.uint8) for i in range(1)} |
                {f'i{i}': _rng.integers(1, 6, SAMPLE_SIZE, dtype=np.uint8) for i in range(1, 37)} |
                {f'b{i}': _rng.integers(0, 2, SAMPLE_SIZE, dtype=np.uint8) for i in range(40)},
                schema=_schema)
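
For scale, a rough back-of-the-envelope estimate (my own arithmetic, not a measurement) of the in-memory size of this table, given that Arrow stores float32 as 4 bytes, uint8 as 1 byte, and booleans bit-packed at 1 bit per value:

# Approximate per-row footprint of the Arrow table, ignoring validity bitmaps:
#   10 x float32 = 40 bytes, 37 x uint8 = 37 bytes, 40 x bool (bit-packed) = 5 bytes
bytes_per_row = 10 * 4 + 37 * 1 + 40 / 8          # ~82 bytes
total_gb = bytes_per_row * 1_000_000_000 / 1e9    # ~82 GB, before any writing starts
print(total_gb)                                    # plus the temporary NumPy buffers used to build it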

I'd like to split this data the following way:

import pyarrow.dataset as ds
file_options = ds.ParquetFileFormat().make_write_options(compression='gzip')

ds.write_dataset(data,
                 base_dir="./syntet",
                 format="parquet",
                 file_options=file_options,
                 partitioning=ds.partitioning(
                     pa.schema([
                         ("b0", pa.bool_()),
                         ("i0", pa.uint8()),
                         ("i1", pa.uint8()),
                         ("i2", pa.uint8())
                     ]),
                     flavor="hive"
                 ),
                 existing_data_behavior="delete_matching"
                 )

That's 600 partitions, give or take (2 × 12 × 5 × 5 with the mock distributions). Considering 1,000,000,000 records, that's ~1,670,000 rows per partition. Strictly speaking, I'd like to decrease this number even further by splitting into 50-100 subsets, since this is only a sample of the data.
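
As a quick sanity check on that figure, the partition count implied by the mock columns can be computed directly (a small sketch using pyarrow.compute; data is the mock table from above):

import pyarrow.compute as pc

# Number of hive partitions = product of distinct values in the partition columns.
n_partitions = 1
for col in ("b0", "i0", "i1", "i2"):
    n_partitions *= pc.count_distinct(data[col]).as_py()
print(n_partitions)  # 2 * 12 * 5 * 5 = 600 with the mock distributions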

I tried both Python and R to do this; the outcome stays the same. I also tried multiple machines with the following RAM/swap specs (GB): 64/200, 256/256, 1024/8. No result so far.

The only "working" solution is to read the whole dataset, filter a subset manually and save it. Wash, rinse, repeat; I think I had to read about 4TB of data in total during that, it also took somewhere around 10 hours to finish. This is clearly not sustainable.

I also searched through other performance-related issues, but most if not all of them are about reading data. No, I'm afraid I can't reduce the number of partitions; if anything, the only direction here is to go even further. When you do cross-table matching or targeted selection, even these chunks are too big.

Component(s)

C++, Parquet

mapleFU commented 11 months ago

Would you mind setting max_open_files and max_rows_per_file to limit that?

https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html
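
For illustration, a minimal sketch of the reproduction above with those two limits added (the concrete values are placeholders, not tuned recommendations):

ds.write_dataset(data,
                 base_dir="./syntet",
                 format="parquet",
                 file_options=file_options,
                 partitioning=ds.partitioning(
                     pa.schema([("b0", pa.bool_()),
                                ("i0", pa.uint8()),
                                ("i1", pa.uint8()),
                                ("i2", pa.uint8())]),
                     flavor="hive"),
                 existing_data_behavior="delete_matching",
                 max_open_files=100,            # cap on concurrently open partition files
                 max_rows_per_file=2_000_000)   # roll over to a new file past this many rows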

Also, can I reproduce the problem with the mock data above?