Open lorabit110 opened 1 year ago
Hi @lorabit110, thanks for raising a feature request. We have added it to our queue. Out of curiosity, how are you using streaming datasets today? And what distributed orchestration library are you using (torchrun, Composer, or something else)?
We are using ClearML to send tasks to GPUs. The main process spins up other processes for DDP training. Each task can use up to 8 GPUs. We are not doing multi-node DDP yet.
Does ClearML have its own distributed job logic, or are you using torchrun under the hood?
Nope. I am using torch.multiprocessing to start subprocesses.
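For context, the launch pattern described above (a main process spinning up one worker per GPU, each with its own rank) can be sketched roughly as follows. This uses the stdlib multiprocessing module for illustration (torch.multiprocessing wraps the same API), a fork start method to keep the sketch simple, and a placeholder worker body; real DDP code would call torch.distributed.init_process_group inside the worker.

```python
import multiprocessing as mp
import os

def worker(rank, world_size, queue):
    # Placeholder worker body: a real DDP worker would set up the process
    # group here (torch.distributed.init_process_group) and run training.
    os.environ["RANK"] = str(rank)
    os.environ["WORLD_SIZE"] = str(world_size)
    queue.put((rank, world_size))

def launch(world_size):
    # fork keeps the sketch runnable without a __main__ guard;
    # torch.multiprocessing.spawn uses the spawn start method instead.
    ctx = mp.get_context("fork")
    queue = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(r, world_size, queue))
             for r in range(world_size)]
    for p in procs:
        p.start()
    results = sorted(queue.get() for _ in procs)
    for p in procs:
        p.join()
    return results

print(launch(2))  # → [(0, 2), (1, 2)]
```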
I would love to see this happen too!
@karan6181 Is there an example of using streaming with Composer? I'm currently struggling to get streaming working with multi-node processes. The Composer documentation states that it is compatible, but doesn't give an example.
Currently getting the following stack trace when attempting to initialize StreamingVisionDataset:
0: Error in sys.excepthook:
0: Traceback (most recent call last):
0: File "/work/09753/hprairie/ls6/miniconda3/envs/mambaX/lib/python3.11/site-packages/torch/distributed/distributed_c10d.py", line 1339, in _distributed_excepthook
0: prefix = f"[rank{get_rank()}]"
0: ^^^^^^^^^^
0: File "/work/09753/hprairie/ls6/miniconda3/envs/mambaX/lib/python3.11/site-packages/torch/distributed/distributed_c10d.py", line 1746, in get_rank
0: default_pg = _get_default_group()
0: ^^^^^^^^^^^^^^^^^^^^
0: File "/work/09753/hprairie/ls6/miniconda3/envs/mambaX/lib/python3.11/site-packages/torch/distributed/distributed_c10d.py", line 1008, in _get_default_group
0: raise ValueError(
0: ValueError: Default process group has not been initialized, please make sure to call init_process_group.
0:
0: Original exception was:
0: Traceback (most recent call last):
0: File "<frozen runpy>", line 198, in _run_module_as_main
0: File "<frozen runpy>", line 88, in _run_code
0: File "/work/09753/hprairie/ls6/projects/MambaX/src/training/classification.py", line 162, in <module>
0: main(config)
0: File "/work/09753/hprairie/ls6/projects/MambaX/src/training/classification.py", line 64, in main
0: eval_dataspec = build_loader(
0: ^^^^^^^^^^^^^
0: File "/work/09753/hprairie/ls6/projects/MambaX/src/training/data/base.py", line 62, in build_loader
0: dataset = StreamingVisionDataset(
0: ^^^^^^^^^^^^^^^^^^^^^^^
0: File "/work/09753/hprairie/ls6/miniconda3/envs/mambaX/lib/python3.11/site-packages/streaming/vision/base.py", line 133, in __init__
0: StreamingDataset.__init__(self,
0: File "/work/09753/hprairie/ls6/miniconda3/envs/mambaX/lib/python3.11/site-packages/streaming/base/dataset.py", line 431, in __init__
0: destroy_dist = maybe_init_dist()
0: ^^^^^^^^^^^^^^^^^
0: File "/work/09753/hprairie/ls6/miniconda3/envs/mambaX/lib/python3.11/site-packages/streaming/base/distributed.py", line 128, in maybe_init_dist
0: dist.init_process_group(backend=backend, rank=get_rank(), world_size=get_world_size())
0: File "/work/09753/hprairie/ls6/miniconda3/envs/mambaX/lib/python3.11/site-packages/torch/distributed/c10d_logger.py", line 75, in wrapper
0: return func(*args, **kwargs)
0: ^^^^^^^^^^^^^^^^^^^^^
0: File "/work/09753/hprairie/ls6/miniconda3/envs/mambaX/lib/python3.11/site-packages/torch/distributed/c10d_logger.py", line 89, in wrapper
0: func_return = func(*args, **kwargs)
0: ^^^^^^^^^^^^^^^^^^^^^
0: File "/work/09753/hprairie/ls6/miniconda3/envs/mambaX/lib/python3.11/site-packages/torch/distributed/distributed_c10d.py", line 1305, in init_process_group
0: store, rank, world_size = next(rendezvous_iterator)
0: ^^^^^^^^^^^^^^^^^^^^^^^^^
0: File "/work/09753/hprairie/ls6/miniconda3/envs/mambaX/lib/python3.11/site-packages/torch/distributed/rendezvous.py", line 246, in _env_rendezvous_handler
0: store = _create_c10d_store(master_addr, master_port, rank, world_size, timeout, use_libuv)
0: ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
0: File "/work/09753/hprairie/ls6/miniconda3/envs/mambaX/lib/python3.11/site-packages/torch/distributed/rendezvous.py", line 174, in _create_c10d_store
0: return TCPStore(
0: ^^^^^^^^^
0: torch.distributed.DistNetworkError: Connection reset by peer
I am also launching with the Composer CLI.
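For reference, the env:// rendezvous in the trace above (streaming's maybe_init_dist calling dist.init_process_group) requires the standard torch.distributed environment variables to be set consistently on every process before the dataset is constructed; a mismatched or unreachable MASTER_ADDR/MASTER_PORT is one common cause of the TCPStore "Connection reset by peer" error. A typical single-node setup looks like the following (the address and port values are placeholders):

```shell
export MASTER_ADDR=127.0.0.1   # all ranks must agree on the store host
export MASTER_PORT=29500       # and on a free port on that host
export WORLD_SIZE=8            # total number of processes
export RANK=0                  # unique per process, 0..WORLD_SIZE-1
```

Launchers such as torchrun and the Composer CLI normally set these for you, so if they are missing or inconsistent it usually indicates the launcher and the script disagree about who owns process-group setup.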
🚀 Feature Request
Currently, StreamingDataset handles distributed data parallel training by itself. This makes it incompatible with trainers that handle data distribution themselves, such as transformers.Trainer (which also distributes data to the corresponding process based on WORLD_SIZE and RANK). The request is to add a single-node mode to StreamingDataset so that it can also work with trainers that support DP/DDP.
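To illustrate why this matters: if both the dataset and the trainer shard by rank, every rank's shard gets sharded a second time, so each process sees only 1/WORLD_SIZE² of the data and most samples are silently dropped. A minimal pure-Python sketch of the problem (the interleaved partitioning scheme here is an illustration, not the exact scheme either library uses):

```python
def shard(samples, rank, world_size):
    """Interleaved rank-based sharding: rank r keeps indices r, r+W, r+2W, ..."""
    return samples[rank::world_size]

world_size = 4
data = list(range(16))

# Sharding once: the ranks together cover the whole dataset exactly once.
once = [shard(data, r, world_size) for r in range(world_size)]
assert sorted(x for part in once for x in part) == data

# Sharding twice (dataset AND trainer both shard): each rank keeps only
# 1/world_size**2 of the samples, and the rest are never seen by anyone.
twice = [shard(shard(data, r, world_size), r, world_size)
         for r in range(world_size)]
seen = sorted(x for part in twice for x in part)
assert len(seen) == len(data) // world_size  # only 4 of 16 samples survive
```

A single-node mode would let StreamingDataset skip its own sharding step and hand the full sample stream to the trainer's distribution logic.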
Motivation