mosaicml / streaming

A Data Streaming Library for Efficient Neural Network Training
https://streaming.docs.mosaicml.com
Apache License 2.0

StreamingDataset with DDP hangs and then crashes with NCCL timeout error #307

Open greeneggsandyaml opened 1 year ago

greeneggsandyaml commented 1 year ago

Environment

AWS Deep Learning Machine with 8xA100 and CUDA 11.8

To reproduce

Steps to reproduce the behavior:

  1. Use StreamingDataset to load ImageNet from a local SSD using DDP.

Expected behavior

The data loads as expected when running on a single GPU. I expect the data to load in the same way on multiple GPUs.

Additional context

I'm using accelerate launch / torchrun to launch 8 processes. I'm loading from a local disk, not a remote file. I do this by passing the same (local) directory to both the local and remote arguments of StreamingDataset. Specifically, I have a dataset that looks like:

from typing import Callable, Optional

from streaming import StreamingDataset


class CustomStreamingDataset(StreamingDataset):
    def __init__(
        self,
        local: str,
        remote: Optional[str] = None,
        shuffle: bool = False,
        batch_size: int = 1,
        transform: Optional[Callable] = None,
    ):
        # Fall back to the local directory when no remote is given.
        remote = local if remote is None else remote
        super().__init__(remote=remote, local=local, shuffle=shuffle, batch_size=batch_size)
        self.transform = transform

    def __getitem__(self, idx):
        item = super().__getitem__(idx)
        feats = item['features'].squeeze(0)
        label = item['class']
        if self.transform is not None:
            feats = self.transform(feats)
        return feats, label

And then I load it as follows:

if args.use_streaming_dataset:
    data_dir = f"{args.feature_path}/imagenet256_streaming"
    dataset = CustomStreamingDataset(data_dir, shuffle=True, batch_size=batch_size)
    load_kwargs = dict()
else:
    features_dir = f"{args.feature_path}/imagenet256_features"
    labels_dir = f"{args.feature_path}/imagenet256_labels"
    dataset = CustomDataset(features_dir, labels_dir)
    load_kwargs = dict(shuffle=True, pin_memory=True, drop_last=True)
loader = DataLoader(
    dataset, batch_size=batch_size, num_workers=args.num_workers, **load_kwargs
)

The code does (not) work under the following settings:

Eventually the program crashes with the following error:

RuntimeError: NCCL communicator was aborted on rank {RANK}.  Original reason for failure was: [Rank 1] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=15, OpType=BROADCAST, Timeout(ms)=1800000) ran for 1804394 milliseconds before timing out.

where {RANK} is replaced by 0, 1, ... 7 on each process.

Perhaps this is related to #293. However, since it's not exactly the same, I thought I should leave a separate issue.

knighton commented 1 year ago

If 1 GPU is fine but 8 hang, are you setting the env vars? https://docs.mosaicml.com/projects/streaming/en/stable/fundamentals/environments.html

greeneggsandyaml commented 1 year ago

Yes, accelerate launch and torchrun automatically set the env vars, and DDP works with a non-StreamingDataset dataset.
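For anyone who wants to double-check this per process, here is a minimal sketch that reports which distributed variables are missing. The list of names (RANK, WORLD_SIZE, LOCAL_WORLD_SIZE) is an assumption based on what torchrun exports; the environments page linked above is the authoritative reference, and missing_env_vars is a hypothetical helper, not part of streaming.

```python
import os
from typing import List, Mapping, Sequence

# Assumed set of variables; adjust to whatever the environments page lists.
REQUIRED_VARS = ("RANK", "WORLD_SIZE", "LOCAL_WORLD_SIZE")


def missing_env_vars(
    env: Mapping[str, str] = os.environ,
    required: Sequence[str] = REQUIRED_VARS,
) -> List[str]:
    """Return the required variable names that are unset or empty in env."""
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars()
    print("missing:", missing if missing else "none")
```

Running it at the top of the training script on every rank shows quickly whether one process was launched without the expected variables.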

karan6181 commented 1 year ago

Thanks for confirming! Is it possible for you to share the script?

mpetri commented 1 year ago

I'm observing similar issues. Replacing StreamingDataset with streaming.LocalDataset (and copying the files locally) makes the hang go away. I suspect the problem lies in how downloading interacts with multiprocessing.

In my case I have two datasets: one for train, which works with StreamingDataset, and one for dev, for which I had to replace StreamingDataset with streaming.LocalDataset to stop it from hanging.

knagrecha commented 1 year ago

Same issue here

universome commented 1 year ago

I also face issues with StreamingDataset getting stuck in a multi-node, multi-GPU setup. What partially helps is calling streaming.base.util.clean_stale_shared_memory() (wrapped in try/except), though it only delays the freeze rather than eliminating it (calling it regularly didn't help).
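The try/except wrapping mentioned above can be sketched roughly as follows. best_effort is a hypothetical helper name; only clean_stale_shared_memory comes from streaming, and it is shown as a commented usage line so the sketch runs without the package installed.

```python
import logging
from typing import Callable


def best_effort(cleanup: Callable[[], None]) -> bool:
    """Run a cleanup callable; log and swallow any exception it raises."""
    try:
        cleanup()
        return True
    except Exception as exc:  # cleanup is optional, so never abort training
        logging.warning("cleanup failed, continuing anyway: %s", exc)
        return False


# Intended usage before building the dataset (requires the streaming package):
# from streaming.base.util import clean_stale_shared_memory
# best_effort(clean_stale_shared_memory)
```

As noted, this is a mitigation only: it clears leftovers from a previous crashed run but does not address the hang itself.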

universome commented 1 year ago

Another dirty hack I found helpful is to step the dataloader once before my code like this:

streaming.base.util.clean_stale_shared_memory()
dataset = <instantiate your streaming dataset>
dataloader = <instantiate your streaming dataloader>
dataset_iterator = iter(inf_loop_dataloader(dataloader))
batch = next(iter(dataset_iterator)) # <----- This line is crucial, everything freezes without it.

# Do some other initialization and proceed with the training loop.
my_model, my_optimizer = <init model and optimizer>
while True:
  batch = next(dataset_iterator)
  # Do a training step.

So, before proceeding with the training, we take 1 batch from the dataloader. I have absolutely no idea why it helps; I randomly came across this trick while debugging. In my case, I do not count progress in "epochs" (I guess it's less common nowadays) but in training steps, which is why I need this infinite batch provider, which is just these few lines:

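from typing import Any, Dict, Iterator

import torch


def inf_loop_dataloader(dataloader: torch.utils.data.DataLoader) -> Iterator[Dict[str, Any]]:
    # Restart the dataloader every time it is exhausted.
    while True:
        for batch in dataloader:
            yield batch

universome commented 1 year ago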

Another case where it gets stuck for me is when I read from a shared disk and specify remote=None. So, you can consider disabling it.

karan6181 commented 1 year ago

@greeneggsandyaml If your dataset resides locally, can you try passing local=<local_dir> and remote=None using the latest streaming dataset version, 0.6.0?

karan6181 commented 1 year ago

@mpetri For your use case with more than one dataset, you can create any number of StreamingDataset instances, regardless of whether the dataset resides locally or not.

For a local dataset: give each dataset its own local directory (or the same directory with a different split) and set remote to None. For example, say you are instantiating two StreamingDataset objects, one for train and one for val:

train_dataset = StreamingDataset(local='/tmp/dataset/train', remote=None)
val_dataset = StreamingDataset(local='/tmp/dataset/val', remote=None)

OR

train_dataset = StreamingDataset(local='/tmp/dataset/', remote=None, split='train')
val_dataset = StreamingDataset(local='/tmp/dataset/', remote=None, split='val')

For a remote dataset: give each dataset its own local directory (or the same directory with a different split) and set remote to your cloud provider URL. Taking the same example as above:

train_dataset = StreamingDataset(local='/tmp/dataset/train', remote='s3://bucket/dataset_1')
val_dataset = StreamingDataset(local='/tmp/dataset/val', remote='s3://bucket/dataset_2')

OR

train_dataset = StreamingDataset(local='/tmp/dataset/', remote='s3://bucket/dataset_1', split='train')
val_dataset = StreamingDataset(local='/tmp/dataset/', remote='s3://bucket/dataset_2', split='val')

karan6181 commented 1 year ago

@universome Can you please explain your use case in detail so that I can help you out? Thanks!

universome commented 1 year ago

Hi @karan6181, thank you for your help. My main struggle is that I need to do some filtering on top of the StreamingDataset. Imagine that I have a large dataset (e.g., LAION), and 1) I want to train only on images with height >= 32px; and 2) sometimes there are broken samples which I want to ignore. What would be the best strategy for this kind of data loading? So far, I have considered the following solutions:

  1. Pre-process the dataset and make sure that all the images are non-broken and satisfy the condition width >= 32px. The problem with this approach is that we would need to re-shard the dataset if we change the filtering condition from width >= 32px to e.g., width >= 64px (which is quite likely).
  2. Implement a FilteredStreamingDataLoader class which performs iteration over the vanilla StreamingDataLoader inside its __iter__ and filters out bad samples. In this way, one yielded batch of FilteredStreamingDataLoader can yield multiple batches from the underlying StreamingDataLoader. The issue with this solution is that multiple processes can have different dataloader lengths, and this was actually a cause of hangs in some parts of my codebase (some unlucky processes were finishing their for-loops earlier).
  3. Use torch.utils.data.Subset on top of StreamingDataset to filter out images whose sizes are not appropriate, using the pre-computed metadata, and return a random sample from the dataset when we fail to decode an image. This is the solution I was considering, but I guess it will not work, since it would fragment the shards and read samples via plain random access all the time.
  4. Filter out the samples from the batch during collation and tolerate a variable batch size per device. This is not a good solution, since a non-constant batch size is undesirable from the optimization perspective, and we might even end up with an empty batch on some device (for large models with a small per-device batch size, the probability of an empty batch increases).
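
To make option 2 concrete, here is a minimal, framework-free sketch of re-batching after filtering, so that every yielded batch has a constant size even though it may consume several underlying batches. rebatch_filtered and its arguments are hypothetical names, and plain lists stand in for real batches.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def rebatch_filtered(
    batches: Iterable[List[T]],
    keep: Callable[[T], bool],
    batch_size: int,
) -> Iterator[List[T]]:
    """Drop samples failing `keep` and regroup the rest into full batches."""
    buffer: List[T] = []
    for batch in batches:
        buffer.extend(sample for sample in batch if keep(sample))
        # Emit as many full batches as the buffer currently holds.
        while len(buffer) >= batch_size:
            yield buffer[:batch_size]
            buffer = buffer[batch_size:]
    # Leftover samples are dropped, similar to drop_last=True.


if __name__ == "__main__":
    raw = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]]
    print(list(rebatch_filtered(raw, lambda x: x % 2 == 0, batch_size=3)))
```

This keeps the batch size fixed, but it does not by itself equalize the number of batches across processes, which is the source of the hangs described in option 2.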

universome commented 11 months ago

Hi @karan6181

In my case, the problem is that one worker finishes its epoch earlier, tries to rewind the epoch, and gets stuck on the shared barrier, while the other workers are still training on the first epoch and get stuck on the torch.distributed.barrier, which is always present in DDP training (it normally syncs on the backward pass). This deadlock lasts until the entire run is killed by the DDP timeout.

I have made a fork which simply removes all the shared barriers in _get_work and epoch resumption. Could you please take a look at it to say if there are any terrible side-effects of my changes (I didn't have time to analyze the entire codebase)? Can it lead to samples being duplicated among different workers? (as far as I understand it shouldn't since we still select worker_sample_ids from the same epoch_sample_ids). If it's just each worker computing epoch_sample_ids on its own (and epoch_sample_ids is still equivalent between all the workers), then it does not seem too big of a deal to be honest.

In my use case, I have some filtering happening in the dataloader (filtering out short videos), so some workers often have fewer iterations than others. When the number of iterations differs among the workers, this leads to a deadlock for the reason I described above.
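The mechanism can be reproduced in miniature without any GPU code. The sketch below is a simplified simulation, not streaming or DDP internals: threads stand in for ranks, a threading.Barrier with a timeout stands in for the per-step collective, and run_ranks is a hypothetical helper.

```python
import threading
from typing import List, Optional


def run_ranks(steps_per_rank: List[int], timeout: float = 0.5) -> List[Optional[str]]:
    """Simulate ranks that must all synchronize once per training step."""
    barrier = threading.Barrier(len(steps_per_rank))
    status: List[Optional[str]] = [None] * len(steps_per_rank)

    def worker(rank: int, steps: int) -> None:
        try:
            for _ in range(steps):
                # Every rank has to arrive here for any of them to proceed.
                barrier.wait(timeout=timeout)
            status[rank] = "finished"
        except threading.BrokenBarrierError:
            # Raised when a peer never arrives before the timeout expires.
            status[rank] = "timed out"

    threads = [
        threading.Thread(target=worker, args=(rank, steps))
        for rank, steps in enumerate(steps_per_rank)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return status


if __name__ == "__main__":
    print(run_ranks([3, 3]))  # equal step counts
    print(run_ranks([2, 3]))  # rank 0 runs out of data one step early
```

With equal step counts every rank finishes; when one rank has fewer steps, the remaining rank waits for a peer that never arrives and only returns once the timeout fires, which mirrors the 30-minute NCCL timeout at small scale.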

P.S. I had to also change the shuffling strategy in such a way that next_epoch is not taken from shared memory, but is rather unique for each worker. The rationale is that in generate_work, some workers might read the already incremented next_epoch from shared memory. Could you please explain the motivation for keeping next_epoch in shared memory? When can it diverge?

karan6181 commented 11 months ago

Hi @universome, I am curious: how did you filter the dataset? Would it be possible to create a separate MDS dataset directory for each range of pixels, such as 0-64, 64-128, 128-256, 256-512, etc., and then use a Stream for each sub-dataset?
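A rough sketch of the bucketing idea, assuming the bucket edges above and a hypothetical directory-per-bucket layout: each sample is written to the dataset for its size range, and changing the minimum size later means selecting different buckets instead of re-sharding. bucket_name and buckets_at_least are illustrative helpers, not streaming APIs.

```python
from bisect import bisect_right
from typing import List, Sequence

# Lower edges of the size ranges; the last bucket is open-ended.
EDGES = (0, 64, 128, 256, 512)


def bucket_name(pixels: int, edges: Sequence[int] = EDGES) -> str:
    """Return the label of the range containing `pixels`, e.g. '64-128'."""
    if pixels < edges[0]:
        raise ValueError(f"{pixels} is below the smallest edge {edges[0]}")
    index = bisect_right(edges, pixels) - 1
    low = edges[index]
    if index + 1 < len(edges):
        return f"{low}-{edges[index + 1]}"
    return f"{low}+"


def buckets_at_least(min_pixels: int, edges: Sequence[int] = EDGES) -> List[str]:
    """Labels of all buckets whose samples are guaranteed >= min_pixels."""
    return [bucket_name(low, edges) for low in edges if low >= min_pixels]


# Hypothetical usage with streaming (paths are placeholders):
# from streaming import Stream, StreamingDataset
# streams = [Stream(local=f"/tmp/dataset/{name}") for name in buckets_at_least(128)]
# dataset = StreamingDataset(streams=streams)
```

The trade-off is that thresholds can only move in steps of the chosen edges, so the edges should match the filtering conditions you expect to try.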

universome commented 11 months ago

Hi @karan6181 , right now, I actually work with videos and am currently filtering the dataset the following way:

For the first strategy, it's possible to create separate MDS datasets, but not for the second one, because video decoding happens in a separate process (or processes) and is additionally influenced by CPU utilization on the node: ffmpeg uses dynamic multi-threaded decoding (the number of threads depends on the current cumulative CPU utilization), and it's really difficult to prevent such multi-threaded behaviour (I tried various strategies and parameters, even taskset). Also, there are really no alternatives to ffmpeg (it's used under the hood by all the modern libraries, such as av and opencv).

knighton commented 10 months ago

Hi @universome,

Something I noticed while scanning threads (sorry, I haven't fully read everything)...

> P.S. I had to also change the shuffling strategy in such a way that next_epoch is not taken from shared memory, but is rather unique for each worker.

I am just verifying that you are familiar with the relevant DataLoader args that control worker persistence across epochs. Of course, if it forks/spawns workers every time you call __iter__, the epochs will not increment, because that happens on the worker side, resulting in identical shuffles.
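Assuming the argument meant here is persistent_workers of torch.utils.data.DataLoader (the comment does not name it), a minimal sketch of building the loader arguments could look like this. make_loader_kwargs is a hypothetical helper; it only builds a dict, so it runs without torch installed.

```python
from typing import Any, Dict


def make_loader_kwargs(num_workers: int, batch_size: int) -> Dict[str, Any]:
    """Build DataLoader kwargs that keep workers alive across epochs."""
    return {
        "batch_size": batch_size,
        "num_workers": num_workers,
        # DataLoader rejects persistent_workers=True when num_workers == 0,
        # so only enable it when worker processes are actually used.
        "persistent_workers": num_workers > 0,
    }


# Hypothetical usage; `dataset` is whatever dataset object you already have:
# loader = torch.utils.data.DataLoader(dataset, **make_loader_kwargs(8, 32))
```

With persistent workers, worker-side state such as an epoch counter survives from one epoch to the next instead of being reset by a fresh fork or spawn.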

samsja commented 2 months ago

I still have hanging problems when using torchrun with Mosaic streaming. Has anybody managed to fix them?