
Distributed Data Parallel on PyTorch #3

YunchaoYang opened 2 years ago

YunchaoYang commented 2 years ago

When your training script utilizes DDP to run on single or multiple nodes, it will spawn multiple processes; each will run on a different GPU. Every process needs to know how many other processes are out there doing the same job (world size) and how to reach the main process. Every process needs to have a local rank and a global rank. The local rank defines its order within the node it is running on, while the global rank defines its order within all the processes.


After spawning the processes and giving each one a copy of world_size, local_rank, global_rank, and the address of the main process, you need to split the dataset into world_size chunks so that each GPU gets a different part of the dataset. Then you can wrap your model in the nn.parallel.DistributedDataParallel class, and you are basically done.

It is very important to distinguish the code running on the main process from the code running on the child processes. For example, we have to define CUDA_VISIBLE_DEVICES on the main process before we spawn the train function on the GPUs. We also define the dist_url that all GPUs use to communicate back to the main process. Since we are running locally on a single node, the URL can be localhost and a random port. This URL is given to all GPUs so that the main process is reachable from every process.
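A minimal sketch of that main-process side, assuming a single node, with parse_args() and the train() worker function as placeholders defined elsewhere:

    import os
    import torch
    import torch.multiprocessing as mp

    def main():
        args = parse_args()  # hypothetical argument parser
        # make every GPU on this node visible before spawning the workers
        n_gpus = torch.cuda.device_count()
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(i) for i in range(n_gpus))
        args.world_size = n_gpus
        # URL all child processes use to reach the main process: localhost + a free port
        args.dist_url = "tcp://localhost:23456"
        # spawn one process per GPU; each receives its local rank as the first argument
        mp.spawn(train, args=(args,), nprocs=args.world_size, join=True)

    if __name__ == "__main__":
        main()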

The code running on the child process (on the GPU) will have specific initialization variables, such as the local rank. torch.distributed.init_process_group does all the heavy lifting: it initializes the communication between processes and blocks until it is sure they can talk to each other. We set the same seed on all GPUs so that the random parameter initializations match. The rest is to build the data loader, model, and loss function and pass them to the trainer function.

This code can be easily modified to run the training on multiple nodes, not necessarily on the same cluster. You will need to manually change "localhost" in args.dist_url to the address of Node A and set args.world_size to the total number of GPUs you intend to use across all nodes. You also need to pass the node rank to the train function; the global rank of a GPU is then the node rank times the number of GPUs per node plus the local rank, as shown below.

args.rank = args.node_rank * args.gpus + gpu

That’s it; you can now run multiple copies of this script on different nodes and see the training happening on all GPUs simultaneously.

Basics

torch.distributed package:

The class torch.nn.parallel.DistributedDataParallel() builds on this functionality to provide synchronous distributed training as a wrapper around any PyTorch model.

This differs from the kinds of parallelism provided by the multiprocessing package (torch.multiprocessing) and by torch.nn.DataParallel() in that it supports multiple network-connected machines and in that the user must explicitly launch a separate copy of the main training script for each process.

Initialization

The package needs to be initialized using the torch.distributed.init_process_group() function before calling any other methods.

torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(seconds=1800), world_size=-1, rank=-1, store=None, group_name='', pg_options=None)

There are 2 main ways to initialize a process group:

  1. Specify store, rank, and world_size explicitly.
  2. Specify init_method (a URL string) which indicates where/how to discover peers. Optionally specify rank and world_size, or encode all required parameters in the URL and omit them.

If neither is specified, init_method is assumed to be “env://”.
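For example, the first option (an explicit store) might look like the following sketch, with rank 0 hosting a TCPStore that all other processes connect to (the address and port are arbitrary examples):

    import datetime
    import torch.distributed as dist

    def init_with_store(rank: int, world_size: int):
        # rank 0 hosts the store (is_master=True); every other process connects to it
        store = dist.TCPStore("10.1.1.20", 23456, world_size, rank == 0,
                              timeout=datetime.timedelta(seconds=300))
        dist.init_process_group(backend="nccl", store=store,
                                rank=rank, world_size=world_size)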

Parameters: backend (the backend to use, e.g. nccl, gloo, or mpi), init_method (URL specifying how to initialize the process group), world_size (number of processes participating in the job), rank (rank of the current process), store (key/value store accessible to all workers, used to exchange connection information; mutually exclusive with init_method), timeout (timeout for operations executed against the process group), group_name, and pg_options.

Currently three initialization methods are supported:

1. TCP initialization

There are two ways to initialize using TCP, both requiring a network address reachable from all processes and a desired world_size. The first way requires specifying an address that belongs to the rank 0 process. This initialization method requires that all processes have manually specified ranks.

dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
                        rank=args.rank, world_size=4)

2. Shared file-system initialization (Not explored)

3. Environment variable initialization

This method will read the configuration from environment variables, allowing one to fully customize how the information is obtained. The variables to be set are MASTER_ADDR (address of the rank 0 node), MASTER_PORT (a free port on the rank 0 node), WORLD_SIZE, and RANK.

Similar to the TCP initialization, rank 0 will be used to set up all connections.

This is the default method, meaning that init_method does not have to be specified (it defaults to env://).
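A minimal sketch of environment-variable initialization (the values are arbitrary single-node examples; normally the launcher or the job scheduler exports these variables for you):

    import os
    import torch.distributed as dist

    # address of the rank 0 process and a free port on it; identical on every process
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"
    os.environ["WORLD_SIZE"] = "4"
    os.environ["RANK"] = "0"   # different on every process

    # env:// is the default, so init_method could also be omitted entirely
    dist.init_process_group(backend="nccl", init_method="env://")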

Launch utility

The torch.distributed package also provides a launch utility in torch.distributed.launch. This helper utility can be used to launch multiple processes per node for distributed training.

torch.distributed.launch is a module that spawns up multiple distributed training processes on each of the training nodes.

see details at https://pytorch.org/docs/stable/distributed.html#launch-utility

When the DDP application is started via launch.py, it passes the world size, global rank, master address and master port via environment variables, and the local rank as a command-line parameter to each instance. To use the launcher, an application needs to adhere to the following convention:

It must provide an entry-point function for a single worker (for example, it should not launch subprocesses using torch.multiprocessing.spawn), and it must use environment variables for initializing the process group.

For simplicity, the application can assume each process maps to a single GPU but in the next section we also show how a more general process-to-GPU mapping can be performed.

Launch.py

The number of processes spawned on each training node is controlled by --nproc_per_node.

1. Single-Node multi-process distributed training

>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
               YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
               arguments of your training script)

2. Multi-Node multi-process distributed training: (e.g. two nodes)

Node 1: (IP: 192.168.1.1, and has a free port: 1234)

    >>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
               --nnodes=2 --node_rank=0 --master_addr="192.168.1.1"
               --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
               and all other arguments of your training script)

Node 2:

    >>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
               --nnodes=2 --node_rank=1 --master_addr="192.168.1.1"
               --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
               and all other arguments of your training script)

3. To look up what optional arguments this module offers:

    python -m torch.distributed.launch --help

  1. This utility and multi-process distributed (single-node or multi-node) GPU training currently only achieves the best performance using the NCCL distributed backend.

  2. In your training program, you must parse the command-line argument --local_rank=LOCAL_PROCESS_RANK, whose value is set by this launch utility.

If your training program uses GPUs, you should ensure that your code only runs on the GPU device of LOCAL_PROCESS_RANK. This can be done by:

Parsing the local_rank argument

    >>> import argparse
    >>> parser = argparse.ArgumentParser()
    >>> parser.add_argument("--local_rank", type=int)
    >>> args = parser.parse_args()

Set the device to args.local_rank

    >>> torch.cuda.set_device(args.local_rank)  # before your code runs

  3. In your training program, you are supposed to call the following function at the beginning to start the distributed backend. It is strongly recommended to use init_method='env://'.

Other init methods (e.g. tcp://) may work, but env:// is the one that is officially supported by this module.

    torch.distributed.init_process_group(backend='YOUR BACKEND',
                                         init_method='env://')

  4. In your training program, you can either use regular distributed functions or use the torch.nn.parallel.DistributedDataParallel module. If your training program uses GPUs for training and you would like to use DistributedDataParallel, here is how to configure it:
    model = torch.nn.parallel.DistributedDataParallel(model,
                                                      device_ids=[args.local_rank],
                                                      output_device=args.local_rank)

Please ensure that the device_ids argument is set to the only GPU device id that your code will be operating on. This is generally the local rank of the process (args.local_rank should be consistent between torch.cuda.set_device(args.local_rank) and DDP(device_ids=[args.local_rank])).

In other words, it is mandatory to set device_ids=[args.local_rank] and output_device=args.local_rank in order to use this launch utility.

  5. Another way to pass local_rank to the subprocesses (i.e., your training script) is via the environment variable LOCAL_RANK. This behavior is enabled when you launch the script with --use_env=True. In that case, you must adjust the example above to read os.environ['LOCAL_RANK'] instead of args.local_rank.

When --use_env=True is specified, the launcher will not pass --local_rank on the command line, so you do not need to add local_rank to the argument parser.

Spawn utility

The torch.multiprocessing package also provides a spawn function, torch.multiprocessing.spawn().

This helper function can be used to spawn multiple processes. It works by passing in the function that you want to run and spawns N processes to run it. This can be used for multiprocess distributed training as well.
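A minimal sketch of how it is used (the worker function and its arguments here are made up for illustration; the spawned function always receives the process index as its first argument):

    import torch.multiprocessing as mp

    def worker(i, msg):
        # i is the process index (0 .. nprocs-1), supplied automatically by mp.spawn
        print(f"process {i} says: {msg}")

    if __name__ == "__main__":
        # start 4 processes, each running worker(i, "hello"); join=True blocks until all exit
        mp.spawn(worker, args=("hello",), nprocs=4, join=True)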

Debugging torch.distributed applications

references

  1. https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
  2. https://tuni-itc.github.io/wiki/Technical-Notes/Distributed_dataparallel_pytorch/
  3. https://medium.com/mlearning-ai/distributed-data-parallel-with-slurm-submitit-pytorch-168c1004b2ca
  4. https://gist.github.com/TengdaHan/1dd10d335c7ca6f13810fff41e809904
  5. https://github.com/yangkky/distributed_tutorial/blob/master/ddp_tutorial.md
  6. https://github.com/richardkxu/distributed-pytorch
YunchaoYang commented 2 years ago

Other references:

DistributedDataParallel (DDP) implements data parallelism at the module level. It uses communication collectives from the torch.distributed package to synchronize gradients, parameters, and buffers. Parallelism is available both within a single process and across processes. Within a process, DDP replicates the input module to the devices specified in device_ids, scatters the inputs along the batch dimension accordingly, and gathers the outputs to the output_device, similar to DataParallel.

YunchaoYang commented 2 years ago

Why does main_worker(gpu, args) in many repositories, such as MoCo, have the extra argument gpu? It seems to come from nowhere.

The answer is that mp.spawn passes the process index as the first argument to the spawned function; you simply name it as the first formal parameter, here called gpu.

def main_worker(gpu, args):  # gpu is the process index supplied by mp.spawn

Thanks to the reference: https://chowdera.com/2022/03/202203260601583380.html

YunchaoYang commented 2 years ago

Detailed explanation of each line in the worker process:

import torch
import torch.nn as nn
import torch.distributed as dist
import torchvision
import torchvision.transforms as transforms

def train(gpu, args):
    ############################################################
    rank = args.nr * args.gpus + gpu  # global rank of this process (one process per GPU)
    dist.init_process_group(
        backend='nccl',
        init_method='env://',
        world_size=args.world_size,
        rank=rank
    )
    # Initialize the process and join up with the other processes.
    # This is "blocking," meaning that no process will continue until all processes have joined.
    # init_method -> look for the MASTER_ADDR and MASTER_PORT environment variables
    # world_size -> total number of processes across all nodes
    # rank -> global rank of this process
    ############################################################

    torch.manual_seed(0)
    model = ConvNet()  # ConvNet is the small CNN defined in the referenced tutorial
    torch.cuda.set_device(gpu)
    model.cuda(gpu)
    batch_size = 100
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)

    ###############################################################
    # Wrap the model as DistributedDataParallel on this specific device (process/GPU)
    model = nn.parallel.DistributedDataParallel(model,
                                                device_ids=[gpu])
    ###############################################################

    # Data loading code
    train_dataset = torchvision.datasets.MNIST(
        root='./data',
        train=True,
        transform=transforms.ToTensor(),
        download=True
    )
    ################################################################
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset,
        num_replicas=args.world_size,
        rank=rank
    )
    # makes sure that each process gets a different slice of the training data
    ################################################################

    train_loader = torch.utils.data.DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        ##############################
        shuffle=False,               #
        ##############################
        num_workers=0,
        pin_memory=True,
        #############################
        sampler=train_sampler)       # instead of shuffling the usual way
        #############################
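The snippet stops at the data loader. A minimal sketch of the epoch loop that could follow it (args.epochs is assumed to exist; DistributedSampler.set_epoch() makes the sampler reshuffle its slice differently every epoch, since the sampler, not the DataLoader, does the shuffling here):

    for epoch in range(args.epochs):        # args.epochs assumed to exist on args
        train_sampler.set_epoch(epoch)      # reshuffle the sampler's slice each epoch
        for images, labels in train_loader:
            images = images.cuda(gpu, non_blocking=True)
            labels = labels.cuda(gpu, non_blocking=True)

            outputs = model(images)
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()                 # DDP all-reduces gradients across processes here
            optimizer.step()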

reference:

https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html

YunchaoYang commented 2 years ago

Suppress printing on non-master processes:

    import builtins

    # suppress printing if not master
    if args.multiprocessing_distributed and args.gpu != 0:
        def print_pass(*args):
            pass
        builtins.print = print_pass
YunchaoYang commented 2 years ago

Other references:

init_process_group hanging on HPC multi-node system w GPU

https://github.com/pytorch/pytorch/issues/76069

YunchaoYang commented 2 years ago

TORCHRUN

torchrun provides a superset of the functionality of torch.distributed.launch, with the following additional functionalities: worker failures are handled gracefully by restarting all workers; worker RANK and WORLD_SIZE are assigned automatically; and the number of nodes is allowed to change between a minimum and a maximum size (elasticity).

torchrun is a Python console script for the main module torch.distributed.run, declared in the entry_points configuration in setup.py.

Migrate from torch.distributed.launch to torchrun

1. If --use_env was used, read local_rank from the LOCAL_RANK environment variable:

python -m torch.distributed.launch --use_env train_script.py
############################################
torchrun train_script.py

2. If --local_rank was read from the argument parser, change your training script to read from the LOCAL_RANK environment variable instead:

args = parser.parse_args()  
local_rank = args.local_rank  
############################################
local_rank = int(os.environ["LOCAL_RANK"])

Examples

Single-node multi-worker

torchrun
    --standalone
    --nnodes=1
    --nproc_per_node=$NUM_TRAINERS
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

Fault tolerant (fixed number of workers, no elasticity, tolerates 3 failures)

torchrun
    --nnodes=$NUM_NODES
    --nproc_per_node=$NUM_TRAINERS
    --max_restarts=3
    --rdzv_id=$JOB_ID
    --rdzv_backend=c10d
    --rdzv_endpoint=$HOST_NODE_ADDR
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

HOST_NODE_ADDR, in the form <host>[:<port>] (e.g. node1.example.com:29400), specifies the node and the port on which the C10d rendezvous backend should be instantiated and hosted.

Elastic (min=1, max=4, tolerates up to 3 membership changes or failures)

torchrun
    --nnodes=1:4
    --nproc_per_node=$NUM_TRAINERS
    --max_restarts=3
    --rdzv_id=$JOB_ID
    --rdzv_backend=c10d
    --rdzv_endpoint=$HOST_NODE_ADDR
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

HOST_NODE_ADDR, in the form <host>[:<port>] (e.g. node1.example.com:29400), specifies the node and the port on which the C10d rendezvous backend should be instantiated and hosted. It can be any node in your training cluster, but ideally you should pick a node that has high bandwidth.

In torchrun, the --use_env behavior is on by default: the local rank is always provided via the LOCAL_RANK environment variable.

YunchaoYang commented 2 years ago

https://github.com/stas00/toolbox/blob/master/pytorch/torch-distributed-gpu-test.py

YunchaoYang commented 2 years ago

Some interesting posts:

  1. Why using mp.spawn is slower than using torch.distributed.launch when using multi-GPU training #47587

https://github.com/pytorch/pytorch/issues/47587

"when using torch.distributed.launch, I can see multiple processes through ps -ef | grep train_multi instruction, but when I use mp.spawn, I can only see one process."

"I'd like to add that the launch util uses command line process launch (so you see many processes in ps -ef), but multiprocessing gives you more control and flexibility: fork or spawn (as @pritamdamania87 mentioned), queues, pipe, sync etc, so perhaps there is more overhead."

mp.spawn can also be replaced by Process; launching with Process is not slower.
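A minimal sketch of the Process-based alternative, reusing the train(gpu, args) worker defined earlier in this thread (parse_args() and args.gpus are assumptions):

    import torch.multiprocessing as mp

    if __name__ == "__main__":
        args = parse_args()                      # hypothetical argument parser
        processes = []
        for gpu in range(args.gpus):             # one process per GPU on this node
            p = mp.Process(target=train, args=(gpu, args))
            p.start()
            processes.append(p)
        for p in processes:
            p.join()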

YunchaoYang commented 2 years ago

SLURM environment variables

reference: https://stackoverflow.com/questions/65960509/slurm-error-when-submitting-to-multiple-nodes-slurmstepd-error-execve-py
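As a sketch, assuming one SLURM task per GPU, the SLURM variables can be mapped onto the DDP ones roughly like this (MASTER_ADDR/MASTER_PORT still have to be provided, e.g. derived from SLURM_NODELIST in the job script):

    import os
    import torch
    import torch.distributed as dist

    rank = int(os.environ["SLURM_PROCID"])          # global rank
    local_rank = int(os.environ["SLURM_LOCALID"])   # rank within this node
    world_size = int(os.environ["SLURM_NTASKS"])    # total number of tasks

    os.environ["RANK"] = str(rank)
    os.environ["WORLD_SIZE"] = str(world_size)

    torch.cuda.set_device(local_rank)
    dist.init_process_group(backend="nccl", init_method="env://")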

YunchaoYang commented 2 years ago

An excellent blog post that explains detailed experiments:

https://www.telesens.co/2019/04/04/distributed-data-parallel-training-using-pytorch-on-aws/#Distributed_Data_Parallel

https://theaisummer.com/distributed-training-pytorch/

YunchaoYang commented 2 years ago

Notes from reading the paper https://arxiv.org/pdf/2006.15704.pdf (PyTorch Distributed: Experiences on Accelerating Data Parallel Training).

Data parallel training is subject to subtle dependencies between computations and communications.

Experiments also cover the comparison between NCCL and Gloo communication libraries. The results show that 1) communication is the dominant training latency contributor, and its impact increases with model sizes; 2) bucket sizes considerably affect communication efficiency, which could lead to more than 2X speedup if configured properly; 3) skipping synchronizations appropriately would significantly reduce amortized communication overhead without noticeably degrading convergence speed.
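Two of the knobs mentioned above are exposed directly on the DDP wrapper: the bucket size via the bucket_cap_mb constructor argument, and synchronization skipping via the no_sync() context manager. A sketch, with model, loader, criterion, optimizer, and local_rank assumed to exist:

    import torch.nn as nn

    # larger buckets -> fewer but bigger all-reduce calls (the default is 25 MB)
    ddp_model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank],
                                                    bucket_cap_mb=50)

    # accumulate gradients locally for 3 steps, then synchronize on the 4th
    for step, (x, y) in enumerate(loader):
        if (step + 1) % 4 != 0:
            with ddp_model.no_sync():              # no all-reduce in this backward()
                criterion(ddp_model(x), y).backward()
        else:
            criterion(ddp_model(x), y).backward()  # gradients are all-reduced here
            optimizer.step()
            optimizer.zero_grad()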