huggingface / datasets

🤗 The largest hub of ready-to-use datasets for ML models with fast, easy-to-use and efficient data manipulation tools
https://huggingface.co/docs/datasets
Apache License 2.0

Save Dataset as Sharded Parquet #7047

Open tom-p-reichel opened 3 months ago

tom-p-reichel commented 3 months ago

Feature request

to_parquet currently saves the dataset as one massive, monolithic parquet file, rather than as several small parquet files. It should shard large datasets automatically.

Motivation

This default behavior makes me very sad: a program I ran for 6 hours saved its results using to_parquet, putting the entire billion+ row dataset into a single 171 GB shard parquet file which pyarrow, Apache Spark, etc. all cannot work with without completely exhausting the memory of my system. I was previously able to work with larger-than-memory parquet files, just not this one, so I assume the problem is that everything ended up in a single shard. Making sharding the default behavior would put datasets in parity with other frameworks, such as Spark, which shard automatically when a large dataset is saved as parquet.

Your contribution

I could change the logic here https://github.com/huggingface/datasets/blob/bf6f41e94d9b2f1c620cf937a2e85e5754a8b960/src/datasets/io/parquet.py#L109-L158 to use pyarrow.dataset.write_dataset, which seems to support sharding, or alternatively to periodically open new files. We would only shard if the user passed in a path rather than a file handle.
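
For illustration, here is a minimal sketch (not the datasets implementation) of how pyarrow.dataset.write_dataset shards output into multiple files; the row limits, directory name, and file name template are arbitrary choices:

import pyarrow as pa
import pyarrow.dataset as pds

# Stand-in for the dataset's Arrow table.
table = pa.table({"id": list(range(1_000_000))})

pds.write_dataset(
    table,
    base_dir="output_dir",
    format="parquet",
    basename_template="part-{i}.parquet",  # {i} is replaced with the file index
    max_rows_per_file=100_000,             # start a new file after this many rows
    max_rows_per_group=10_000,             # keep row groups small enough to read incrementally
    existing_data_behavior="overwrite_or_ignore",
)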

tom-p-reichel commented 3 months ago

To anyone else who finds themselves in this predicament: it's possible to read the parquet file in the same way that datasets writes it and then manually break it into pieces. You do, however, need a couple of magic options (thrift_*) to deal with the huge file metadata, otherwise pyarrow immediately crashes.

import os

import pyarrow.parquet as pq
import tqdm
from more_itertools import chunked

# Open the oversized file; the thrift_* limits are needed because the
# file-level metadata is too big for pyarrow's defaults.
r = pq.ParquetReader()
r.open(
    "./outrageous-file.parquet",
    thrift_string_size_limit=2**31 - 1,
    thrift_container_size_limit=2**31 - 1,
)

# Rewrite the row groups into smaller files, 10000 row groups per file.
os.makedirs("./chunks.parquet", exist_ok=True)
for i, chunk in tqdm.tqdm(enumerate(chunked(range(r.num_row_groups), 10000))):
    w = pq.ParquetWriter(f"./chunks.parquet/chunk{i}.parquet", schema=r.schema_arrow)
    for idx in chunk:
        w.write_table(r.read_row_group(idx))
    w.close()
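
Once the chunk files are written, the directory can be read back lazily as a larger-than-memory dataset, for example with pyarrow.dataset (a sketch assuming the ./chunks.parquet directory from above):

import pyarrow.dataset as pds

# Point a pyarrow dataset at the directory of chunk files; scanning it
# streams record batches instead of loading everything into memory.
chunks = pds.dataset("./chunks.parquet", format="parquet")
for batch in chunks.to_batches():
    ...  # process each batch incrementally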

lhoestq commented 3 months ago

In the meantime, you can also use .shard() and call to_parquet() on each shard:

# ds is the datasets.Dataset to export; contiguous=True makes each shard a contiguous slice.
num_shards = 128
output_path_template = "output_dir/{index:05d}.parquet"
for index in range(num_shards):
    shard = ds.shard(index=index, num_shards=num_shards, contiguous=True)
    shard.to_parquet(output_path_template.format(index=index))
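
If needed, the shard files can then be loaded back with the parquet loader (a usage sketch; output_dir and the file pattern are the ones assumed above):

from datasets import load_dataset

# Load all the shard files written above as a single dataset.
ds_reloaded = load_dataset("parquet", data_files="output_dir/*.parquet", split="train")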