NVIDIA / nccl

Optimized primitives for collective multi-GPU communication

will ncclCommAbort fail to abort communications in some cases? #549

Open wangraying opened 3 years ago

wangraying commented 3 years ago

Hi, I started a group of processes to perform allreduce operations. Each process also started another thread that calls ncclCommAbort at a certain point in time.

All processes are expected to eventually exit, but I found that the job sometimes hangs. (It rarely happens, but it does.)

Does this make sense?

Thanks for your help.

sjeaugey commented 3 years ago

It should not hang. Can you see where the process is hung using gdb?

wangraying commented 3 years ago

@sjeaugey

Yes, I checked the hung process using gdb.

It seems two threads are waiting on clock_gettime.

Thread 1:

(gdb) where 
#0  0x00007ffee4abb6c2 in clock_gettime ()
#1  0x00007f979399ed36 in __GI___clock_gettime (clock_id=4, tp=0x7ffee4a8abb0) at ../sysdeps/unix/clock_gettime.c:115
#2  0x00007f96fee2b70e in ?? () from /usr/lib/x86_64-linux-gnu/libcuda.so.1
#3  0x00007f96fef04837 in ?? () from /usr/lib/x86_64-linux-gnu/libcuda.so.1
#4  0x00007f96fedccb6c in ?? () from /usr/lib/x86_64-linux-gnu/libcuda.so.1
#5  0x00007f96fed02f4f in ?? () from /usr/lib/x86_64-linux-gnu/libcuda.so.1
#6  0x00007f96fee87259 in cuMemFree_v2 () from /usr/lib/x86_64-linux-gnu/libcuda.so.1
#7  0x00007f97160362c0 in cudart::cudaApiFree(void*) () from /home/wangrui08/bagua-sys/bagua-core/python/bagua_core/.data/lib/libnccl.so
#8  0x00007f97160683e1 in cudaFree () from /home/wangrui08/bagua-sys/bagua-core/python/bagua_core/.data/lib/libnccl.so
#9  0x00007f9715fb2b06 in commFree (comm=0x55d5d2790840) at init.cc:204
#10 commDestroy (comm=comm@entry=0x55d5d2790840) at init.cc:990
#11 0x00007f9715fb9a15 in ncclCommAbort (comm=0x55d5d2790840) at init.cc:1026

Thread 7:

(gdb) bt  
#0  0x00007ffee4abb6c2 in clock_gettime ()
#1  0x00007f979399ed36 in __GI___clock_gettime (clock_id=4, tp=0x7f96ddffcb90) at ../sysdeps/unix/clock_gettime.c:115
#2  0x00007f96fee2b70e in ?? () from /usr/lib/x86_64-linux-gnu/libcuda.so.1
#3  0x00007f96fef04837 in ?? () from /usr/lib/x86_64-linux-gnu/libcuda.so.1
#4  0x00007f96fee1355c in ?? () from /usr/lib/x86_64-linux-gnu/libcuda.so.1
#5  0x00007f96fee136b9 in ?? () from /usr/lib/x86_64-linux-gnu/libcuda.so.1
#6  0x00007f96fed122dd in ?? () from /usr/lib/x86_64-linux-gnu/libcuda.so.1
#7  0x00007f96feeafa39 in cuStreamSynchronize () from /usr/lib/x86_64-linux-gnu/libcuda.so.1
#8  0x00007f977b3b2060 in ?? () from /root/miniconda/envs/py37/lib/python3.7/site-packages/torch/lib/../../../../libcudart.so.10.1
#9  0x00007f977b3f38ed in cudaStreamSynchronize () from /root/miniconda/envs/py37/lib/python3.7/site-packages/torch/lib/../../../../libcudart.so.10.1

In my program, I called ncclCommAbort in one thread, and called cudaStreamSynchronize in another thread to synchronize the stream used by the NCCL communicator.
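
Roughly, the two threads look like this minimal sketch (simplified; the full reproducer appears later in this thread, the helper names are just for illustration, and comm stands for an already-created cupy.cuda.nccl.NcclCommunicator):

import threading

import torch
from cupy.cuda import nccl

def sync_thread(comm, tensor, stream):
    # issue an in-place allreduce on a dedicated stream, then block until it finishes
    comm.allReduce(tensor.data_ptr(), tensor.data_ptr(), tensor.numel(),
                   nccl.NCCL_FLOAT32, nccl.NCCL_SUM, stream.cuda_stream)
    stream.synchronize()   # this thread can end up blocked in cudaStreamSynchronize

def abort_thread(comm):
    comm.abort()           # this thread calls ncclCommAbort on the same communicator

# comm is assumed to be created beforehand (unique id exchanged between ranks), e.g.:
# stream = torch.cuda.Stream()
# tensor = torch.ones(1024, device="cuda")
# threading.Thread(target=sync_thread, args=(comm, tensor, stream)).start()
# threading.Thread(target=abort_thread, args=(comm,)).start()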

I also checked nvidia-smi, which shows that one GPU is at 100% utilization.

+-----------------------------------------------------------------------------+
| NVIDIA-SMI 440.64.00    Driver Version: 440.64.00    CUDA Version: 10.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla V100-SXM2...  On   | 00000000:1A:00.0 Off |                    0 |
| N/A   39C    P0    44W / 300W |    932MiB / 32510MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   1  Tesla V100-SXM2...  On   | 00000000:1B:00.0 Off |                    0 |
| N/A   38C    P0    42W / 300W |     12MiB / 32510MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   2  Tesla V100-SXM2...  On   | 00000000:3D:00.0 Off |                    0 |
| N/A   37C    P0    44W / 300W |     12MiB / 32510MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   3  Tesla V100-SXM2...  On   | 00000000:3E:00.0 Off |                    0 |
| N/A   39C    P0    44W / 300W |     12MiB / 32510MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   4  Tesla V100-SXM2...  On   | 00000000:88:00.0 Off |                    0 |
| N/A   38C    P0    42W / 300W |    252MiB / 32510MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   5  Tesla V100-SXM2...  On   | 00000000:89:00.0 Off |                    0 |
| N/A   39C    P0    45W / 300W |    592MiB / 32510MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   6  Tesla V100-SXM2...  On   | 00000000:B1:00.0 Off |                    0 |
| N/A   41C    P0    44W / 300W |    448MiB / 32510MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   7  Tesla V100-SXM2...  On   | 00000000:B2:00.0 Off |                    0 |
| N/A   50C    P0    68W / 300W |   3389MiB / 32510MiB |    100%      Default |
+-------------------------------+----------------------+----------------------+
sjeaugey commented 3 years ago

Thanks for the trace. Indeed, the first thing ncclCommAbort does is set comm->abortFlag=1, so that all CUDA kernels should exit.

We might have missed some case that blocks indefinitely without checking the abortFlag.
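
As a rough illustration of what such a missed case would look like (not NCCL's actual code, just the general pattern): a wait loop that re-checks an abort flag can be interrupted, while a blocking wait that never re-checks it cannot.

import threading, time

abort_flag = threading.Event()
cond = threading.Condition()

def wait_with_flag_check():
    # re-checks the abort flag on every iteration, so setting the flag unblocks this thread
    while not abort_flag.is_set():
        time.sleep(0.001)

def wait_without_flag_check():
    # blocks until notified and never looks at abort_flag, so it can wait forever
    with cond:
        cond.wait()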

You mentioned it happens rarely, but if you have a way to reproduce the issue in a reasonable amount of time, it would be useful to check whether it is tied to a specific protocol by setting NCCL_PROTO=LL, NCCL_PROTO=LL128, or NCCL_PROTO=SIMPLE.
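
For example (the script name here is just a placeholder), a protocol can be forced for a single run like this:

NCCL_PROTO=LL128 python3 -m torch.distributed.launch --nproc_per_node=8 your_script.py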

wangraying commented 3 years ago

Thanks for your reply. @sjeaugey

I have written sample code that can reproduce the hang described above.

I also tested with different NCCL protocols; it seems that using LL128 may cause the program to hang. The results are:

My environment setup is:

The sample code is as follows:

from __future__ import print_function
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
import logging
import os
import threading
import time
import cupy
from cupy.cuda import nccl
import numpy

flatten = torch._utils._flatten_dense_tensors
unflatten = torch._utils._unflatten_dense_tensors

global_comm = None

def init_global_communicator(rank, size):
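    # Exchange the NCCL unique id through the torch.distributed (c10d) default store,
    # then create a cupy NCCL communicator shared by all ranks.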
    import torch.distributed as c10d
    default_store = c10d.distributed_c10d._get_default_store()

    key = 'unique_id'
    if rank == 0:
        id = nccl.get_unique_id()
        idstr = numpy.array(id).tostring()
        default_store.set(key, idstr)
    else:
        idstr = default_store.get(key)

    comm_id = tuple(numpy.fromstring(idstr, dtype=int))

    global global_comm
    global_comm = nccl.NcclCommunicator(size, comm_id, rank)

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

def train(args, model, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.cuda(), target.cuda()
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            logging.info(
                "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                    epoch,
                    batch_idx * len(data),
                    len(train_loader.dataset),
                    100.0 * batch_idx / len(train_loader),
                    loss.item(),
                )
            )

def test(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.cuda(), target.cuda()
            output = model(data)
            test_loss += F.nll_loss(
                output, target, reduction="sum"
                ).item()  # sum up batch loss
            pred = output.argmax(
                dim=1, keepdim=True
            )  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    logging.info(
        "\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
            test_loss,
            correct,
            len(test_loader.dataset),
            100.0 * correct / len(test_loader.dataset),
        )
    )

def main():
    # Training settings
    parser = argparse.ArgumentParser(description="PyTorch MNIST Example")
    parser.add_argument("--rank", default=0, type=int)
    parser.add_argument("--local_rank", default=0, type=int)
    parser.add_argument(
        "--batch-size",
        type=int,
        default=64,
        metavar="N",
        help="input batch size for training (default: 64)",
    )
    parser.add_argument(
        "--test-batch-size",
        type=int,
        default=1000,
        metavar="N",
        help="input batch size for testing (default: 1000)",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=14,
        metavar="N",
        help="number of epochs to train (default: 14)",
    )
    parser.add_argument(
        "--lr",
        type=float,
        default=1.0,
        metavar="LR",
        help="learning rate (default: 1.0)",
    )
    parser.add_argument(
        "--gamma",
        type=float,
        default=0.7,
        metavar="M",
        help="Learning rate step gamma (default: 0.7)",
    )
    parser.add_argument(
        "--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
    )
    parser.add_argument(
        "--log-interval",
        type=int,
        default=10,
        metavar="N",
        help="how many batches to wait before logging training status",
    )
    parser.add_argument(
        "--save-model",
        action="store_true",
        default=False,
        help="For Saving the current Model",
    )

    args = parser.parse_args()

    # init dist training
    args.world_size = 1
    args.distributed = False
    if "WORLD_SIZE" in os.environ:
        args.world_size = int(os.environ["WORLD_SIZE"])
        args.distributed = args.world_size > 1
        args.rank = int(os.environ["RANK"])

    if args.distributed:
        torch.cuda.set_device(args.local_rank)
        torch.distributed.init_process_group("nccl")
        init_global_communicator(args.rank, args.world_size)

        global global_comm
        print(f"init communicator ok, rank: {global_comm.rank_id()}, size: {global_comm.size()}, dev: {global_comm.device_id()}")

    stop_event = threading.Event()
    torch.manual_seed(args.seed)

    logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)
    if args.rank == 0:
        logging.getLogger().setLevel(logging.INFO)

    train_kwargs = {"batch_size": args.batch_size}
    test_kwargs = {"batch_size": args.test_batch_size}
    cuda_kwargs = {"num_workers": 1, "pin_memory": True, "shuffle": True}
    train_kwargs.update(cuda_kwargs)
    test_kwargs.update(cuda_kwargs)

    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
    )

    if args.local_rank == 0:
        dataset1 = datasets.MNIST(
            "../data", train=True, download=True, transform=transform
        )
        torch.distributed.barrier()
    else:
        torch.distributed.barrier()
        dataset1 = datasets.MNIST(
            "../data", train=True, download=True, transform=transform
        )

    dataset2 = datasets.MNIST("../data", train=False, transform=transform)
    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset1)
    train_kwargs.update(
        {
            "sampler": train_sampler,
            "batch_size": args.batch_size // args.world_size,
            "shuffle": False,
        }
    )
    train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
    test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)

    model = Net().cuda()
    optimizer = optim.Adadelta(model.parameters(), lr=args.lr)

    current_stream = torch.cuda.current_stream()
    start_background_communication(model, stop_event, args.local_rank, args.world_size, current_stream)
    scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
    for epoch in range(1, args.epochs + 1):
        train(args, model, train_loader, optimizer, epoch)
        test(model, test_loader)
        scheduler.step()

    barrier_model(model, stop_event)

    if args.save_model and args.rank == 0:
        torch.save(model.state_dict(), "mnist_cnn.pt")

def barrier_model(model, event):
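    # Stop the background allreduce thread, give it a moment to notice, then abort the communicator.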
    global global_comm

    event.set()
    time.sleep(5)
    print("try to abort")
    global_comm.abort()

def start_background_communication(model, event, device_id, world_size, current_stream):
    threading.Thread(target=average_model, args=(model, event, device_id, world_size, current_stream)).start()

def average_model(model, stop_event, device_id, world_size, current_stream):
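    # Background thread: repeatedly allreduce the flattened model parameters
    # on a dedicated CUDA stream until stop_event is set.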
    global global_comm

    torch.cuda.set_device(device_id)
    cupy.cuda.Device(device_id).use()
    comm_stream = torch.cuda.Stream()

    cuda_event = torch.cuda.Event()

    step = 0
    while not stop_event.is_set():
        comm_stream.synchronize()

        params = [param.data for param in model.parameters()]
        flatten_tensor = flatten(params)

        # wait current stream to complete
        current_stream.record_event(cuda_event)
        comm_stream.wait_event(cuda_event)

        if stop_event.is_set():
            print("aborted")
            return

        print(f"#{device_id} perform allreduce, step={step}")
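        # in-place allreduce of the flattened parameters on comm_stream (op value 4 == nccl.NCCL_AVG)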
        global_comm.allReduce(flatten_tensor.data_ptr(), flatten_tensor.data_ptr(), torch.numel(flatten_tensor),
            nccl.NCCL_FLOAT32, 4, #nccl.NCCL_AVG,
            comm_stream.cuda_stream
        )

        step += 1
        time.sleep(500 / 1000)

if __name__ == "__main__":
    main()

The command to run is:

python3 -m torch.distributed.launch --nproc_per_node=8  mnist.py

Thanks for your help.

sjeaugey commented 3 years ago

Thanks. This is still quite a low occurrence rate to definitively prove this is related to LL128, but it is still a good clue.

wangraying commented 3 years ago

Update @sjeaugey

After I removed the time.sleep(5) in the barrier_model function, the hang occurs more frequently.

I tested three times; the results were 1 hang out of 6 runs, 1 hang out of 2 runs, and 1 hang out of 3 runs.

In these tests, I also changed the run command to python3 -m torch.distributed.launch --nproc_per_node=8 mnist.py --epochs 1, which shortens the time needed to reproduce.

Thanks.