mosaicml / streaming

A Data Streaming Library for Efficient Neural Network Training
https://streaming.docs.mosaicml.com
Apache License 2.0

Shared Memory issue with multiple instances of Streaming Dataset in a multi-gpu setup #332

Open shivshandilya opened 1 year ago

shivshandilya commented 1 year ago

Environment

The issue

I am trying Streaming Dataset with PyTorch Lightning. In the setup section of the code I try to initialize multiple instances of Streaming Dataset for train, validation, and test. These datasets are then passed to the dataloaders. The problem arises with multiprocessing, when these three datasets are initialized concurrently.

  File "/anaconda/envs/testing/lib/python3.9/site-packages/streaming/base/dataset.py", line 373, in __init__
    self._shm_prefix_int, self._locals_shm = get_shm_prefix(my_locals, world)
  File "/anaconda/envs/testing/lib/python3.9/site-packages/streaming/base/shared/prefix.py", line 173, in get_shm_prefix
    shm = SharedMemory(name, True, len(data))
  File "/anaconda/envs/testing/lib/python3.9/site-packages/streaming/base/shared/memory.py", line 41, in __init__
    shm = BuiltinSharedMemory(name, create, size)
  File "/anaconda/envs/testing/lib/python3.9/multiprocessing/shared_memory.py", line 103, in __init__
    self._fd = _posixshmem.shm_open(
FileExistsError: [Errno 17] File exists: '/000009_locals'

This error seems to arise when workers try to create a shared memory block that has already been created for a previous dataset: the same prefix_int is assigned again during shm creation. When only one dataset is initialized, the code works as expected on multiple GPUs. Am I missing something here? If not, how can I go about creating multiple streaming datasets?
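For reference, my setup looks roughly like the sketch below (bucket paths, batch size, and the exact DataModule layout are simplified placeholders):

import pytorch_lightning as pl
from streaming import StreamingDataset
from torch.utils.data import DataLoader

class MyDataModule(pl.LightningDataModule):
    def setup(self, stage=None):
        # The second/third constructor call is where the FileExistsError surfaces.
        self.train_ds = StreamingDataset(remote='s3://my-bucket/train',
                                         local='/tmp/streaming/train', shuffle=True)
        self.val_ds = StreamingDataset(remote='s3://my-bucket/val',
                                       local='/tmp/streaming/val', shuffle=False)
        self.test_ds = StreamingDataset(remote='s3://my-bucket/test',
                                        local='/tmp/streaming/test', shuffle=False)

    def train_dataloader(self):
        return DataLoader(self.train_ds, batch_size=32, num_workers=8)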

karan6181 commented 1 year ago

Hi @shivshandilya, Streaming dataset is not fully compatible with PyTorch Lightning; there are some limitations in how PyTorch Lightning spins up processes. For PyTorch Lightning (PTL) with DDP or another distributed strategy, when a user runs a script with python main.py --n_devices 8, the main process runs the main.py script, and when you call trainer.fit(), PTL creates N-1 (7 in this case) more processes (for a total of N, including the original main process). This means that the dataset created on the main process is constructed before any distributed environment variables are set, and before the other processes are even launched. Streaming dataset also relies on certain environment variables being set before any process instantiates the StreamingDataset; otherwise it uses the default values for the non-distributed use case. Those variables are WORLD_SIZE, LOCAL_WORLD_SIZE, and RANK. Also, when using strategies such as ddp_fork and ddp_spawn, PTL doesn't expose these environment variables (WORLD_SIZE, LOCAL_WORLD_SIZE, and RANK), which makes it impossible for Streaming to know how to split the dataset across ranks and how to communicate locally via multiprocessing shared memory.

I would recommend using either torchrun, the composer launcher, or another launcher that spins up processes similarly to torchrun/composer and sets the above environment variables.
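For example, launching with torchrun and adding a quick sanity check before the dataset is constructed (the command and check below are just an illustration):

# Launch so that every rank is its own process and the launcher sets the env vars, e.g.:
#   torchrun --nproc_per_node=8 main.py
import os

# If any of these are missing when StreamingDataset is constructed, Streaming
# falls back to its non-distributed defaults and the ranks end up disagreeing.
for var in ('WORLD_SIZE', 'LOCAL_WORLD_SIZE', 'RANK'):
    assert var in os.environ, f'{var} must be set before creating StreamingDataset'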

shivshandilya commented 1 year ago

Hey @karan6181, thanks for the info. To avoid the above in my code, I use the LightningDataModule, which sets these variables in its setup call. I create the datamodule outside and then pass it to trainer.fit(), which then initializes the process group and sets the variables. So anything and everything after setup() will have access to these variables. That is why, when I run with only one dataset, it runs as it should. The issue only arises when I try to instantiate two or more datasets in the setup call.

karan6181 commented 1 year ago

I create the datamodule outside

Can you please share the values of WORLD_SIZE, LOCAL_WORLD_SIZE, and RANK before passing to trainer.fit()? When StreamingDataset gets instantiated, the above environment variables should be set correctly.
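Something like this, placed right before trainer.fit() and again right before the dataset is instantiated, would show it (just a quick sketch):

import os

for var in ('WORLD_SIZE', 'LOCAL_WORLD_SIZE', 'RANK'):
    print(f'{var} = {os.environ.get(var, "<not set>")}')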

shivshandilya commented 1 year ago

Hey, you are right. They are not set before trainer.fit(). Only inside it, right before dataset instantiation, the values are WORLD_SIZE=4, LOCAL_WORLD_SIZE=1, RANK=0. I guess LOCAL_WORLD_SIZE is not set by them.

shivshandilya commented 1 year ago

@karan6181 Why does setting them manually before dataset initialization not work? I tried setting the LOCAL_WORLD_SIZE variable before the dataset initialization. The previous error doesn't occur now, but the training is frozen, as in the iterations do not happen.

karan6181 commented 1 year ago

I believe some ranks are waiting for other ranks for synchronization, and if the env variables are not set correctly, you will see a hang. And do you know if the part below still holds true?

For PyTorch Lightning (PTL) with DDP or another distributed strategy, when a user runs a script with python main.py --n_devices 8, the main process runs the main.py script, and when you call trainer.fit(), PTL creates N-1 (7 in this case) more processes (for a total of N, including the original main process).

karan6181 commented 1 year ago

If you are running on 1 node 8xV100 (8 GPUs), WORLD_SIZE=8, LOCAL_WORLD_SIZE=8, and RANK goes from 0 to 7.

shivshandilya commented 1 year ago

I believe some ranks are waiting for other ranks for synchronization, and if the env variables are not set correctly, you will see a hang. And do you know if the part below still holds true?

For PyTorch Lightning (PTL) with DDP or another distributed strategy, when a user runs a script with python main.py --n_devices 8, the main process runs the main.py script, and when you call trainer.fit(), PTL creates N-1 (7 in this case) more processes (for a total of N, including the original main process).

Yes, it does. If I instantiate the dataset outside the trainer.fit() function, DDP crashes. DDP needs a copy of the dataset for each GPU and not just one in the main process.

jiamings commented 1 year ago

@shivshandilya Hi, I am facing the same issue. Can you share an example script on how to set the environment variables properly? For instance, in the setup function of the LightningDataModule, I don't think LOCAL_WORLD_SIZE is set.

shivshandilya commented 1 year ago

Hey @jiamings, so I tried setting these variables for each rank in the setup function itself, but that didn't seem to resolve the issue either. The training started for small datasets, but for comparatively larger ones it would just hang. The reason for this again could be what @karan6181 suggested earlier.

I believe some ranks are waiting for other ranks for synchronization, and if the env variables are not set correctly, you will see a hang.

However, the code below shows how I set up the env variables in the setup:

import os
import torch.distributed as dist

def setup(self, stage=None):
    # Assumes a single node, so LOCAL_WORLD_SIZE == WORLD_SIZE.
    os.environ['WORLD_SIZE'] = str(dist.get_world_size())
    os.environ['LOCAL_WORLD_SIZE'] = str(dist.get_world_size())
    os.environ['RANK'] = str(dist.get_rank())

    self.dataset = {}
    self.dataset["train"] = self._get_dataset(split="train")

While the code above sets up the dataset for each rank, the sync between workers and ranks didn't seem to happen. In the end I moved to composer (on @karan6181's advice) and things work there without any problem. Let me know if things work in your particular example.

jiamings commented 1 year ago

Thanks!

@karan6181 Another problem I find troubling is that this prevents two different users from using StreamingDataset on the same machine: one user creates the 000000_locals file, and the other user's process does not skip it -- instead, it tries to access it as if it were its own shared memory, which leads to permission issues.

This is not related to PyTorch Lightning at all.

jiamings commented 1 year ago

I was able to use torchrun with PyTorch Lightning, though, which solves the compatibility issue. Just use torchrun instead of python on your regular Lightning script.

shivshandilya commented 1 year ago

Hey @jiamings, yes, torchrun seems to be working well for me too for now in a Streaming + PTL setup. I am trying to check whether this holds true in a multi-node setup as well.

@karan6181 Another problem I find troubling is that this prevents two different users from using StreamingDataset on the same machine: one user creates the 000000_locals file, and the other user's process does not skip it -- instead, it tries to access it as if it were its own shared memory, which leads to permission issues.

This is not related to PyTorch Lightning at all.

Can you explain this a bit more? What do you mean by two different users here?

dhruvnathawani commented 1 year ago

This problem occurs when two different users use Mosaic Streaming on the same VM.

Solution: deleting all the files in /dev/shm worked for me!
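Roughly what I did (a sketch; only run this when no other Streaming job is active on the machine, and adjust the pattern to the prefixes you actually see in /dev/shm):

import glob
import os

# Remove stale shared-memory files (e.g. 000009_locals from the traceback above)
# left behind by earlier Streaming runs.
for path in glob.glob('/dev/shm/0*_*'):
    try:
        os.remove(path)
    except OSError as err:
        print(f'could not remove {path}: {err}')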

jiamings commented 1 year ago

Isn't it concerning that StreamingDataset cannot be used by two users on the same machine?

AlaaKhaddaj commented 1 year ago

I would like to double down on the issue raised above: you cannot have two different training scripts (that use StreamingDataset) on the same machine. The reason is related to the way _filelock_root is initialized here.

We can fix that by having a unique identifier for each job. What is a bit confusing is whether the StreamingDatasets used by the GPUs of the same job should share one identifier, or whether each dataset on each GPU should have its own identifier.

Thanks!

karan6181 commented 1 year ago

@jiamings, @dhruvnathawani, @AlaaKhaddaj, can you share what traceback you see when more than one user uses Streaming Dataset on the same machine? Do both users use the same `local` directory?

genesis-jamin commented 1 year ago

Do any of you know if StreamingDataset is compatible with multi-node training via Ray Train?

dhruvnathawani commented 1 year ago

Yes, that is right: it happens when both users use the same `local` directory.

karan6181 commented 1 year ago

Hi @genesis-jamin, we have not tried out Ray + Streaming Dataset extensively. But I would like to understand if you see any issues using it with Ray.

karan6181 commented 1 year ago

@dhruvnathawani, can you try specifying remote=None in StreamingDataset if your dataset is local? If the dataset is remote, each job must provide a unique local directory.
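For example (paths here are just illustrative), each user/job can point at its own local cache, or drop the remote entirely when the shards already live on disk:

import getpass
import os

from streaming import StreamingDataset

# A per-user local cache avoids two users sharing the same local directory.
local_dir = os.path.join('/tmp/streaming', getpass.getuser(), 'train')
dataset = StreamingDataset(remote='s3://my-bucket/train', local=local_dir)

# If the shards are already on local disk, skip the remote:
# dataset = StreamingDataset(remote=None, local='/data/my_local_shards')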

jbohnslav commented 8 months ago

~I'm running into what I believe is the same issue using PyTorch Lightning. I wrap my whole train script with torch.distributed.run.main(), aka torchrun.~

The following environment variables are set before Lightning initializes my dataset:

GROUP_RANK
GROUP_WORLD_SIZE
LOCAL_RANK
LOCAL_WORLD_SIZE
MASTER_ADDR
MASTER_PORT
RANK
WORLD_SIZE

~However, when I run with 2 GPUs, I get a timeout error in the StreamingDataset constructor while waiting for an index.json file. I can download this file in <1 second from my machine, so it's something wrong with the StreamingDataset. Note that I use StreamingDataset in a different repo without Lightning, and there it works fine.~

EDIT: wrapping my script in torchrun has solved multi-GPU training. I had erroneously set my local directory to local_base/{rank}/{split}. The addition of {rank} caused the miscommunication issue.
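In case it helps anyone else, a sketch of the before/after (paths are made up):

from streaming import StreamingDataset

# Broken: a different local directory per rank (rank taken from os.environ['RANK']),
# so ranks on the same node could not find each other's shared files and the
# constructor timed out waiting for index.json.
# rank = int(os.environ['RANK'])
# dataset = StreamingDataset(remote='s3://my-bucket/train',
#                            local=f'/tmp/cache/{rank}/train')

# Working: the same local directory for every rank on the node; Streaming
# coordinates downloading and caching across ranks itself.
dataset = StreamingDataset(remote='s3://my-bucket/train', local='/tmp/cache/train')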

wzf03 commented 5 months ago

Maybe a small change to the _get_path function in prefix.py can help: https://github.com/macroversus/streaming/blob/0ab01ffa7000e50955526fa17932c167d0756499/streaming/base/shared/prefix.py#L35-L45

def _get_path(prefix_int: int, name: str) -> str:
    """Get the name of the shared memory.

    Args:
        prefix_int (int): The prefix int.
        name (str): The name of the shared memory.

    Returns:
        str: Unique shared memory name.
    """
    return f'{os.environ.get("MOSAICML_STREAMING_SHM_PREFIX","")}{prefix_int:06}_{name}'

The environment variable can be defined to prevent conflicts from happening. It can also remind the user to manually clean up /dev/shm if necessary.
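With that patch, usage would look something like this (note that MOSAICML_STREAMING_SHM_PREFIX is only read by the patched fork above, not by current upstream streaming):

import os

from streaming import StreamingDataset

# Each job/user picks its own prefix so its shared-memory names (e.g.
# myjob_000000_locals) do not collide with another job on the same machine.
os.environ['MOSAICML_STREAMING_SHM_PREFIX'] = 'myjob_'
dataset = StreamingDataset(remote='s3://my-bucket/train', local='/tmp/myjob/train')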