huggingface / datasets

🤗 The largest hub of ready-to-use datasets for ML models with fast, easy-to-use and efficient data manipulation tools
https://huggingface.co/docs/datasets
Apache License 2.0

AutoSharding IterableDatasets when num_workers > 1 #5984

Open mathephysicist opened 1 year ago

mathephysicist commented 1 year ago

Feature request

Minimal Example

import torch
from datasets import IterableDataset

# <file_name> is a placeholder for the path to a single local Arrow file
d = IterableDataset.from_file(<file_name>)
dl = torch.utils.data.DataLoader(d, num_workers=3)

for sample in dl:
    print(sample)

Warning: Too many dataloader workers: 2 (max is dataset.n_shards=1). Stopping 1 dataloader workers. To parallelize data loading, we give each process some shards (or data sources) to process. Therefore it's unnecessary to have a number of workers greater than dataset.n_shards=1. To enable more parallelism, please split the dataset in more files than 1.

Expected Behavior: The dataset is sharded so that each CPU worker processes its own contiguous subset (contiguity makes checkpoint saving/loading possible).

Motivation

I have a lot of unused CPUs and would like to be able to shard iterable datasets with PyTorch's DataLoader when num_workers > 1. This is for a very large single file. I am aware that we can use split_dataset_by_node to ensure that each node (for distributed training) gets different shards, but we should extend it so that the splitting also continues across multiple dataloader workers.
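
For reference, a minimal sketch (not from the issue) of the node-level sharding that already exists via datasets.distributed.split_dataset_by_node; the data file, rank and world size here are placeholders.

from datasets import load_dataset
from datasets.distributed import split_dataset_by_node

ds = load_dataset("json", data_files="data.jsonl", split="train", streaming=True)

# Each distributed node gets a disjoint subset: whole shards when there are
# enough of them, otherwise examples are skipped with rank/world_size modulo logic.
ds_rank0 = split_dataset_by_node(ds, rank=0, world_size=4)

# The feature request is for an equivalent split to also happen automatically
# across DataLoader workers when num_workers > 1 and dataset.n_shards == 1.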

Your contribution

If someone points me to what needs to change, I can create a PR.

mariosasko commented 1 year ago

For this to be possible, we would have to switch from the "Streaming" Arrow format to the "Random Access" (IPC/Feather) format, which allows reading arbitrary record batches (explained here). We could then use these batches to construct shards.

@lhoestq @albertvillanova Do you think this use case is worth the switch? Also, we currently shard files, not inner row groups/chunks. Should we also support sharding row groups (e.g. if the number of input files is 1)?

PS: I don't expect significant speed-up for local, uncompressed Arrow files.
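
As a rough illustration of the difference (plain pyarrow, not datasets internals; the file names are placeholders): the streaming format has no footer, while the file format records the location of every record batch in its footer.

import pyarrow as pa

batch = pa.record_batch([pa.array([1, 2, 3])], names=["x"])

# "Streaming" IPC format: batches are appended one after another with no footer,
# so a reader cannot jump to the N-th batch without scanning the whole stream.
with pa.OSFile("data.arrows", "wb") as sink:
    with pa.ipc.new_stream(sink, batch.schema) as writer:
        writer.write_batch(batch)

# "Random Access" IPC file format: a footer stores the location of every record
# batch, which is what would allow constructing shards from a single file.
with pa.OSFile("data.arrow", "wb") as sink:
    with pa.ipc.new_file(sink, batch.schema) as writer:
        writer.write_batch(batch)

with pa.memory_map("data.arrow", "rb") as source:
    print(pa.ipc.open_file(source).num_record_batches)  # read from the footer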

lhoestq commented 1 year ago

Alternatively, we could support a multiprocessing map for iterable datasets and let the user do the CPU-intensive task there?

That way it would work not only on Arrow data but also on any iterable dataset.
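
A purely illustrative sketch of what that could look like from the user side; note that num_proc is not an existing argument of IterableDataset.map here, only a stand-in for the proposed behavior.

# Hypothetical usage only: IterableDataset.map does not currently take num_proc.
from datasets import load_dataset

ds = load_dataset("json", data_files="data.jsonl", split="train", streaming=True)

def heavy_preprocess(example):
    # CPU-intensive work (tokenization, decoding, augmentation, ...)
    return example

# Proposed idea: run the map function in several processes, independently of how
# many underlying shards (files) the iterable dataset has.
ds = ds.map(heavy_preprocess, num_proc=4)  # num_proc is hypothetical here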

imarquart commented 1 year ago

> For this to be possible, we would have to switch from the "Streaming" Arrow format to the "Random Access" (IPC/Feather) format, which allows reading arbitrary record batches (explained here). We could then use these batches to construct shards.
>
> @lhoestq @albertvillanova Do you think this use case is worth the switch? Also, we currently shard files, not inner row groups/chunks. Should we also support sharding row groups (e.g. if the number of input files is 1)?
>
> PS: I don't expect significant speed-up for local, uncompressed Arrow files.

Could you explain why you'd need to change the Arrow format?

When we use streaming datasets we simply determine the number of worker shards and then add some modulo logic at the appropriate place. In the worst case, you'd skip streaming entries according to the number of shards.

For PyTorch, I'd be happy to provide an implementation or a sketch thereof, if you point me toward what the testing requirements would be for such a PR.
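
A minimal sketch of that modulo/skip approach on the PyTorch side, assuming any Python iterable as the underlying stream (this is not the datasets implementation):

import torch
from torch.utils.data import DataLoader, IterableDataset as TorchIterableDataset


class WorkerShardedIterable(TorchIterableDataset):
    """Each DataLoader worker iterates the full stream but keeps only its slice."""

    def __init__(self, source_iterable):
        self.source = source_iterable  # any iterable of examples

    def __iter__(self):
        info = torch.utils.data.get_worker_info()
        worker_id = info.id if info is not None else 0
        num_workers = info.num_workers if info is not None else 1
        for i, example in enumerate(self.source):
            if i % num_workers == worker_id:  # keep this worker's examples only
                yield example


# dl = DataLoader(WorkerShardedIterable(my_stream), num_workers=3)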

lhoestq commented 1 year ago

> Could you explain why you'd need to change the Arrow format?

That way each worker has random access to the location in the file where its subset of the dataset starts. Currently we're using the Arrow streaming format, which doesn't include metadata for the record batch offsets. That metadata is needed to efficiently split a dataset made of one single file.

imarquart commented 1 year ago

> Could you explain why you'd need to change the Arrow format?
>
> That way each worker has random access to the location in the file where its subset of the dataset starts. Currently we're using the Arrow streaming format, which doesn't include metadata for the record batch offsets. That metadata is needed to efficiently split a dataset made of one single file.

I guess I don't understand why you'd need to subset the dataset in the first place. It seems sufficient to figure out how to offset or skip rows.

For instance, using PyArrow, you could use RecordBatchStreamReader to iterate over record batches (zero-copy) with read_next_batch and then only process the batches that belong to the current worker shard (modulo logic). That's one way to do it, and of course you'd also need to account for GPU sharding.

Otherwise, how did you implement worker/node/GPU sharding for iterable/streaming data where you do not have index information or prior splits (e.g. files)?
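
A small sketch of that streaming approach with plain pyarrow (the file name, worker_id and num_workers are placeholders); iterating the reader is equivalent to calling read_next_batch until the stream is exhausted:

import pyarrow as pa

worker_id, num_workers = 0, 3  # placeholders for the current worker shard

with pa.OSFile("data.arrows", "rb") as source:
    reader = pa.ipc.open_stream(source)  # RecordBatchStreamReader
    for i, record_batch in enumerate(reader):
        if i % num_workers != worker_id:
            continue  # this batch belongs to another worker
        # process this worker's record batch here
        print(record_batch.num_rows)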

lhoestq commented 1 year ago

> For instance, using PyArrow, you could use RecordBatchStreamReader to iterate over record batches (zero-copy) with read_next_batch and then only process the batches that belong to the current worker shard (modulo logic).

That works indeed! And what we meant is that you can make instantiation even faster: with RecordBatchStreamReader each worker has to scan through all the record batches, whereas it could get just its own list of record batches if it used the record batch locations stored in the Arrow IPC file footer. This would be especially appreciated for fast instantiation when you have tens of thousands of Arrow files, for example.
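
For comparison, a sketch of what that faster instantiation could look like with the random-access file format (again plain pyarrow, with placeholders for the file name and worker identity): the footer lists every record batch, so each worker can read only its own batches without scanning the stream.

import pyarrow as pa

worker_id, num_workers = 0, 3  # placeholders for the current worker shard

with pa.memory_map("data.arrow", "rb") as source:
    reader = pa.ipc.open_file(source)  # batch locations come from the footer
    for i in range(worker_id, reader.num_record_batches, num_workers):
        record_batch = reader.get_batch(i)  # jump directly to the i-th batch
        # process this worker's record batch here
        print(record_batch.num_rows)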

lololololoki commented 10 months ago

Any recent updates on this?

pauli31 commented 7 months ago

I would also appreciate this feature