ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Core] [Bug] Failed to register worker to Raylet for single node, multi-GPU #21226

Open addisonklinke opened 2 years ago

addisonklinke commented 2 years ago

Search before asking

Ray Component

Ray Tune

What happened + What you expected to happen

I am trying to run the official tutorial for PyTorch Lightning. It works fine on a single GPU, but fails when the requested resources per trial are more than one GPU.

# Normal behavior with a single GPU
$ python tutorial.py --limit_batches 1 --num_epochs 1 --num_samples 1 --gpus_per_trial 1
Best hyperparameters found were:  {'layer_1_size': 128, 'layer_2_size': 256, 'lr': 0.0032251519139857242, 'batch_size': 32}

# Worker registration error when requesting multiple GPUs
$ python tutorial.py --limit_batches 1 --num_epochs 1 --num_samples 1 --gpus_per_trial 4
(ImplicitFunc pid=58090) [2021-12-21 21:59:48,344 E 58996 58996] core_worker.cc:451: 
Failed to register worker 2ea0a7ae0dcfec5f2917ed1d37227bb2190a87c16f85aa3f859cd7ef to Raylet. 
Invalid: Invalid: Unknown worker
...
Trial shows as RUNNING, but never progresses

This is on a single node/machine that has 4 GPUs attached. Based on PyTorch Lightning's Trainer, I would expect Ray to be able to distribute trials across all the available GPUs when they are requested as resources.
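One way to narrow this down is to check, from inside the trial function, which GPUs the worker process can actually see. Ray normally exposes the GPUs assigned to a worker through the CUDA_VISIBLE_DEVICES environment variable, so a small stdlib-only check could confirm whether a trial that requests 4 GPUs sees 4 device ids. This is a minimal sketch under that assumption; the helper names visible_gpu_ids and describe_gpu_allocation are hypothetical and are not part of Ray or PyTorch Lightning.

```python
import math
import os


def visible_gpu_ids(env=None):
    """Return the device ids listed in CUDA_VISIBLE_DEVICES, or None if it is unset."""
    env = os.environ if env is None else env
    raw = env.get("CUDA_VISIBLE_DEVICES")
    if raw is None:
        return None
    # Split the comma-separated list and drop empty entries.
    return [token.strip() for token in raw.split(",") if token.strip()]


def describe_gpu_allocation(requested_gpus, env=None):
    """Compare the GPUs requested per trial with the ids visible to this process."""
    # Same rounding the reproduction script applies before building the Trainer.
    needed = math.ceil(requested_gpus)
    ids = visible_gpu_ids(env)
    if ids is None:
        return f"requested {needed} GPU(s); CUDA_VISIBLE_DEVICES is unset"
    status = "ok" if len(ids) >= needed else "too few visible"
    return f"requested {needed} GPU(s); visible ids {ids} ({status})"


if __name__ == "__main__":
    # Hypothetical usage: report what this process sees for a 4-GPU request.
    print(describe_gpu_allocation(4))
```

Calling describe_gpu_allocation(num_gpus) at the top of train_mnist_tune and printing the result would show up in the trial's output, which may help tell a resource-assignment problem apart from a worker-registration problem.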

Versions / Dependencies

System

requirements.txt

pytorch-lightning<1.5
ray[tune]==1.9.0
-f https://download.pytorch.org/whl/cu113/torch_stable.html
torch==1.10.0+cu113
torchvision==0.11.1+cu113

Reproduction script

tutorial.py

from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from filelock import FileLock
import math
import os

import pytorch_lightning as pl
from pytorch_lightning.loggers import TensorBoardLogger
from ray import tune
from ray.tune import CLIReporter
from ray.tune.integration.pytorch_lightning import TuneReportCallback, TuneReportCheckpointCallback
from ray.tune.schedulers import ASHAScheduler, PopulationBasedTraining
import torch
from torch.nn import functional as F
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
from torchvision.datasets import MNIST

class LightningMNISTClassifier(pl.LightningModule):
    """
    This has been adapted from
    https://towardsdatascience.com/from-pytorch-to-pytorch-lightning-a-gentle-introduction-b371b7caaf09
    """

    def __init__(self, config, data_dir=None):
        super(LightningMNISTClassifier, self).__init__()

        self.data_dir = data_dir or os.getcwd()

        self.layer_1_size = config["layer_1_size"]
        self.layer_2_size = config["layer_2_size"]
        self.lr = config["lr"]
        self.batch_size = config["batch_size"]

        # mnist images are (1, 28, 28) (channels, height, width)
        self.layer_1 = torch.nn.Linear(28 * 28, self.layer_1_size)
        self.layer_2 = torch.nn.Linear(self.layer_1_size, self.layer_2_size)
        self.layer_3 = torch.nn.Linear(self.layer_2_size, 10)

    def forward(self, x):
        batch_size, channels, height, width = x.size()
        x = x.view(batch_size, -1)

        x = self.layer_1(x)
        x = torch.relu(x)

        x = self.layer_2(x)
        x = torch.relu(x)

        x = self.layer_3(x)
        x = torch.log_softmax(x, dim=1)

        return x

    def cross_entropy_loss(self, logits, labels):
        return F.nll_loss(logits, labels)

    def accuracy(self, logits, labels):
        _, predicted = torch.max(logits.data, 1)
        correct = (predicted == labels).sum().item()
        accuracy = correct / len(labels)
        return torch.tensor(accuracy)

    def training_step(self, train_batch, batch_idx):
        x, y = train_batch
        logits = self.forward(x)
        loss = self.cross_entropy_loss(logits, y)
        accuracy = self.accuracy(logits, y)

        self.log("ptl/train_loss", loss)
        self.log("ptl/train_accuracy", accuracy)
        return loss

    def validation_step(self, val_batch, batch_idx):
        x, y = val_batch
        logits = self.forward(x)
        loss = self.cross_entropy_loss(logits, y)
        accuracy = self.accuracy(logits, y)
        return {"val_loss": loss, "val_accuracy": accuracy}

    def validation_epoch_end(self, outputs):
        avg_loss = torch.stack([x["val_loss"] for x in outputs]).mean()
        avg_acc = torch.stack([x["val_accuracy"] for x in outputs]).mean()
        self.log("ptl/val_loss", avg_loss)
        self.log("ptl/val_accuracy", avg_acc)

    @staticmethod
    def download_data(data_dir):
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307, ), (0.3081, ))
        ])
        with FileLock(os.path.expanduser("~/.data.lock")):
            return MNIST(data_dir, train=True, download=True, transform=transform)

    def prepare_data(self):
        mnist_train = self.download_data(self.data_dir)

        self.mnist_train, self.mnist_val = random_split(
            mnist_train, [55000, 5000])

    def train_dataloader(self):
        return DataLoader(self.mnist_train, batch_size=int(self.batch_size))

    def val_dataloader(self):
        return DataLoader(self.mnist_val, batch_size=int(self.batch_size))

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)
        return optimizer

def train_mnist_tune(config, num_epochs=10, num_gpus=0, data_dir="~/data", limit_batches=None):
    data_dir = os.path.expanduser(data_dir)
    model = LightningMNISTClassifier(config, data_dir)
    trainer_kwargs = {
        'max_epochs': num_epochs,
        'gpus': math.ceil(num_gpus),
        'logger': TensorBoardLogger(
            save_dir=tune.get_trial_dir(), name="", version="."),
        'progress_bar_refresh_rate': 0,
        'callbacks': [
            TuneReportCallback(
                {"loss": "ptl/val_loss", "mean_accuracy": "ptl/val_accuracy"},
                on="validation_end")]}
    if num_gpus > 1:
        trainer_kwargs.update({'accelerator': 'ddp'})  # Default ddp_spawn doesn't serialize well
    if limit_batches is not None:
        trainer_kwargs.update({
            'limit_train_batches': limit_batches,
            'limit_val_batches': limit_batches})
    trainer = pl.Trainer(**trainer_kwargs)
    trainer.fit(model)

def tune_mnist_asha(num_samples=10, num_epochs=10, gpus_per_trial=0, data_dir="~/data", limit_batches=None):
    config = {
        "layer_1_size": tune.choice([32, 64, 128]),
        "layer_2_size": tune.choice([64, 128, 256]),
        "lr": tune.loguniform(1e-4, 1e-1),
        "batch_size": tune.choice([32, 64, 128]),
    }

    scheduler = ASHAScheduler(
        max_t=num_epochs,
        grace_period=1,
        reduction_factor=2)

    reporter = CLIReporter(
        parameter_columns=["layer_1_size", "layer_2_size", "lr", "batch_size"],
        metric_columns=["loss", "mean_accuracy", "training_iteration"])

    train_fn_with_parameters = tune.with_parameters(
        train_mnist_tune,
        num_epochs=num_epochs,
        num_gpus=gpus_per_trial,
        data_dir=data_dir,
        limit_batches=limit_batches)
    resources_per_trial = {"cpu": 1, "gpu": gpus_per_trial}

    analysis = tune.run(train_fn_with_parameters,
        resources_per_trial=resources_per_trial,
        metric="loss",
        mode="min",
        config=config,
        num_samples=num_samples,
        scheduler=scheduler,
        progress_reporter=reporter,
        name="tune_mnist_asha")

    print("Best hyperparameters found were: ", analysis.best_config)

if __name__ == '__main__':

    parser = ArgumentParser(
        description='Tune hyperparameters for MNIST with PyTorch Lightning',
        formatter_class=lambda prog: ArgumentDefaultsHelpFormatter(prog, width=120, max_help_position=60))
    parser.add_argument('-e', '--num_epochs', type=int, default=10, help='maximum number of training epochs')
    parser.add_argument('-g', '--gpus_per_trial', type=float, default=1, help='GPU allocation per Raytune trial')
    parser.add_argument('-l', '--limit_batches', type=int, help='for pl.Trainer, applying to both train/val')
    parser.add_argument('-s', '--num_samples', type=int, default=10, help='number of times to sample hparam space')
    args = parser.parse_args()

    os.environ['RAY_worker_register_timeout_seconds'] = '30'
    tune_mnist_asha(**vars(args))

Anything else

Based on this discussion post, I tried setting the environment variable RAY_worker_register_timeout_seconds, but it does not fix the issue.

cc @ericl @rkooo567 @iycheng (from the request on #8890)

Are you willing to submit a PR?

ekblad commented 2 years ago

I get the same message at one point, and 0% GPU utilization when using APEX_DDPG-torch on a 4 GPU/128 CPU node. After a few sampling iterations showing 'RUNNING' (and seen on the CPUs via htop), the run crashes with a Dequeue timeout.

clarkzinzow commented 2 years ago

@ericl @rkooo567 Considering this a Core issue with a Tune-based reproduction.

rkooo567 commented 2 years ago

Btw, the default timeout is 30 seconds, so you should experiment with larger values such as 60.
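As a rough sketch of that experiment: the variable name below is the one used in the reproduction script, and the helper set_worker_register_timeout is hypothetical. The assumption here is that the value has to be in the environment before Ray starts its processes (that is, before ray.init() or the first tune.run() call) for it to take effect. Whether a larger value actually resolves this failure is not established in this thread.

```python
import os

# Variable name taken from the reproduction script in this issue.
TIMEOUT_VAR = "RAY_worker_register_timeout_seconds"


def set_worker_register_timeout(seconds, env=None):
    """Store the timeout (in whole seconds) in `env` and return the stored string."""
    env = os.environ if env is None else env
    seconds = int(seconds)
    if seconds <= 0:
        raise ValueError("timeout must be a positive number of seconds")
    env[TIMEOUT_VAR] = str(seconds)
    return env[TIMEOUT_VAR]


if __name__ == "__main__":
    # Assumed ordering: set the variable first, start Ray afterwards.
    print(TIMEOUT_VAR, "=", set_worker_register_timeout(60))
```

In the reproduction script this would replace the hard-coded '30' assignment just before tune_mnist_asha(**vars(args)).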

@iycheng maybe you can take a look at this? I think it could be related to our recent GCS changes, or there's a failure during worker initialization (which could also be related to recent changes).

mavroudisv commented 2 years ago

Any progress on this? I'm having the same issue.

KastanDay commented 2 years ago

I'm experiencing a similar error in issue #25834

stale[bot] commented 2 years ago

Hi, I'm a bot from the Ray team :)

To help human contributors to focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the next 14 days, the issue will be closed!

You can always ask for help on our discussion forum or Ray's public slack channel.

penguinshin commented 2 years ago

I am also having this issue. ray.init() hangs forever. It happens 4 out of 5 times; sometimes it works.
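For an intermittent hang like this, it can help to measure how often it happens rather than estimate it. The sketch below runs a command several times in a subprocess and counts the runs that exceed a timeout. The helper count_hangs and the timeout values are hypothetical choices, and a run that exits early with an error is counted as "not hung", so the exit status should be checked separately.

```python
import subprocess
import sys


def count_hangs(command, attempts=5, timeout_s=60.0):
    """Run `command` `attempts` times and count the runs that exceed `timeout_s`."""
    hangs = 0
    for _ in range(attempts):
        try:
            subprocess.run(
                command,
                timeout=timeout_s,
                check=False,  # a non-zero exit is not treated as a hang
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except subprocess.TimeoutExpired:
            # subprocess.run kills the direct child process before raising.
            hangs += 1
    return hangs


if __name__ == "__main__":
    # Harmless demo command; to probe this issue, one could instead pass
    # [sys.executable, "-c", "import ray; ray.init()"] with a longer timeout.
    print(count_hangs([sys.executable, "-c", "pass"], attempts=2, timeout_s=30.0))
```

Only the direct child is killed on timeout, so background processes started by Ray may need to be cleaned up between attempts.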

zhe-thoughts commented 2 years ago

Looks like a P1. I'm putting this into the Core team backlog; let's discuss how to fix it.