huggingface / datasets

🤗 The largest hub of ready-to-use datasets for ML models with fast, easy-to-use and efficient data manipulation tools
https://huggingface.co/docs/datasets
Apache License 2.0

streaming datasets don't work properly with multi-node #6623

Open rohitgr7 opened 7 months ago

rohitgr7 commented 7 months ago

Feature request

Let’s say I have a dataset with 5 samples with values [1, 2, 3, 4, 5], 2 GPUs (for DDP), and a batch size of 2. This dataset is an IterableDataset since I am streaming it.

Now I split the dataset using split_dataset_by_node to ensure it doesn’t get repeated. And since it’s already split, I don’t have to use DistributedSampler, right? (They don’t work with iterable datasets anyway.)
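For reference, a minimal sketch of this setup (the dataset name is a placeholder, and the process group is assumed to be initialized by the launcher, e.g. torchrun):

```python
import torch.distributed as dist
from datasets import load_dataset
from datasets.distributed import split_dataset_by_node
from torch.utils.data import DataLoader

# Assumes torch.distributed is already initialized (e.g. via torchrun).
rank = dist.get_rank()
world_size = dist.get_world_size()

# streaming=True returns an IterableDataset.
ds = load_dataset("my/dataset", split="train", streaming=True)  # placeholder dataset name

# Each rank keeps its own subset of the stream; no DistributedSampler is used.
ds = split_dataset_by_node(ds, rank=rank, world_size=world_size)

loader = DataLoader(ds, batch_size=2)
```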

But in this case I noticed the following:

First iteration: first GPU gets → [1, 2], second GPU gets → [3, 4]

Second iteration: first GPU gets → [5], second GPU gets → nothing

which actually creates an issue, since with DistributedSampler the samples are repeated internally to ensure none of the GPUs is missing data at any iteration for gradient sync.
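For comparison, here is a toy illustration of that padding behavior of DistributedSampler on a map-style dataset (using the values from the example above):

```python
from torch.utils.data import DistributedSampler

data = [1, 2, 3, 4, 5]
for rank in range(2):
    # With drop_last=False (the default), the sampler pads by repeating
    # indices so both ranks yield the same number of samples.
    sampler = DistributedSampler(data, num_replicas=2, rank=rank, shuffle=False)
    print(rank, [data[i] for i in sampler])
# 0 [1, 3, 5]
# 1 [2, 4, 1]   <- sample 1 is repeated to keep the ranks in sync
```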

So my questions are:

  1. Here, since the splitting happens beforehand, how do I make sure each GPU gets a batch at every iteration to avoid gradient-sync issues?
  2. Do we need to use DistributedSampler? If yes, how?
  3. in the docstrings of split_dataset_by_node, this is mentioned: "If the dataset has a number of shards that is a factor of world_size (i.e. if dataset.n_shards % world_size == 0), then the shards are evenly assigned across the nodes, which is the most optimized. Otherwise, each node keeps 1 example out of world_size, skipping the other examples." Can you explain the last part here?
  4. If dataset.n_shards % world_size != 0, is it possible to shard the streaming dataset on the fly to avoid the case where data is missing?

Motivation

Streaming datasets should work with DDP: big LLMs require a lot of data, DDP/multi-node is what is mostly used to train such models, and streaming can actually help solve the data side of it.

Your contribution

Yes, I can help submit the PR once we reach a mutual understanding of how it should behave.

rohitgr7 commented 7 months ago

@mariosasko, @lhoestq, @albertvillanova hey guys! can anyone help? or can you guys suggest who can help with this?

lhoestq commented 7 months ago

Hi !

  1. When the dataset runs out of examples, the last batches received by a GPU can be incomplete or empty/missing. We haven't implemented a way to ignore the last batch yet. It might require the dataset to provide the number of examples per shard though, so that we know when to stop.
  2. Samplers are not compatible with IterableDatasets in PyTorch.
  3. If dataset.n_shards % world_size != 0, then all the nodes will read/stream the full dataset in order (possibly reading/streaming the same data multiple times), BUT will only yield one example out of world_size, so that each example goes to exactly one GPU (see the toy sketch after this list).
  4. No, sharding should be done up-front and can take some time depending on the dataset size and format.
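To illustrate point 3, here is a toy sketch (not the actual datasets implementation) of the "keep 1 example out of world_size" behavior when the shards can't be evenly assigned:

```python
def node_stream(examples, rank, world_size):
    # Every node iterates over the full stream but keeps only the examples
    # whose position matches its rank, so each example goes to exactly one GPU.
    for idx, example in enumerate(examples):
        if idx % world_size == rank:
            yield example

examples = [1, 2, 3, 4, 5]
print(list(node_stream(examples, rank=0, world_size=2)))  # [1, 3, 5]
print(list(node_stream(examples, rank=1, world_size=2)))  # [2, 4]
```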
rohitgr7 commented 7 months ago

if dataset.n_shards % world_size != 0 then all the nodes will read/stream the full dataset in order (possibly reading/streaming the same data multiple times), BUT will only yield one example out of world_size so that each example goes to exactly one GPU.

considering there's just 1 shard and 2 worker nodes, do you mean each worker node will load the whole dataset but still receive only half of that shard while streaming?

lhoestq commented 7 months ago

Yes, both nodes will stream from the 1 shard, but each node will skip half of the examples. This way each example is seen exactly once in total during your distributed training.

Though in terms of I/O, the dataset is effectively read/streamed twice.

rohitgr7 commented 7 months ago

What if the number of samples in that shard % num_nodes != 0? Will it break/get stuck? Or is the data repeated in that case for gradient sync?

lhoestq commented 7 months ago

In that case at least one of the nodes will get an empty/incomplete batch. The data is not repeated in that case. If the training loop doesn't take this into account, it can indeed lead to unexpected behaviors.

In the future we'd like to add a feature that would allow the nodes to ignore the last batch, so that all the nodes only get full batches.
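Until such a feature exists, one way a training loop can tolerate ranks that run out of batches early is PyTorch's uneven-inputs support in DDP; a rough sketch, assuming `model`, `optimizer` and `loader` are already set up:

```python
from torch.nn.parallel import DistributedDataParallel as DDP

ddp_model = DDP(model)  # `model`, `optimizer`, `loader` assumed to exist

# join() lets ranks that exhaust their data early shadow the collective
# calls of the remaining ranks, so gradient sync doesn't hang when one
# rank receives fewer (or no) batches.
with ddp_model.join():
    for batch in loader:
        optimizer.zero_grad()
        loss = ddp_model(**batch).mean()  # adapt to your model's inputs
        loss.backward()
        optimizer.step()
```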

kkkjyu commented 6 months ago

In that case at least one of the nodes will get an empty/incomplete batch. The data is not repeated in that case. If the training loop doesn't take this into account, it can indeed lead to unexpected behaviors.

In the future we'd like to add a feature that would allow the nodes to ignore the last batch, so that all the nodes only get full batches.

Is there any way to modify a dataset's n_shards? Is modifying the number of files OK? Does one file == one shard?

lhoestq commented 6 months ago

Is modifying the number of files OK? Does one file == one shard?

Yep, one file == one shard :)
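For example, with streaming the number of data files determines n_shards, so picking a file count that is a multiple of the world size keeps the shard assignment even (the file paths below are placeholders):

```python
from datasets import load_dataset

# Placeholder file list: 8 files -> 8 shards, which divides evenly across
# 2, 4 or 8 nodes (n_shards % world_size == 0).
data_files = [f"data/train-{i:05d}.parquet" for i in range(8)]
ds = load_dataset("parquet", data_files=data_files, split="train", streaming=True)
print(ds.n_shards)  # 8
```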

alex-hh commented 8 hours ago

Hi @lhoestq, do you have any advice on how to implement a fix for the case dataset.n_shards % world_size != 0 while such a fix is not supported in the library?

It seems essential for performing validation in a DDP setting.

Simply limiting the number of files is a bit brittle, as it relies on the world size being consistent to ensure different runs see the same data.

How should a user either ignore the last batch or handle the empty batch?

Is the issue of overhanging batches also relevant for map-style datasets?