pytorch / pytorch

Tensors and Dynamic neural networks in Python with strong GPU acceleration
https://pytorch.org

Make streams used for NCCL operations configurable #67158

Open wjuni opened 3 years ago

wjuni commented 3 years ago

🚀 Feature

Make streams used for NCCL operations configurable

Motivation

I've noticed that the PyTorch distributed module has introduced P2P send and receive functionality via NCCL (although the documentation still lists it as "not supported"). However, I found cases where NCCL hangs when two nodes exchange tensors with NCCL P2P send and recv. Here is a (possibly) minimal working example.

import torch
import torch.distributed as dist
from argparse import ArgumentParser

if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument("--rank", type=int)
    args = parser.parse_args()

    local_rank, remote_rank = args.rank, 1 - args.rank
    device = torch.device('cuda', local_rank)
    torch.cuda.set_device(device)

    dist.init_process_group(backend="nccl", init_method='tcp://127.0.0.1:30001', rank=local_rank, world_size=2)
    dist.barrier()
    print("Ready")

    tensor_to_send = torch.empty((1000000,), device=device)
    tensor_to_recv = torch.empty((1000000,), device=device)

    # exchange tensors between two nodes
    dist.send(tensor_to_send, remote_rank)
    dist.recv(tensor_to_recv, remote_rank)
    torch.cuda.synchronize() # never returns

    print("Done")

I believe running two processes with the command-line arguments --rank 0 and --rank 1 should not cause any problems. However, they will likely hang at torch.cuda.synchronize() with 100% GPU utilization (if not, please try larger tensors).

This is because dist.send and dist.recv, which internally call ncclSend and ncclRecv, deadlock. ncclSend blocks until the remote rank calls ncclRecv, and vice versa (see https://github.com/NVIDIA/nccl/issues/584). Thus, when node 0 and node 1 issue dist.send at the same time, neither can proceed to dist.recv. This makes successive calls to dist.send and dist.recv very prone to deadlock.
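
For a symmetric exchange like the one in the example, where both ranks know in advance that they will swap tensors, one common way to avoid the send/send collision is to let the rank order decide who sends first. The sketch below is illustrative only: `ordered_exchange` is a hypothetical helper, not a PyTorch API, and its `send` and `recv` arguments stand in for calls such as `dist.send(tensor_to_send, remote_rank)` and `dist.recv(tensor_to_recv, remote_rank)`. It does not help when peers send at arbitrary, uncoordinated times.

```python
from typing import Callable, List


def ordered_exchange(
    local_rank: int,
    remote_rank: int,
    send: Callable[[], None],
    recv: Callable[[], None],
) -> List[str]:
    """Run one send and one recv in an order that pairs up across two ranks.

    The lower rank sends first and the higher rank receives first, so each
    blocking send meets a matching recv on the peer.
    Returns the order that was used, which is handy for logging.
    """
    if local_rank == remote_rank:
        raise ValueError("local_rank and remote_rank must differ")
    if local_rank < remote_rank:
        send()
        recv()
        return ["send", "recv"]
    recv()
    send()
    return ["recv", "send"]


# Usage with PyTorch (assumes an initialized NCCL process group):
#   ordered_exchange(
#       local_rank, remote_rank,
#       send=lambda: dist.send(tensor_to_send, remote_rank),
#       recv=lambda: dist.recv(tensor_to_recv, remote_rank),
#   )
```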

To handle this, the NCCL documentation recommends grouping ncclSend and ncclRecv operations with ncclGroupStart and ncclGroupEnd (dist.P2POp and dist.batch_isend_irecv in PyTorch). However, this does not work in every case, especially when the two operations cannot be grouped. For example, when two nodes send and receive tensors to each other at random times, both may call send simultaneously and deadlock, as there is no locking mechanism here.
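
For the two-rank exchange in the example above, the grouped form could look roughly like the sketch below. `dist.P2POp`, `dist.isend`, `dist.irecv` and `dist.batch_isend_irecv` are the PyTorch calls named above; `batched_exchange` is a hypothetical helper, and taking the `torch.distributed` module as a parameter is just a choice made here so the sketch can be exercised without a GPU.

```python
from typing import Any, List


def batched_exchange(dist: Any, send_tensor: Any, recv_tensor: Any, peer: int) -> None:
    """Post one send and one recv to `peer` as a single batched group.

    `dist` is expected to be the `torch.distributed` module with an
    initialized process group (assumption: both peers call this function
    for the same exchange).
    """
    ops: List[Any] = [
        dist.P2POp(dist.isend, send_tensor, peer),
        dist.P2POp(dist.irecv, recv_tensor, peer),
    ]
    # batch_isend_irecv returns a list of request objects; wait on all of them.
    for req in dist.batch_isend_irecv(ops):
        req.wait()


# Usage (assumes torch.distributed was initialized with backend="nccl"):
#   import torch.distributed as dist
#   batched_exchange(dist, tensor_to_send, tensor_to_recv, remote_rank)
```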

In my project, multiple nodes exchange tensors with each other, and any node can send or receive a tensor. The sender transmits a tensor and its metadata to the receiver as needed. To do this, metadata containing the size of the tensor is delivered to the receiver out-of-band via RPC. The receiver calls dist.recv upon receiving the metadata, and the sender calls dist.send to send the tensor. When two nodes happen to call dist.send simultaneously, the GPUs hang and synchronize() never returns.

Pitch

IMHO, the easiest way to resolve this is to use different CUDA streams for dist.send and dist.recv. Unfortunately, there seems to be no viable way for users to specify a stream for NCCL operations. PyTorch distributed uses a dedicated stream for each NCCL connection (https://github.com/pytorch/pytorch/issues/60511), and with torch.cuda.stream(stream) does not affect NCCL operations the way it affects other CUDA operations.

I would like to have a context manager that configures the stream used for NCCL operations, something like the following:

# At the sender side
stream_send = dist.NCCLStream(remote_rank) # Create stream for NCCL operation
with dist.use_stream(stream_send): 
    # this context should not affect other CUDA operations
    dist.send(tensor_to_send, remote_rank)
torch.cuda.synchronize()

# At the receiver side
stream_recv = dist.NCCLStream(remote_rank) # Create stream for NCCL operation
with dist.use_stream(stream_recv): 
    # this context should not affect other CUDA operations
    dist.recv(tensor_to_recv, remote_rank)
torch.cuda.synchronize()

so that dist.send and dist.recv do not block each other.

Alternatives

Well, we can implement some hacky locking mechanisms to prevent two nodes from calling dist.send at the same time. However, this would come with some overhead, and doesn't seem to be a good solution...
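
As a rough illustration of what such a locking mechanism could look like, the sketch below keeps one lock per unordered pair of ranks, so that only one direction of a pair can be in flight at a time. Everything here is hypothetical: `PairLockArbiter` is an invented name, and the class is a single-process stand-in. In a real job the arbiter would have to live on one rank and be reached out-of-band (for example over RPC), which is where the overhead comes from.

```python
import threading
from typing import Dict, Tuple


class PairLockArbiter:
    """Hands out one lock per unordered pair of ranks (hypothetical helper)."""

    def __init__(self) -> None:
        self._guard = threading.Lock()  # protects the dictionary below
        self._locks: Dict[Tuple[int, int], threading.Lock] = {}

    def _lock_for(self, a: int, b: int):
        key = (min(a, b), max(a, b))  # (0, 1) and (1, 0) share one lock
        with self._guard:
            return self._locks.setdefault(key, threading.Lock())

    def acquire(self, sender: int, receiver: int, blocking: bool = True) -> bool:
        """Return True once `sender` may start a transfer to `receiver`."""
        return self._lock_for(sender, receiver).acquire(blocking)

    def release(self, sender: int, receiver: int) -> None:
        """Mark the transfer between the two ranks as finished."""
        self._lock_for(sender, receiver).release()


# Intended use around a transfer (sketch, not tested against NCCL):
#   arbiter.acquire(local_rank, remote_rank)
#   try:
#       dist.send(tensor, remote_rank)   # the peer calls dist.recv
#       torch.cuda.synchronize()         # wait for the transfer to finish
#   finally:
#       arbiter.release(local_rank, remote_rank)
```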

Additional context

I'll be glad to know if other workarounds can be applied to my problem.

I'd like to work on this and submit a PR if there is no workaround to resolve this problem.

cc @pietern @mrshenli @pritamdamania87 @zhaojuanmao @satgera @rohan-varma @gqchen @aazzolini @osalpekar @jiayisuse @SciPioneer @H-Huang

AndyBug0 commented 1 year ago

Are there any workarounds for this problem? I've run into the same issue.

wjuni commented 1 year ago

@AndyBug0 I'm not sure whether there is a new way to solve this problem in the current version, but I ended up implementing the hacky RPC-based locking mechanism at that time.

https://github.com/kaist-ina/TSPipe/blob/main/tspipe/communicator.py#L606

AndyBug0 commented 1 year ago

I profiled my program and found that the broadcast and scatter operations run on different CUDA streams. It seems that torch supports running NCCL operations on different CUDA streams, but I don't know why.

Edenzzzz commented 5 months ago

This is still not manually configurable. Looking forward to a fix. cc @pietern @mrshenli @pritamdamania87 @zhaojuanmao @satgera @rohan-varma @gqchen @aazzolini @osalpekar @jiayisuse

jcbjcbjc commented 5 months ago

@wjuni Hi, I'm facing a similar problem, and your repository has helped me a lot. I'd like to ask whether CUDA API calls (computation) can execute concurrently with communication ops (send and receive) in your project TSPipe. I need to run computation and communication concurrently. Thank you.

wjuni commented 5 months ago

@jcbjcbjc My project does not require computation and communication to be fully concurrent, but I believe we can expect some degree of concurrency between computation and communication as the computation and NCCL kernel are launched in different streams.

jcbjcbjc commented 4 months ago

@wjuni Thanks for your reply. In my project, I have verified that computation and communication can be fully concurrent. Also, I observe that the hacky RPC-based locking mechanism in TSPipe serializes the exchange (send, then receive) so that two GPUs never exchange data concurrently. Is that right?

CalvinXKY commented 2 months ago

One approach that might work is to use additional process groups directly. I've tested it with the following code snippet:

# torch version: 2.1.1
import torch
import torch.distributed as dist
from datetime import timedelta
import torch.multiprocessing as mp
from torch.profiler import profile, ProfilerActivity

def example(rank, world_size):
    # One TCPStore per process group; rank 0 hosts both stores.
    store1 = dist.TCPStore(host_name="localhost", port=12332, world_size=world_size,
                           is_master=(rank == 0),
                           timeout=timedelta(seconds=30))
    store2 = dist.TCPStore(host_name="localhost", port=12333, world_size=world_size,
                           is_master=(rank == 0),
                           timeout=timedelta(seconds=30))

    pg1 = dist.ProcessGroupNCCL(store1, rank, world_size)
    pg2 = dist.ProcessGroupNCCL(store2, rank, world_size)

    with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
                 record_shapes=True,
                 with_stack=True) as prof:

        tensor1 = (torch.ones([100000, 10000]) * rank).to(rank)
        tensor2 = (torch.ones([100000, 10000]) * rank).to(rank)
        # NCCL initializes lazily, and that first call is actually synchronous.
        # ref issue: https://github.com/pytorch/pytorch/issues/122597
        # So the first round should look like this:
        if rank % 2 == 0:
            pg1.send([tensor1], (rank + 1) % world_size, 1)
            pg2.recv([tensor2], (rank - 1) % world_size, 0)
        else:
            pg1.recv([tensor1], (rank - 1) % world_size, 1)
            pg2.send([tensor2], (rank + 1) % world_size, 0)

        pg1.barrier()
        pg2.barrier()
        for i in range(3):
            if rank % 2 == 0:
                pg1.send([tensor1], (rank + 1) % world_size, 1)
                pg2.recv([tensor2], (rank - 1) % world_size, 0)
            else:
                pg2.send([tensor2], (rank + 1) % world_size, 0)
                pg1.recv([tensor1], (rank - 1) % world_size, 1)
        pg1.barrier()
        pg2.barrier()

    if rank == 0:
        prof.export_chrome_trace("torch_profiler_trace.json")
        print(tensor1.shape)

def main():
    world_size = 8
    mp.spawn(example,
             args=(world_size,),
             nprocs=world_size,
             join=True)

if __name__ == "__main__":
    main()

The profile shows that the two P2P operations run on two different CUDA streams and that they overlap.
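
The same idea can probably be expressed with the public `dist.new_group()` API instead of constructing `ProcessGroupNCCL` by hand; a minimal sketch follows. `make_direction_groups` and `ring_step` are hypothetical helpers, the `dist` argument is expected to be an initialized `torch.distributed` module, and the sketch assumes an even `world_size`. Whether it produces the same stream separation as the snippet above has not been profiled here.

```python
from typing import Any, Tuple


def make_direction_groups(dist: Any) -> Tuple[Any, Any]:
    """Create two process groups that each span all ranks.

    Every rank must call this, and in the same order, because
    dist.new_group() is a collective call.
    """
    return dist.new_group(), dist.new_group()


def ring_step(dist: Any, rank: int, world_size: int,
              out_tensor: Any, in_tensor: Any, pg_a: Any, pg_b: Any) -> None:
    """One ring exchange: send to rank+1, receive from rank-1.

    Even ranks send on pg_a and receive on pg_b; odd ranks receive on pg_a
    and send on pg_b, so every send is matched by a recv on the same group.
    """
    nxt = (rank + 1) % world_size
    prv = (rank - 1) % world_size
    if rank % 2 == 0:
        dist.send(out_tensor, nxt, group=pg_a)
        dist.recv(in_tensor, prv, group=pg_b)
    else:
        dist.recv(in_tensor, prv, group=pg_a)
        dist.send(out_tensor, nxt, group=pg_b)
```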


chenhongyu2048 commented 2 months ago

@jcbjcbjc You mentioned that you have verified that computation and communication can be fully concurrent in your project. May I ask how you achieved the overlap of computation and communication? I've tried multiple CUDA streams in PyTorch so far, but it doesn't work well. Hope to hear from you.

chenhongyu2048 commented 2 months ago

@CalvinXKY Hello, I tested similar code in CUDA and also observed the overlap. But could this overlap cause a performance loss in the communication kernels?

OK, I got an answer. Although the overlap slows down both the send and recv kernels compared to running a single one, the time saved by overlapping makes up for part of that loss. Using multiple process groups helps me a lot, especially when I have several iterations of send+recv.