Mmiglio / DaskTorch

Pytorch with dask

dask-distributed worker cannot init_process_group #1

Open lengoanhcat opened 4 years ago

lengoanhcat commented 4 years ago

Hi,

First of all thanks for sharing the framework.

I am trying to run your example with 'python train.py', unfortunately without success (Dask 2.15.0 and torch 1.4.0+cu100).

When executing python train.py, the dask-scheduler and dask-workers are created successfully. However, the dask-workers seem to stop at dist.init_process_group forever.

Any idea why it happens ? Really appreciate your help.

Regards,

Cat Le

Mmiglio commented 4 years ago

Hi,

First of all, I need to say that this is a simple example I created a long time ago and tested only on my local machine. Looking at it now, there are a few things I should change to make it work better. Fixing it is something I want to do, maybe in the next few days!

Anyway, I just tested it and it is working! Are you using it on your local machine or on a cluster? I ask because in the function run I'm setting the env variables that pytorch will use to create the process group, for example the master address with os.environ['MASTER_ADDR'] = "127.0.0.1". Of course this works only if all the workers are on the same machine; otherwise you have to put the IP of the process with rank 0.
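
For the multi-machine case mentioned above, the env variables could be derived from the worker addresses instead of being hard-coded. The following is only a minimal sketch, not code from the repository: the helper names host_of and dist_env are hypothetical, the worker addresses are made-up examples, and it assumes the first address in the list belongs to the worker that gets rank 0. The port 23456 is simply the one used elsewhere in this thread. MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE are the variables PyTorch reads when the process group is initialized from the environment.

```python
from urllib.parse import urlparse


def host_of(worker_address):
    """Extract the host from a Dask-style address such as 'tcp://10.0.0.5:40123'."""
    host = urlparse(worker_address).hostname
    if host is None:
        raise ValueError("cannot parse host from {!r}".format(worker_address))
    return host


def dist_env(worker_addresses, rank, master_port=23456):
    """Build the env variables for one worker.

    Assumption: worker_addresses[0] is the worker that will run rank 0,
    so its host becomes MASTER_ADDR for every worker.
    """
    return {
        "MASTER_ADDR": host_of(worker_addresses[0]),
        "MASTER_PORT": str(master_port),
        "RANK": str(rank),
        "WORLD_SIZE": str(len(worker_addresses)),
    }


# Made-up addresses, for illustration only.
workers = ["tcp://10.0.0.5:40123", "tcp://10.0.0.6:40877"]
for rank in range(len(workers)):
    print(rank, dist_env(workers, rank))
```

Inside the function that runs on each worker, the returned dict would be applied with os.environ.update(...) before the process group is created.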

lengoanhcat commented 4 years ago

Thanks @Mmiglio for a quick reply,

My final aim is to run a parallel training on clusters of servers.

However, I began my tests by running train.py on a single server first ('localhost'). On my localhost, each worker seems to be stuck at 'Waiting other workers ...', a line printed just before init_process_group.

Each worker only consumes ~ 4% of a CPU capacity, so I assume that it is just hanging out without proceeding to other parts of the code.


I tried to run without dist.init_process_group() and it runs fine. Therefore, I suspect there might be some integration problem between nn.DistributedDataParallelCPU and dask, at least on my clusters.

Mmiglio commented 4 years ago

Hi,

It sounds like there is a problem with the initialization of the process group. Workers are hanging because they cannot communicate with the master (rank 0).

Could you try to create a process group without Dask? You can do it by running the following script (test.py)

import argparse
import torch

def run(args):
    tensor = torch.zeros(1)
    if args.rank == 0:
        tensor += 1
        # Send the tensor to process 1
        torch.distributed.send(tensor=tensor, dst=1)
    else:
        # Receive tensor from process 0
        torch.distributed.recv(tensor=tensor, src=0)
    print('Rank {} has data {}'.format(args.rank, tensor[0]))

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--backend',
        type=str,
        default='gloo',
        help='Name of the backend to use.'
    )
    parser.add_argument(
        '-i',
        '--init-method',
        type=str,
        default='tcp://127.0.0.1:23456',
        help='URL specifying how to initialize the package.'
    )

    parser.add_argument(
        '-s',
        '--world-size',
        type=int,
        default=1,
        help='Number of processes participating in the job.'
    )

    parser.add_argument(
        '-r',
        '--rank',
        type=int,
        default=0,
        help='Rank of the current process.'
    )

    args = parser.parse_args()

    if args.world_size > 1:
        torch.distributed.init_process_group(
            backend=args.backend,
            init_method=args.init_method,
            world_size=args.world_size,
            rank=args.rank,
        )

    run(args)

in two shells on your local machine with

python test.py --init-method tcp://127.0.0.1:23456 --world-size 2 --rank 0
python test.py --init-method tcp://127.0.0.1:23456 --world-size 2 --rank 1

On my VMs I need to manually set the network interface for the gloo socket with export GLOO_SOCKET_IFNAME=eth0 before running the previous commands (remember to set it in both shells). If you run it on a single machine there shouldn't be any problem!

Let me know if it works.
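
The interface name eth0 above is specific to those VMs; other machines may use different names. As a small sketch for checking which names are available before exporting GLOO_SOCKET_IFNAME, the standard library can list them. The helper name interface_names is hypothetical, and picking the right interface from the list is still up to you.

```python
import socket


def interface_names():
    """Return the network interface names reported by the operating system."""
    # socket.if_nameindex() yields (index, name) pairs; only the names are kept.
    return [name for _index, name in socket.if_nameindex()]


if __name__ == "__main__":
    for name in interface_names():
        print(name)
```

Whichever name corresponds to the network shared by all workers is the one to put in GLOO_SOCKET_IFNAME.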

lengoanhcat commented 4 years ago

It works for me: rank 0 successfully sends 1.0 to rank 1. I also tried omitting GLOO_SOCKET_IFNAME, and it still works.

Mmiglio commented 4 years ago

Have you already tried setting the same GLOO_SOCKET_IFNAME inside the function run?

lengoanhcat commented 4 years ago

Yep, I did. The result is still the same. As it does not show any log, I have no idea how to fix it. ^^

Mmiglio commented 4 years ago

I'm running out of ideas :/ Did you try to change the initialization method? Replace the current initialization with

os.environ['GLOO_SOCKET_IFNAME'] = "eth0"
torch.distributed.init_process_group(
    backend="gloo",
    init_method="tcp://127.0.0.1:23456",
    world_size=size,
    rank=rank,
)

If for some weird reason the initialization works but it crashes at the end of the training, try putting torch.distributed.barrier() before the return.

lengoanhcat commented 4 years ago

Hi, and sorry for the late reply,

I tested on a local computer and it works well. However, it still hangs when testing on a cluster. I am trying to figure out why, and will update here if I find the reason.
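
One way to narrow down a hang like this on a cluster, given that init_process_group prints no log, is a plain TCP reachability check from each worker machine to the master address and port. This is only a sketch under assumptions: the helper name can_connect is hypothetical, and the check is only meaningful while the rank 0 process is already waiting in init_process_group, since otherwise nothing is listening on the port. A failed check points at the address, the port, or a firewall rather than at Dask or PyTorch themselves.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


if __name__ == "__main__":
    # Address and port as used earlier in this thread; replace the address
    # with the IP of the machine running rank 0 when testing on a cluster.
    print(can_connect("127.0.0.1", 23456))
```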

Mmiglio commented 4 years ago

Let me know if you find something, maybe I will have the time to finally fix it.