pytorch / pytorch

Tensors and Dynamic neural networks in Python with strong GPU acceleration
https://pytorch.org
Other
83.78k stars 22.59k forks source link

torch.distributed.nn.all_reduce incorrectly scales the gradient #58005

Open DrJimFan opened 3 years ago

DrJimFan commented 3 years ago

πŸ› Bug

torch.distributed.nn.all_reduce computes different gradient values from torch.distributed.all_reduce. In particular, it seems to scale the gradients by world_size incorrectly.

To Reproduce

Script to reproduce the behavior:

import torch
import torch.distributed
import torch.distributed.nn

USE_NN_REDUCE = 1

def main_worker(gpu):
    torch.distributed.init_process_group(
        backend="nccl", init_method="tcp://localhost:12345", world_size=2, rank=gpu
    )
    torch.cuda.set_device(gpu)
    x = torch.ones(1).cuda(device=gpu).requires_grad_()
    xx = (gpu + 2) * x
    if USE_NN_REDUCE:
        print("nn.all_reduce")
        xx = torch.distributed.nn.all_reduce(xx)
    else:
        print("regular all_reduce")
        torch.distributed.all_reduce(xx)
    print("Value after all_reduce:", xx)
    xx.backward()
    print(f"gpu={gpu}, grad={x.grad}")

if __name__ == "__main__":
    torch.multiprocessing.spawn(main_worker, nprocs=2)

For torch.distributed.nn.all_reduce (i.e. USE_NN_REDUCE=1), the gradient seems to be scaled by world_size incorrectly:

Value after all_reduce: tensor([5.], device='cuda:0', grad_fn=<_AllReduceBackward>)
Value after all_reduce: tensor([5.], device='cuda:1', grad_fn=<_AllReduceBackward>)
gpu=0, grad=tensor([4.], device='cuda:0')
gpu=1, grad=tensor([6.], device='cuda:1')

For torch.distributed.all_reduce, it is correct:

Value after all_reduce: tensor([5.], device='cuda:0', grad_fn=<MulBackward0>)
Value after all_reduce: tensor([5.], device='cuda:1', grad_fn=<MulBackward0>)
gpu=0, grad=tensor([2.], device='cuda:0')
gpu=1, grad=tensor([3.], device='cuda:1')

Expected behavior

torch.distributed.nn.all_reduce should produce the correct gradient without scaling by world_size.

Environment

PyTorch version: 1.8.1+cu102
Is debug build: False
CUDA used to build PyTorch: 10.2
ROCM used to build PyTorch: N/A

OS: Ubuntu 18.04.5 LTS (x86_64)
GCC version: (Ubuntu 7.5.0-3ubuntu1~18.04) 7.5.0
Clang version: Could not collect
CMake version: version 3.10.2

Python version: 3.7 (64-bit runtime)
Is CUDA available: True
CUDA runtime version: Could not collect
GPU models and configuration:
GPU 0: TITAN RTX
GPU 1: TITAN RTX
GPU 2: TITAN RTX
GPU 3: TITAN RTX
GPU 4: TITAN RTX
GPU 5: TITAN RTX
GPU 6: TITAN RTX
GPU 7: TITAN RTX

Nvidia driver version: 450.66
cuDNN version: Could not collect
HIP runtime version: N/A
MIOpen runtime version: N/A

Versions of relevant libraries:
[pip3] numpy==1.18.5
[pip3] pytorch-lightning==1.2.7
[pip3] torch==1.8.1
[pip3] torchmetrics==0.3.1
[pip3] torchvision==0.9.1
[conda] blas                      1.0                         mkl
[conda] cudatoolkit               10.2.89              hfd86e86_1
[conda] mkl                       2020.2                      256
[conda] mkl-include               2020.2                      256
[conda] mkl-service               2.3.0            py37he904b0f_0
[conda] mkl_fft                   1.2.0            py37h23d657b_0
[conda] mkl_random                1.1.1            py37h0573a6f_0
[conda] numpy                     1.18.5                   pypi_0    pypi
[conda] pytorch                   1.7.0           py3.7_cuda10.2.89_cudnn7.6.5_0    pytorch
[conda] pytorch-lightning         1.1.8                    pypi_0    pypi
[conda] torchvision               0.8.1                py37_cu102    pytorch

cc @ezyang @gchanan @zou3519 @bdhirsh @jbschlosser @anjali411 @pietern @mrshenli @pritamdamania87 @zhaojuanmao @satgera @rohan-varma @gqchen @aazzolini @osalpekar @jiayisuse @agolynski @SciPioneer @H-Huang @mrzzd @cbalioglu @gcramer23

rohan-varma commented 3 years ago

It looks like scaling the gradient by the world size in torch.distributed.nn.all_reduce is intentional, see the implementation (it basically does a sum of allreduce, so the gradient will be scaled by the world size: https://github.com/pytorch/pytorch/blob/ac44569b0d87ef7c2c8c26f3057f797f40cc6111/torch/distributed/nn/functional.py#L263).

with regard to torch.distributed.all_reduce, it is not a torch.autograd.Function, so it is not part of the autograd graph and autograd does not backprop through it, so the gradient should be the same as if the allreduce was not done (which is the case in your example).

So I'm not sure if this is a bug or the intended behavior. @mrshenli @pritamdamania87 do either of you have any thoughts?

DrJimFan commented 3 years ago

Maybe I'm missing something, but I don't think this is the correct behavior.

First, torch.distributed.all_reduce actually does do backprop and compute the correct gradient. If we modify the above example slightly by adding yy = xx * xx and backprop from yy:

import torch
import torch.distributed
import torch.distributed.nn

USE_NN_REDUCE = 0

def main_worker(gpu):
    torch.distributed.init_process_group(
        backend="nccl", init_method="tcp://localhost:12345", world_size=2, rank=gpu
    )
    torch.cuda.set_device(gpu)
    x = torch.ones(1).cuda(device=gpu).requires_grad_()
    xx = (gpu + 2) * x
    if USE_NN_REDUCE:
        print("nn.all_reduce")
        xx = torch.distributed.nn.all_reduce(xx)
    else:
        print("regular all_reduce")
        torch.distributed.all_reduce(xx)
    print("Value after all_reduce:", xx)
    yy = xx * xx   # test if gradient backprops through all_reduce
    yy.backward()
    print(f"gpu={gpu}, grad={x.grad}")

if __name__ == "__main__":
    torch.multiprocessing.spawn(main_worker, nprocs=2)

For torch.distributed.all_reduce, the printout indicates that the gradient flows through yy=xx*xx to the original x before all-reduce:

regular all_reduce
Value after all_reduce: tensor([5.], device='cuda:1', grad_fn=<MulBackward0>)
Value after all_reduce: tensor([5.], device='cuda:0', grad_fn=<MulBackward0>)
gpu=1, grad=tensor([30.], device='cuda:1')
gpu=0, grad=tensor([20.], device='cuda:0')

For torch.distributed.nn.all_reduce, the printout is:

Value after all_reduce: tensor([5.], device='cuda:1', grad_fn=<_AllReduceBackward>)
Value after all_reduce: tensor([5.], device='cuda:0', grad_fn=<_AllReduceBackward>)
gpu=1, grad=tensor([60.], device='cuda:1')
gpu=0, grad=tensor([40.], device='cuda:0')

Second, let's compute the groundtruth gradient by a single process and manually summing the values to simulate all_reduce:

import torch

x_0 = torch.ones(1).cuda().requires_grad_()  # simulate process 0
x_1 = torch.ones(1).cuda().requires_grad_()  # simulate process 1

xx_0 = x_0 * 2
xx_1 = x_1 * 3

xx = xx_0 + xx_1  # simulate all_reduce()
yy = xx * xx
yy.backward()

print("Groundtruth x_0 gradient", x_0.grad)
print("Groundtruth x_1 gradient", x_1.grad)

this prints out:

Groundtruth x_0 gradient tensor([20.], device='cuda:0')
Groundtruth x_1 gradient tensor([30.], device='cuda:0')

So the result from torch.distributed.nn.all_reduce seems to be mathematically incorrect and should not be scaled by world_size

DrJimFan commented 3 years ago

Just want to follow up on this: is there anything I miss? Thanks!

mrshenli commented 3 years ago

I agree with @LinxiFan that distributed.nn.all_reduce shouldn't scale the gradients for SUM operator, and indeed needs a fix.

However, looks like distributed.all_reduce was correct in this case because allreduce default op is SUM where the grad_fn will be the same as identity. I tried using PRODUCT as the op, and looks like distributed.all_reduce is no longer correct. See the outputs below. I don't think distributed.all_reduce needs a fix though, as it wasn't claimed to support autograd, and as @rohan-varma mentioned above, we didn't implement distributed.all_reduce as an autograd function and all it does is in-place updates.

Ground Truth:

import torch

x_0 = torch.ones(1).cuda().requires_grad_()  # simulate process 0
x_1 = torch.ones(1).cuda().requires_grad_()  # simulate process 1

xx_0 = x_0 * 2
xx_1 = x_1 * 3

xx_0 = xx_0 * xx_0
xx_1 = xx_1 * xx_1

xx = xx_0 * xx_1  # simulate all_reduce()
yy = xx * xx
yy.backward()

print("Groundtruth x_0 gradient", x_0.grad)
print("Groundtruth x_1 gradient", x_1.grad)

outputs:

Groundtruth x_0 gradient tensor([5184.], device='cuda:0')
Groundtruth x_1 gradient tensor([5184.], device='cuda:0')

distributed.all_reduce

import torch
import torch.distributed
import torch.distributed.nn

USE_NN_REDUCE = 0

def main_worker(gpu):
    torch.distributed.init_process_group(
        backend="nccl", init_method="tcp://localhost:12345", world_size=2, rank=gpu
    )
    torch.cuda.set_device(gpu)
    x = torch.ones(1).cuda(device=gpu).requires_grad_()
    xx = (gpu + 2) * x
    if USE_NN_REDUCE:
        print("nn.all_reduce")
        xx = torch.distributed.nn.all_reduce(xx)
    else:
        print("regular all_reduce")
        torch.distributed.all_reduce(xx, op=torch.distributed.ReduceOp.PRODUCT)
    print("Value after all_reduce:", xx)
    yy = xx * xx   # test if gradient backprops through all_reduce
    yy.backward()
    print(f"gpu={gpu}, grad={x.grad}")

if __name__ == "__main__":
    torch.multiprocessing.spawn(main_worker, nprocs=2)

outputs

regular all_reduce
regular all_reduce
Value after all_reduce: tensor([6.], device='cuda:1', grad_fn=<MulBackward0>)
Value after all_reduce: tensor([6.], device='cuda:0', grad_fn=<MulBackward0>)
gpu=1, grad=tensor([36.], device='cuda:1')
gpu=0, grad=tensor([24.], device='cuda:0')
mrshenli commented 3 years ago

Another issue with when directly using distributed.all_reduce is that, with gloo backend, the grad_fn becomes CopyBackwards. I think this is because Gloo copies results to the tensor instead of doing in-place update. Although we don't need to offer autograd correctness for distributed.all_reduce, we might need to maintain consistency across backends. cc @agolynski

regular all_reduce
regular all_reduce
Value after all_reduce: tensor([6.], device='cuda:0', grad_fn=<CopyBackwards>)
Value after all_reduce: tensor([6.], device='cuda:1', grad_fn=<CopyBackwards>)
gpu=0, grad=tensor([24.], device='cuda:0')
gpu=1, grad=tensor([36.], device='cuda:1')
mrshenli commented 3 years ago

@LinxiFan do you mind share more details about your use cases? It will help us better prioritize our work items. IIRC, we haven't officially released distributed.nn.all_reduce yet, mainly due to lack of user requests. So I am curious how did you find it and what motivates you to try that out? :)

DrJimFan commented 3 years ago

Thanks @mrshenli for your detailed reply! Autograd-supported distributed.nn.all_reduce is essential to some contrastive learning methods that require large batches split over multiple GPUs. My particular use case is to implement the BarlowTwins algorithm from FAIR itself. The official repo actually has an issue about distributed autograd: https://github.com/facebookresearch/barlowtwins/issues/8 Now we can confirm that nn.all_reduce is indeed mathematically incorrect.

Thank you so much for your help!

mrshenli commented 3 years ago

@LinxiFan thanks a lot for sharing the context. We will see if we can prioritize fixing distributed.nn.* and release it as a prototype feature in v1.10.

DrJimFan commented 3 years ago

Thanks a lot!

ronghanghu commented 3 years ago

@LinxiFan @mrshenli @rohan-varma @cbalioglu Hi, I want to cast some doubt on the discussions above. I believe the current torch.distristribute.nn.all_reduce is actually the correct and desired behavior of summing the gradients across devices in its backward pass, and it is producing the correct gradient.

Let's consider a simple loss function over a batch loss = sum_i (y_i - mean_y), where mean_y = sum_i (y_i / N) is the mean over the batch. Clearly this loss is constantly zero where no matter what the input is. Suppose y_i is produced by some data x_i and parameter w as y_i = x_i * w, then the gradient over w is constantly zero (because no matter how w changes, the loss stays zero).

Now let's imagine we have a batch size of 2 with samples {100, 200} but our GPU can only fit a batch size of 1, so we need to implement this loss function in a distributed manner. Below is the implementation which can be used for DistributedDataParallel.

import torch
import torch.distributed
import torch.distributed.nn

USE_NN_REDUCE = 1

def main_worker(gpu):
    world_size = 2
    torch.distributed.init_process_group(
        backend="nccl", init_method="tcp://localhost:12345", world_size=world_size, rank=gpu
    )
    torch.cuda.set_device(gpu)
    w = torch.ones(1).cuda(device=gpu).requires_grad_()  # a param to learn
    x = (100 if gpu == 0 else 200)  # some input data
    y = w * x
    if USE_NN_REDUCE:
        print("nn.all_reduce")
        mean_y = torch.distributed.nn.all_reduce(y / world_size)
    else:
        print("regular all_reduce")
        mean_y = y / world_size
        torch.distributed.all_reduce(mean_y)

    loss = y - mean_y
    loss.backward()
    print(f"gpu={gpu}, loss={loss.item()}, grad={w.grad.item()}")

if __name__ == "__main__":
    torch.multiprocessing.spawn(main_worker, nprocs=2)

outputs for torch.distributed.nn.all_reduce (correct gradient as autograd is implemented)

nn.all_reduce
nn.all_reduce
gpu=0, loss=-50.0, grad=0.0
gpu=1, loss=50.0, grad=0.0

As we can see, the average loss over the two GPUs is zero (-50 + 50 = 0), and the average gradient over the two GPUs is also zero, meaning that parameter w won't be updated when training using DistributedDataParallel, which is desired and the correct equivalent of single-GPU training (since the total loss is constantly zero regardless of w). If one changes the current torch.distributed.nn.all_reduce to remove the gradient summing across devices or downscale its gradient by 1/world_size, the example above will be incorrect again.


outputs for torch.distributed.all_reduce (incorrect gradient because of no autograd)

regular all_reduce
regular all_reduce
gpu=0, loss=-50.0, grad=50.0
gpu=1, loss=50.0, grad=100.0

As we can see, the average loss over the two GPUs is zero (-50 + 50 = 0) as expected, but the average gradient over the two GPUs is NOT zero, and it is a positive number. So after gradient reduction in DistributedDataParallel, w will be updated, which is the undesired behavior.

Therefore we can see that torch.distributed.nn.all_reduce gives the correct equivalent to single-GPU behavior. I was looking into this issue when trying to implement SimCLR contrastive loss and building a SyncBN op in PyTorch XLA on a FAIR project.


@LinxiFan @mrshenli I feel the ground-truth in https://github.com/pytorch/pytorch/issues/58005#issuecomment-839112233 is not the single-GPU equivalent of all_reduce. Because we are doing two .backward calls (one on each GPU) in the distributed case, the equivalent single GPU ground-truth should be simulating two backward calls as follows

import torch

x_0 = torch.ones(1).cuda().requires_grad_()  # simulate process 0
x_1 = torch.ones(1).cuda().requires_grad_()  # simulate process 1

xx_0 = x_0 * 2
xx_1 = x_1 * 3

xx_gpu0 = xx_0 + xx_1  # simulate all_reduce() output on GPU 0
xx_gpu1 = xx_0 + xx_1  # simulate all_reduce() output on GPU 1
yy_gpu0 = xx_gpu0 * xx_gpu0  # simulate computation on GPU 0 after all_reduce
yy_gpu0.backward(retain_graph=True)
yy_gpu1 = xx_gpu1 * xx_gpu1  # simulate computation on GPU 1 after all_reduce
yy_gpu1.backward()

print("Groundtruth x_0 gradient", x_0.grad)
print("Groundtruth x_1 gradient", x_1.grad)

which gives

Groundtruth x_0 gradient tensor([40.], device='cuda:0')
Groundtruth x_1 gradient tensor([60.], device='cuda:0')

This matches what we get from torch.distributed.nn.all_reduce.

ppwwyyxx commented 3 years ago

Agree with @ronghanghu that torch.distributed.nn.all_reduce is actually correct in this example. And it also matches our vanilla implementation of differentiable allreduce in https://github.com/facebookresearch/fvcore/blob/master/fvcore/nn/distributed.py.

The gradient appears scaled because xx.backward() is called on each GPU in the original example. If you replace it by (xx * gpu).backward(), only GPU1 will do backward on xx and it will produce the results you expect:

import torch
import torch.distributed
import torch.distributed.nn

USE_NN_REDUCE = 1

def main_worker(gpu):
    torch.distributed.init_process_group(
        backend="nccl", init_method="tcp://localhost:12345", world_size=2, rank=gpu
    )
    torch.cuda.set_device(gpu)
    x = torch.ones(1).cuda(device=gpu).requires_grad_()
    xx = (gpu + 2) * x
    if USE_NN_REDUCE:
        print("nn.all_reduce")
        xx = torch.distributed.nn.all_reduce(xx)
    else:
        print("regular all_reduce")
        torch.distributed.all_reduce(xx)
    print("Value after all_reduce:", xx)
    (xx * gpu).backward()
    print(f"gpu={gpu}, grad={x.grad}")

if __name__ == "__main__":
    torch.multiprocessing.spawn(main_worker, nprocs=2)
mrshenli commented 3 years ago

Hey @ronghanghu, @ppwwyyxx, thanks a lot for the detailed explanation and discussion! There are one thing that I couldn't wrap my head around. It will be great if you could help shed some light on those. Why do we need to run two backward() in the ground truth code snippet? IIUC, DDP's mission is maintaining mathematical equivalence to local training as much as it could (though this is not always possible). If so, shouldn't we keep the local training code intact and let DDP generate the same gradients, instead of changing local training code to mimic DDP's behavior?

ppwwyyxx commented 3 years ago

We run two backward() because the original code in the issue runs two backward(), once on each GPU, on the final all-reduced loss. And this is unrelated to DDP.

DDP works differently because it runs backward on each GPU on the not-reduced local loss, this makes DDP work very similar to local training. And for the same reason DDP doesn't need all-reduce to be differentiable so the issue discussed here doesn't affect DDP.

mrshenli commented 3 years ago

Yep, I am mentioning DDP, because @ronghanghu used DistributedDataParallel to refer to the allreduce implementation. To avoid further confusion, I will call that allreduce instead of DDP. So, to rephrase my question above: should the allreduce implementation try to maintain the same gradients as local training?

For local training, this is the ground truth.

import torch 

x = torch.ones(1).cuda().requires_grad_() 
xx = x * 2 + x * 3
yy = xx * xx
yy.backward()

print("Groundtruth x gradient", x.grad)

# Groundtruth x gradient tensor([50.], device='cuda:0')

IIUC, the allreduce implementation is trying to be a distributed version of DataParallel, i.e., (partially) global loss and accumulate gradients across replicas. (let me know if I misunderstood the intention here.) With nn.all_reduce, gradients on replicas are:

Value after all_reduce: tensor([5.], device='cuda:1', grad_fn=<_AllReduceBackward>)
Value after all_reduce: tensor([5.], device='cuda:0', grad_fn=<_AllReduceBackward>)
gpu=1, grad=tensor([60.], device='cuda:1')
gpu=0, grad=tensor([40.], device='cuda:0')

If we accumulate them, it becomes 100 instead of 50.

ronghanghu commented 3 years ago

Hi @mrshenli, I think there are two things involved here: 1) Splitting a batch across multiple (for data-parallel training) GPUs should have the same effect as forwarding the full batch on a single GPU. This part is already handled correctly by PyTorch's DistributedDataParallel, and has nothing to do in particular with the auto-grad torch.distributed.nn.all_gather or torch.distributed.nn.all_reduce mentioned in this issue. 2) For a neural network that involves auto-grad torch.distributed.nn.all_gather and torch.distributed.nn.all_reduce operation in its forward pass computation (e.g. for contrastive learning or synchronized batch normalization), the backward pass of torch.distributed.nn.all_gather and torch.distributed.nn.all_reduce operations should be implemented to ensure that: training this neural network by splitting the batch across DistributedDataParallel version should produce the effect as training it on a single GPU. Our claim here is that the existing implementation of torch.distributed.nn.all_gather and torch.distributed.nn.all_reduce is already doing the correct auto-grad in its backward pass and does not need any fix, as discussed in the dummy loss example in the comment above https://github.com/pytorch/pytorch/issues/58005#issuecomment-870214108.


And regarding the discussions

should the allreduce implementation try to maintain the same gradients as local training?

Yes, and the current torch.distributed.nn.all_reduce (reduce-sum) implementation is already equivalent to xx = xx_0 + xx_1, or more precisely the operation of (xx_gpu0 = xx_0 + xx_1 and xx_gpu1 = xx_0 + xx_1). It is just that two .backward calls on separate GPUs should be equivalent to two .backward calls on local training, or equivalently one .backward call on the sum of the loss.

An equivalent single-device ground-truth would be

import torch

x_0 = torch.ones(1).cuda().requires_grad_()  # simulate process 0
x_1 = torch.ones(1).cuda().requires_grad_()  # simulate process 1

xx_0 = x_0 * 2
xx_1 = x_1 * 3

xx_gpu0 = xx_0 + xx_1  # simulate all_reduce() output on GPU 0
xx_gpu1 = xx_0 + xx_1  # simulate all_reduce() output on GPU 1
yy_gpu0 = xx_gpu0 * xx_gpu0  # simulate computation on GPU 0 after all_reduce
yy_gpu1 = xx_gpu1 * xx_gpu1  # simulate computation on GPU 1 after all_reduce
(yy_gpu0 + yy_gpu1).backward()  # simulate the combined effect of backward on GPU 0 and GPU 1

print("Groundtruth x_0 gradient", x_0.grad)
print("Groundtruth x_1 gradient", x_1.grad)

which gives

Groundtruth x_0 gradient tensor([40.], device='cuda:0')
Groundtruth x_1 gradient tensor([60.], device='cuda:0')

the allreduce implementation is trying to be a distributed version of DataParallel, i.e., (partially) global loss and accumulate gradients across replicas.

I don't think torch.distributed.nn.all_reduce operation itself has direct connection to data parallel (although data parallel is often used to train a network that involves torch.distributed.nn.all_reduce in its forward pass). The auto-grad implementation of torch.distributed.nn.all_reduce should just be an equivalent to the local operation of (xx_gpu0 = xx_0 + xx_1 and xx_gpu1 = xx_0 + xx_1) on a single device.

Since the backward pass of (xx_gpu0 = xx_0 + xx_1 and xx_gpu1 = xx_0 + xx_1) on a local device is (xx_0.grad = xx_gpu0.grad + xx_gpu1.grad and xx_1.grad = xx_gpu0.grad + xx_gpu1.grad), the backward implementation of torch.distributed.nn.all_reduce should also sum the gradients from all devices (as it currently does).


If we accumulate them, it becomes 100 instead of 50.

Actually, if we are training this neural network (that involves all_reduce) with data parallel (here is where DistributedDataParallel comes in), and if we see x_0 and x_1 as different copies of the same parameter on GPU 0 and GPU 1, then we want to average their gradients, instead of summing them (accumulate) for data parallel training. This is exactly what PyTorch's DistributedDataParallel does (regardless of whether the network involves torch.distributed.nn.all_reduce in its forward pass) -- DistributedDataParallel averages (instead of summing) the gradients across devices in its gradient reduction. So if we average 40 and 60, we get 50.

mrshenli commented 3 years ago

Hey @ronghanghu, thanks for the detailed explanation. Would I be correct if I assume in your use case, nn.all_reduce is always used in conjunction with DistributedDataParallel, so that DDP's design decision of averaging rather than accumulating gradients gives you the correct result? BTW, averaging gradients in DDP does not always preserve mathematical equivalence with local training, it depends on the loss function and it's still an open discussion.

@LinxiFan does the above assumption (i.e., DDP is always involved) applies to your use case?

An equivalent single-device ground-truth would be

This part I didn't follow. Why do you need two model replicas (x_0 and x_1) in single device training? Isn't this changing local training's behavior to mimic distributed training's behavior?

ppwwyyxx commented 3 years ago

all_reduce is a math operation whose gradient is well-defined on its own. And nn.all_reduce has correct gradient regardless of whether DDP averages or sums gradients. I wish we hadn't brought DDP into this topic at all - as I said they are totally independent.

If we accumulate them, it becomes 100 instead of 50.

I don't think @ronghanghu 's explanation on this inconsistency using DDP is a direct answer to it - because what DDP does is unrelated. The reason why the results is 100 instead of 50, is again that .backward() is called twice on the same loss.


I think the code examples so far are too oversimplified and not the most helpful ones. Here is a slightly more complicated one:

import torch
import torch.distributed
import torch.distributed.nn

USE_NN_REDUCE = 0

def main_worker(gpu):
    torch.distributed.init_process_group(
        backend="nccl", init_method="tcp://localhost:12345", world_size=2, rank=gpu
    )
    torch.cuda.set_device(gpu)
    x = torch.ones(1).cuda(device=gpu).requires_grad_()
    xx = (gpu + 2) * x
    if USE_NN_REDUCE:
        xx = torch.distributed.nn.all_reduce(xx)
    else:
        torch.distributed.all_reduce(xx)
    y = (gpu + 2) * xx
    y.backward()
    print(f"gpu={gpu}, grad={x.grad}")

def local():
    x = torch.ones(1).cuda().requires_grad_()
    xx_all_reduce = 2 * x + 3 * x
    y_0 = 2 * xx_all_reduce
    y_1 = 3 * xx_all_reduce
    (y_0 + y_1).backward()
    print(f"truth: grad={x.grad}")

if __name__ == "__main__":
    local()
    torch.multiprocessing.spawn(main_worker, nprocs=2)

This prints:

truth: grad=tensor([25.], device='cuda:0')
gpu=0, grad=tensor([4.], device='cuda:0')
gpu=1, grad=tensor([9.], device='cuda:1')

So it shows that dist.all_reduce produces wrong gradients regardless of whether you average or sum the local gradients. Using nn.all_reduce produces correct gradients.

ronghanghu commented 3 years ago

Thanks @ppwwyyxx! Agreeing that DDP is a totally independent topic and unrelated to nn.all_reduce.

all_reduce is a math operation whose gradient is well-defined on its own.

Exactly! I believe torch.distributed.nn.all_reduce should just be an equivalent to the local operation of (xx_gpu0 = xx_0 + xx_1 and xx_gpu1 = xx_0 + xx_1) on a single device, whose backward pass is (xx_0.grad = xx_gpu0.grad + xx_gpu1.grad and xx_1.grad = xx_gpu0.grad + xx_gpu1.grad). Hence the currently backward implementation of torch.distributed.nn.all_reduce is correct.

mrshenli commented 3 years ago

I think the code examples so far are too oversimplified and not the most helpful ones. Here is a slightly more complicated one:

@ppwwyyxx this helped a lot! Thanks!

all_reduce is a math operation whose gradient is well-defined on its own.

This also totally makes sense.

Would I be correct if I assume we still need to fix for other reduce ops?

import torch
import torch.distributed
import torch.distributed.nn

USE_NN_REDUCE = 1

def main_worker(gpu):
    torch.distributed.init_process_group(
        backend="nccl", init_method="tcp://localhost:12345", world_size=2, rank=gpu
    )
    torch.cuda.set_device(gpu)
    x = torch.ones(1).cuda(device=gpu).requires_grad_()
    xx = (gpu + 2) * x
    if USE_NN_REDUCE:
        xx = torch.distributed.nn.all_reduce(xx, op=torch.distributed.ReduceOp.PRODUCT)
    else:
        torch.distributed.all_reduce(xx, op=torch.distributed.ReduceOp.PRODUCT)
    y = (gpu + 2) * xx
    y.backward()
    print(f"gpu={gpu}, grad={x.grad}")

def local():
    x = torch.ones(1).cuda().requires_grad_()
    xx_all_reduce = 2 * x * 3 * x
    y_0 = 2 * xx_all_reduce
    y_1 = 3 * xx_all_reduce
    (y_0 + y_1).backward()
    print(f"truth: grad={x.grad}")

if __name__ == "__main__":
    local()
    torch.multiprocessing.spawn(main_worker, nprocs=2)
truth: grad=tensor([60.], device='cuda:0')
gpu=0, grad=tensor([12.], device='cuda:0')
gpu=1, grad=tensor([18.], device='cuda:1')
ppwwyyxx commented 3 years ago

The gradient for other allreduce ops is indeed wrong. It is now implemented as the same allreduce op: https://github.com/pytorch/pytorch/blob/ca2702a776541e81945c5410984ba07b2a71ca96/torch/distributed/nn/functional.py#L252-L263

This works for SUM, but not other reduction. Tensorflow defines allreduce gradients here and only supports SUM: https://github.com/tensorflow/tensorflow/blob/f9c651943d442f98d4c7c36839bde70f27a9a135/tensorflow/python/ops/nccl_ops.py#L50-L65

cbalioglu commented 3 years ago

@LinxiFan @ronghanghu @ppwwyyxx @mrshenli please see #63100.

joshlk commented 1 year ago

all_reduce is a math operation whose gradient is well-defined on its own.

Hi @ronghanghu and @ppwwyyxx, do you have any references that show the maths behind this? I have searched the internet and can't find anything.

ezyang commented 1 year ago

it's basically doing a sum, but some of the inputs to the sum are on different nodes

ppwwyyxx commented 1 year ago

To recap & summarize the silent correctness bugs found in this thread:

andmis commented 1 year ago

Just some random commentary:


How should backprop through allreduce work? Consider the following snippet.

import torch
import torch.distributed as dist
import torch.distributed.nn.functional as F
import torch.multiprocessing as mp

def run(rank: int):
  torch.distributed.init_process_group(
    backend="nccl", init_method="tcp://localhost:12345",
    world_size=2, rank=rank,
  )
  x = torch.tensor(1., device=f"cuda:{rank}", requires_grad=True)
  y = x * (rank + 2)
  total = F.all_reduce(y, op=dist.ReduceOp.SUM)
  total.backward()
  print(f"{rank=} {total=} {x.grad=}")

if __name__ == "__main__":
  mp.spawn(run, nprocs=2)

What do you expect the output to be?

Since total = 2*x_0 + 3*x_1 (where x_i represents the instance of x in process i), we might expect x_0.grad == 2 and x_1.grad == 3. In reality we get 4 and 6.

Why might this make sense? The crux of the issue is the meaning of the expression total.backward(). In a non-distributed setting, this line would mean, "total is a scalar; populate every leaf node n in the computation graph of total with the gradient of total WRT n". In the present setting, there are two different, reasonable ways of interpreting the expression total.backward():

  1. total is a scalar, and its value on every worker node is identical. Populate every leaf node n across all worker nodes in the (distributed) computation graph of total with the gradient of total WRT n.
  2. total_i is a scalar, for every worker node i. The values of the total_i need not be the same. For every leaf node n in the (overlapping, distributed) computation graphs of the total_i, populate n.grad with the sum of the gradients of the total_i WRT n.

Option 1 feels closer to how .backward() is commonly used in the non-distributed setting, and the behavior of option 2 (which is what is currently implemented in torch.distributed.nn) is confusing, at least at first.

However, option 2 is easier to implement, imposes fewer constraints (for example, what if just before total.backward() we did total = total * rank? This is allowed by option 2 but not by option 1), and is more consistent with a single-process rewrite of the distributed version of the code (note that in the code snippet above, the expression total.backward() will be executed twice, once per process, so in a single-process rewrite we should call total.backward() twice). Probably for the above reasons, option 2 seems to be what the PyTorch developers have chosen.

As a side note, if you find yourself wishing for the behavior of option 1, and it's true that the tensor you will call .backward() on (probably loss) is indeed a scalar which has the same value on every worker node, then you can just divide your tensor-to-be-differentiated by the number of workers.

ppwwyyxx commented 1 year ago

In the present setting, there are two different, reasonable ways of interpreting the expression total.backward():

(1) total is a scalar, and its value on every worker node is identical. Populate every leaf node n across all worker nodes in the (distributed) computation graph of total with the gradient of total WRT n. (2) total_i is a scalar, for every worker node i. The values of the total_i need not be the same. For every leaf node n in the (overlapping, distributed) computation graphs of the total_i, populate n.grad with the sum of the gradients of the total_i WRT n.

@andmis It doesn't make sense to assume "its value on every worker node is identical". What does that even mean, after all? Every worker can decide to call (or even NOT call at all) backward on any tensor, for any number times. So (1) is not a reasonable interpretation.

joshlk commented 1 year ago

Thanks @andmis for the write-up!

I want to present two points:

  1. That the backwards operation of an all-reduce is a copy (not an all-reduce)
  2. There are two different semantic differences in the use of .backward() in a distributed setting (refining @andmis's argument)

First, what is the backward operation to an all-reduce? Let's consider an all-reduce across 2 nodes:

$$ \hat{y} = \begin{pmatrix} 1 \ 1 \end{pmatrix}\textup{sum}(\hat{x}) = \begin{pmatrix} x_1 + x_2 \ x_1 + x_2 \end{pmatrix} $$

Where $\hat{x}$ and $\hat{y}$ are vectors of inputs and outputs for each node (e.g. $x_1$ is the input on node 1). First we need to calculate $\partial \hat{y} / \partial \hat{x}$:

$$ \begin{aligned} \frac{\partial \hat{y}}{\partial \hat{x}} &= \begin{pmatrix} \frac{\partial y_1 }{\partial x_1} & \frac{\partial y_1 }{\partial x_2} \ \frac{\partial y_2 }{\partial x_1} & \frac{\partial y_2 }{\partial x_2} \end{pmatrix} = \begin{pmatrix} 1 & 1 \ 1 & 1 \end{pmatrix} \end{aligned} $$

We now backpropagate a loss $\hat{l}$ which is a vector of losses on each device, we need to calculate $\partial \hat{l} / \partial \hat{x}$ given input $\partial \hat{l} / \partial \hat{y}$:

$$ \begin{aligned} \frac{\partial \hat{l}}{\partial \hat{x}} &= \frac{\partial \hat{l}}{\partial \hat{y}}\frac{\partial \hat{y}}{\partial \hat{x}} \ &= \begin{pmatrix} \frac{\partial l_1 }{\partial y_1} & \frac{\partial l_1 }{\partial y_2} \ \frac{\partial l_2 }{\partial y_1} & \frac{\partial l_2 }{\partial y_2} \end{pmatrix} \begin{pmatrix} 1 & 1 \ 1 & 1 \end{pmatrix} \ &= \begin{pmatrix} \frac{\partial l_1 }{\partial y_1} + \frac{\partial l_1 }{\partial y_2} & \frac{\partial l_1 }{\partial y_1} + \frac{\partial l_1 }{\partial y_2} \ \frac{\partial l_2 }{\partial y_1} + \frac{\partial l_2 }{\partial y_2} & \frac{\partial l_2 }{\partial y_1} + \frac{\partial l_2 }{\partial y_2} \end{pmatrix} \end{aligned} $$

Above I have considered a vector of losses $\hat{l}$. I think the one common mistake is to consider only a scalar $l$, which is mistakenly simplified to the following:

$$ \begin{aligned} \frac{\partial l}{\partial \hat{x}} &= \left (\frac{\partial l }{\partial y_1} + \frac{\partial l }{\partial y_2} \right ) \begin{pmatrix} 1 & 1 \end{pmatrix} \end{aligned} $$

As $\partial l / \partial y_1$ and $\partial l / \partial y_2$ are different devices, this is interpreted as an all-reduce.

However, even if the values of $l_1$ and $l_2$ equal, they are different functions and so the above simplification is incorrect. $l_1(y_1)$ is a function of $y_1$ and independent of $y_2$, similarly $l_2(y_2)$ is a function of $y_2$ and independent of $y_1$; and so $\partial l_1 / \partial y_2 = 0$ and $\partial l_2 / \partial y_1 = 0$.

Therefore, the above simplifies to:

$$ \frac{\partial \hat{l}}{\partial \hat{x}} = \begin{pmatrix} \frac{\partial l_1 }{\partial y_1} & \frac{\partial l_1 }{\partial y_1} \ \frac{\partial l_2 }{\partial y_2} & \frac{\partial l_2 }{\partial y_2} \end{pmatrix} $$

And so the backward op of an all-reduce is a copy.

Secondly, on the semantics of z.backwards(l) in the distributed setting where z is an arbitrary tensor and l is a gradient to backpropagate of the same size. There are two scenarios (adapted from @andmis):

  1. z represents the same function on all nodes. Populate every leaf node n across all worker nodes in the (distributed) computation graph with $\partial l / \partial x$.
  2. z represents different functions on all worker nodes. For every leaf node n in the (overlapping, distributed) computation graphs populate with the sum of the gradients of all worker nodes $\textup{sum}(\partial \hat{l} / \partial x)$.

Above, I am using the matrix calculus definition of a function. For example, given the vector-valued function $\hat{y} = 2\hat{x}$ which takes an n-valued input $\hat{x}$ and provides an n-valued output $\hat{y}$. A programmer would say this is 1 scalar function $z = 2 a$ called n times with a different input, where $a$ and $z$ are arbitrary inputs and outputs. From a calculus perspective, this is n unique scalar functions $y_i = 2 * x_i$ stacked into a vector.

When using tensor parallel, e.g. Megatron (2019), the function is replicated across all worker nodes for computational convenience; however, they remain the same function - this fits into scenario 1.

When using data parallel, we split a batch across multiple worker nodes. Here each worker node is calculating a different function (different inputs) and so fits into scenario 2.

ppwwyyxx commented 3 months ago

However, even if the values of l1 and l2 equal, they are different functions and so the above simplification is incorrect. l1(y1) is a function of y1 and independent of y2, similarly l2(y2) is a function of y2 and independent of y1; and so βˆ‚l1/βˆ‚y2=0 and βˆ‚l2/βˆ‚y1=0.

If in your example, l1 is computed on y1 and independ of y2; l2 is computed on y2 and independent of y1, then the result of all-reduce (y = y1 + y2) is not used to compute loss at all. Therefore the gradient of all-reduce does not matter in this example. This example doesn't say anything about what is the correct gradient of all-reduce.

joshlk commented 3 months ago

If in your example, l1 is computed on y1 and independ of y2; l2 is computed on y2 and independent of y1, then the result of all-reduce (y = y1 + y2) is not used to compute loss at all. Therefore the gradient of all-reduce does not matter in this example. This example doesn't say anything about what is the correct gradient of all-reduce.

In the example, I define the x's as the inputs of the all-reduce and the y's as the output of the all-reduce. Defined here:

$$ \hat{y} = \begin{pmatrix} 1 \ 1 \end{pmatrix}\textup{sum}(\hat{x}) = \begin{pmatrix} x_1 + x_2 \ x_1 + x_2 \end{pmatrix} $$

So the all-reduce is the sum of the x's and the y's are copies of the result on each device and $y \neq y_1 + y_2$, but $y_1 = x_1 + x_2$