NVIDIA / nccl

Optimized primitives for collective multi-GPU communication

[Bug] For AllReduce communications with a shape mismatch, some cases will hang while others will not. #1417

Open YanjieGao opened 3 months ago

YanjieGao commented 3 months ago

Hi, I have a similar problem to the one described in https://github.com/NVIDIA/nccl/issues/1394.

For an allreduce with mismatched shapes across ranks (PyTorch with the NCCL backend), some shape combinations cause the process to hang while others do not; there are no documented assumptions about the synchronization semantics. When a mismatched call does complete, the results can be incorrect. For example, when rank 0 has shape 409600 and all other ranks have shape 409598 (test case 3), the last two elements of rank 0's result come back as 6.0 6.0.

Experimental results:

Test 1 log:

Rank 0 shape is 409600, all other ranks' shape is 32768000: the run hangs.

    # Rank 7 tensor before allreduce: torch.Size([32768000]) 1.0
    # Rank 6 tensor before allreduce: torch.Size([32768000]) 1.0
    # Rank 4 tensor before allreduce: torch.Size([32768000]) 1.0
    # Rank 0 tensor before allreduce: torch.Size([409600]) 1.0
    # Rank 5 tensor before allreduce: torch.Size([32768000]) 1.0
    # Rank 3 tensor before allreduce: torch.Size([32768000]) 1.0
    # Rank 2 tensor before allreduce: torch.Size([32768000]) 1.0
    # Rank 1 tensor before allreduce: torch.Size([32768000]) 1.0
    # Rank 0 tensor after allreduce: torch.Size([409600]) 8.0 8.0 8.0 8.0

Test 2 log:

Rank 0 shape is 409600, all other ranks' shape is 409600: finishes normally.

    # ... 
    # Rank 5 tensor after allreduce: torch.Size([409600]) 8.0 8.0 8.0 8.0
    # Rank 6 tensor after allreduce: torch.Size([409600]) 8.0 8.0 8.0 8.0
    # Rank 4 tensor after allreduce: torch.Size([409600]) 8.0 8.0 8.0 8.0
    # Rank 3 tensor after allreduce: torch.Size([409600]) 8.0 8.0 8.0 8.0
    # Rank 2 tensor after allreduce: torch.Size([409600]) 8.0 8.0 8.0 8.0
    # Rank 7 tensor after allreduce: torch.Size([409600]) 8.0 8.0 8.0 8.0
    # Rank 1 tensor after allreduce: torch.Size([409600]) 8.0 8.0 8.0 8.0
    # Rank 0 tensor after allreduce: torch.Size([409600]) 8.0 8.0 8.0 8.0

Test 3 log:

Rank 0 shape is 409600, all other ranks' shape is 409598: finishes, but rank 0's tail values are wrong.

    # ...
    # Rank 4 tensor after allreduce: torch.Size([409598]) 8.0 8.0 8.0 8.0
    # Rank 6 tensor after allreduce: torch.Size([409598]) 8.0 8.0 8.0 8.0
    # Rank 3 tensor after allreduce: torch.Size([409598]) 8.0 8.0 8.0 8.0
    # Rank 5 tensor after allreduce: torch.Size([409598]) 8.0 8.0 8.0 8.0
    # Rank 2 tensor after allreduce: torch.Size([409598]) 8.0 8.0 8.0 8.0
    # Rank 1 tensor after allreduce: torch.Size([409598]) 8.0 8.0 8.0 8.0
    # Rank 0 tensor after allreduce: torch.Size([409600]) 8.0 8.0 6.0 6.0
    # Rank 7 tensor after allreduce: torch.Size([409598]) 8.0 8.0 8.0 8.0

Test 4 log:

Rank 0 shape is 409600, all other ranks' shape is 32768: finishes, but rank 0's tail values are wrong.

    # ...
    # Rank 5 tensor after allreduce: torch.Size([32768]) 8.0 8.0 8.0 8.0
    # Rank 4 tensor after allreduce: torch.Size([32768]) 8.0 8.0 8.0 8.0
    # Rank 7 tensor after allreduce: torch.Size([32768]) 8.0 8.0 8.0 8.0
    # Rank 3 tensor after allreduce: torch.Size([32768]) 8.0 8.0 8.0 8.0
    # Rank 2 tensor after allreduce: torch.Size([32768]) 8.0 8.0 8.0 8.0
    # Rank 1 tensor after allreduce: torch.Size([32768]) 8.0 8.0 8.0 8.0
    # Rank 6 tensor after allreduce: torch.Size([32768]) 8.0 8.0 8.0 8.0
    # Rank 0 tensor after allreduce: torch.Size([409600]) 8.0 0.0 0.0 0.0

Test code:

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import os

def setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29501"
    dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)

def allreduce_test(rank, world_size):
    setup(rank, world_size)
    if rank == 0:
        tensor_size = 409600
    else:
        # Size used by every non-zero rank; 409600 reproduces test 2. Change it to
        # 32768000, 409598, or 32768 to reproduce tests 1, 3, and 4.
        tensor_size = 409600

    # Create a tensor of tensor_size ones on this rank's GPU
    tensor = torch.ones(tensor_size).to(rank)

    print(f"Rank {rank} tensor before allreduce: {tensor.shape} {tensor[0].item()}")

    # Perform all-reduce operation, summing tensors across all ranks
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=dist.group.WORLD)

    print(f"Rank {rank} tensor after allreduce: {tensor.shape} {tensor[0].item()} {tensor[-3].item()} {tensor[-2].item()} {tensor[-1].item()}")

if __name__ == "__main__":
    world_size = torch.cuda.device_count() 
    mp.spawn(allreduce_test, args=(world_size,), nprocs=world_size, join=True)
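
For reference, the logs above come from running this script on a host with 8 GPUs (so world_size is 8), changing the non-zero-rank tensor_size between runs as described in each test.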
sjeaugey commented 3 months ago

Indeed, there is no mention in the documentation that sizes have to match between ranks. That's obvious to anyone who has used MPI, but it's an oversight.

It's clear in the point-to-point communication section:

Any point-to-point communication needs two NCCL calls : a call to ncclSend() on one rank and a corresponding ncclRecv() on the other rank, with the same count and data type.

And there is a hint at that rule in the MPI section:

MPI allows for different send and receive counts and types, as long as sendcount*sizeof(sendtype) == recvcount*sizeof(recvtype). NCCL does not allow that, defining a single count and a single data-type.

But indeed, it's not clearly mentioned. We'll fix that in the next documentation update.
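
For illustration, a minimal PyTorch-side sketch of matched calls (assuming a process group initialized with the NCCL backend, as in the setup() helper above; the function name is illustrative):

import torch
import torch.distributed as dist

def matched_calls(rank):
    # Collective: every rank must pass the same element count and dtype.
    count = 409600
    buf = torch.ones(count, dtype=torch.float32, device=f"cuda:{rank}")
    dist.all_reduce(buf, op=dist.ReduceOp.SUM)  # ok: count/dtype identical on all ranks

    # Point-to-point: the recv must use the same count and dtype as the send.
    if rank == 0:
        dist.send(torch.ones(1024, dtype=torch.float32, device="cuda:0"), dst=1)
    elif rank == 1:
        out = torch.empty(1024, dtype=torch.float32, device="cuda:1")  # same count/dtype as the send
        dist.recv(out, src=0)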

YanjieGao commented 3 months ago

Hi sjeaugey, thanks for your reply. Could you also explain the rules for collective communication such as allreduce: under which shape mismatches will it hang, and when will it not? Also, in the illegal cases from the tests where a shape mismatch does not hang, the tail of the computed tensor is incorrect. Shouldn't NCCL assert or hang instead, so that users don't silently get wrong numerical results?

sjeaugey commented 3 months ago

Could you also explain the rules for collective communication such as allreduce: under which shape mismatches will it hang, and when will it not?

It may hang as soon as the sizes differ. In some cases we'll end up with the same number of network transfers and it may work, but some data will be incorrect given there wasn't an input value for all ranks at that offset.

Also, in the illegal cases from the tests where a shape mismatch does not hang, the tail of the computed tensor is incorrect. Shouldn't NCCL assert or hang instead, so that users don't silently get wrong numerical results?

Doing so may require additional communication between ranks, which would impact performance negatively. When we detect something wrong we print it, but we can't always detect it without an extra cost.
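
For anyone willing to pay that extra cost during debugging, here is a user-side sketch along those lines (checked_all_reduce is a hypothetical helper built on torch.distributed, not part of NCCL or PyTorch):

import torch
import torch.distributed as dist

def checked_all_reduce(tensor, op=dist.ReduceOp.SUM, group=None):
    # Gather every rank's (element count, dtype) and verify they all match before
    # issuing the collective. The extra all_gather is exactly the kind of additional
    # communication NCCL avoids by default. With the NCCL backend, this assumes
    # torch.cuda.set_device(rank) has been called on each rank.
    meta = (tensor.numel(), str(tensor.dtype))
    all_meta = [None] * dist.get_world_size(group)
    dist.all_gather_object(all_meta, meta, group=group)
    if any(m != all_meta[0] for m in all_meta):
        raise RuntimeError(f"allreduce shape/dtype mismatch across ranks: {all_meta}")
    dist.all_reduce(tensor, op=op, group=group)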

YanjieGao commented 3 months ago

Hi sjeaugey, thank you for your feedback. It would be very helpful for users if these assumptions could be stated in the documentation.

sjeaugey commented 3 months ago

Absolutely. I fixed the doc, so it should be updated in the next release.