ray-project / ray_lightning

Pytorch Lightning Distributed Accelerators using Ray
Apache License 2.0
211 stars 34 forks source link

Stalls if num_workers is too high #89

Open import-antigravity opened 3 years ago

import-antigravity commented 3 years ago

I've noticed that when running on my SLURM cluster, if num_workers is set too high (as far as I can tell, the threshold is arbitrary), the job starts and the Ray dashboard shows the GPUs being fully utilized, but the job stalls and eventually times out.

Python script:

import logging
import os

import pytorch_lightning as pl
import ray
import torch
import torchvision
from pytorch_lightning import LightningDataModule
from ray import tune
from ray_lightning import RayPlugin
from ray_lightning.tune import get_tune_ddp_resources, TuneReportCallback
from torch.nn import functional as F
from torchvision import transforms

# Dataset directory, resolved to an absolute path relative to the current
# working directory at import time.
DATA_DIR = os.path.abspath('../../data')

# True when PyTorch reports that CUDA is available in this process.
GPU = bool(torch.cuda.is_available())

print('GPU', GPU)

# Number of Ray training workers per trial.
NUM_WORKERS = 8 # Set equal to the number of GPUs

def mnist(root: str, normalize: bool = False):
    """Build the MNIST train and test datasets, downloading them if needed.

    Args:
        root: Directory handed to ``torchvision.datasets.MNIST`` as ``root``.
        normalize: When true, append ``Normalize((0.5,), (0.5,))`` after the
            tensor conversion.

    Returns:
        A ``(trainset, testset)`` tuple; both datasets share one transform.
    """
    steps = [transforms.ToTensor()]
    if normalize:
        steps += [transforms.Normalize((0.5,), (0.5,))]
    pipeline = transforms.Compose(steps)

    # The train split is constructed first, then the test split.
    shared_kwargs = dict(root=root, download=True, transform=pipeline)
    trainset = torchvision.datasets.MNIST(train=True, **shared_kwargs)
    testset = torchvision.datasets.MNIST(train=False, **shared_kwargs)
    return trainset, testset

def mnist_datamodule(data_path: str, batch_size: int, num_workers: int) -> LightningDataModule:
    """Wrap the normalized MNIST splits in a ``LightningDataModule``.

    Args:
        data_path: Directory the MNIST files are stored in or downloaded to.
        batch_size: Batch size forwarded to ``from_datasets``.
        num_workers: Data-loading worker count forwarded to ``from_datasets``.

    Returns:
        A data module with a train and a validation dataset and no test
        dataset (``None`` is passed in that position).
    """
    train_split, val_split = mnist(data_path, normalize=True)
    loader_options = {"batch_size": batch_size, "num_workers": num_workers}
    return LightningDataModule.from_datasets(
        train_split, val_split, None, **loader_options)

class LightningMNISTClassifier(pl.LightningModule):
    """Fully connected MNIST classifier with three linear layers.

    ``config`` must provide the keys ``"lr"``, ``"layer_1"``, ``"layer_2"``
    and ``"batch_size"``. The network emits log-probabilities and is trained
    with the negative log-likelihood loss.
    """

    def __init__(self, config, data_dir=None):
        """Create the layers and store the hyperparameters.

        Args:
            config: Mapping holding the hyperparameters listed on the class.
            data_dir: Stored on the instance; falls back to the current
                working directory when falsy.
        """
        super().__init__()

        self.data_dir = data_dir if data_dir else os.getcwd()
        self.lr = config["lr"]
        hidden_1 = config["layer_1"]
        hidden_2 = config["layer_2"]
        self.batch_size = config["batch_size"]

        # Each MNIST image is a (1, 28, 28) tensor, flattened to 784 features.
        self.layer_1 = torch.nn.Linear(28 * 28, hidden_1)
        self.layer_2 = torch.nn.Linear(hidden_1, hidden_2)
        self.layer_3 = torch.nn.Linear(hidden_2, 10)
        self.accuracy = pl.metrics.Accuracy()

    def forward(self, x):
        """Return per-class log-probabilities for a 4-D image batch."""
        # Unpacking four values makes a non-4-D input fail right here.
        n_samples, _channels, _dim_a, _dim_b = x.size()
        flat = x.view(n_samples, -1)
        hidden = torch.relu(self.layer_1(flat))
        hidden = torch.relu(self.layer_2(hidden))
        return torch.log_softmax(self.layer_3(hidden), dim=1)

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters using ``self.lr``."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)
        return optimizer

    def training_step(self, train_batch, batch_idx):
        """Return the NLL loss for one ``(inputs, targets)`` training batch."""
        inputs, targets = train_batch
        return F.nll_loss(self.forward(inputs), targets)

    def validation_step(self, val_batch, batch_idx):
        """Return the loss and accuracy for one validation batch."""
        inputs, targets = val_batch
        log_probs = self.forward(inputs)
        batch_loss = F.nll_loss(log_probs, targets)
        batch_acc = self.accuracy(log_probs, targets)
        return {"val_loss": batch_loss, "val_accuracy": batch_acc}

    def validation_epoch_end(self, outputs):
        """Log the mean of the per-batch validation loss and accuracy."""
        mean_loss = torch.stack([out["val_loss"] for out in outputs]).mean()
        mean_acc = torch.stack([out["val_accuracy"] for out in outputs]).mean()
        for metric_name, metric_value in (("ptl/val_loss", mean_loss),
                                          ("ptl/val_accuracy", mean_acc)):
            self.log(metric_name, metric_value)

def train_mnist_tune(config, num_epochs=10):
    """Train one classifier for a single Tune trial.

    Args:
        config: Hyperparameters sampled by Tune; ``config['batch_size']`` is
            used for the data module and the whole mapping is passed to the
            model.
        num_epochs: Value passed to the trainer as ``max_epochs``.
    """
    # The data module is built with a single data-loading worker.
    datamodule = mnist_datamodule(DATA_DIR, config['batch_size'], 1)
    classifier = LightningMNISTClassifier(config, '.')
    ray_plugin = RayPlugin(num_workers=NUM_WORKERS, use_gpu=GPU)
    report_callback = TuneReportCallback(on="epoch_end")

    trainer_options = {
        "max_epochs": num_epochs,
        "progress_bar_refresh_rate": 0,
        "callbacks": [report_callback],
        "plugins": [ray_plugin],
    }
    pl.Trainer(**trainer_options).fit(classifier, datamodule)

def tune_mnist(num_samples, num_epochs):
    """Run the Tune hyperparameter search and print the best configuration.

    Args:
        num_samples: Forwarded to ``tune.run`` as ``num_samples``.
        num_epochs: Bound to ``train_mnist_tune`` as its ``num_epochs``.
    """
    search_space = dict(
        layer_1=tune.choice([32, 64, 128]),
        layer_2=tune.choice([64, 128, 256]),
        lr=tune.loguniform(1e-4, 1e-1),
        batch_size=tune.choice([32, 64, 128]),
    )

    bound_trainable = tune.with_parameters(
        train_mnist_tune, num_epochs=num_epochs)
    # Resources are requested for NUM_WORKERS workers, with GPUs when GPU is true.
    trial_resources = get_tune_ddp_resources(
        num_workers=NUM_WORKERS, use_gpu=GPU)

    # Trials are compared on validation loss, lower being better.
    analysis = tune.run(
        bound_trainable,
        resources_per_trial=trial_resources,
        metric="ptl/val_loss",
        mode="min",
        config=search_space,
        num_samples=num_samples,
        name="tune_mnist")

    print("Best hyperparameters found were: ", analysis.best_config)

if __name__ == '__main__':
    # Turn on verbose logging for both Python and the Ray backend.
    logging.basicConfig(level=logging.DEBUG)
    os.environ['RAY_BACKEND_LOG_LEVEL'] = 'debug'
    os.environ['TUNE_DISABLE_AUTO_CALLBACK_LOGGERS'] = '1'
    # NOTE(review): presumably overrides the Slurm job name so PyTorch
    # Lightning does not apply its own SLURM handling -- confirm.
    os.environ['SLURM_JOB_NAME'] = 'bash'

    # Best effort: attach to an already running Ray cluster. A failure is not
    # fatal, but it is reported so a run that silently did not join the
    # cluster can be told apart from one that did.
    try:
        ray.init(address='auto')
    except ConnectionError:
        logging.warning(
            "Could not connect to an existing Ray cluster (address='auto'); "
            "continuing without an explicit cluster connection.",
            exc_info=True)

    tune_mnist(num_samples=100, num_epochs=100)

SLURM script:

#!/bin/bash
#SBATCH --job-name="experiment"
#SBATCH --output=experiment.out
#SBATCH -N 4
#SBATCH --mem=32gb
#SBATCH --tasks-per-node=1
#SBATCH -p class -C gpu2080
#SBATCH --gres=gpu:2
#SBATCH --cpus-per-task=8
#SBATCH --gpus-per-task=2
#SBATCH --time=36:00:00

# Resource request above: 4 nodes, one task per node, each task with 8 CPUs
# and 2 GPUs, on partition "class" with the "gpu2080" constraint, for at most
# 36 hours. Output goes to experiment.out.
# SLURM_CPUS_PER_TASK and SLURM_GPUS_PER_TASK are reused further down to size
# each Ray node.

echo "Loading modules..."

# Toolchain and CUDA modules for this cluster.
module swap intel gcc
module load cuda/10.1
module load nvhpc

# Activate the conda environment that holds the Python dependencies.
source ~/miniconda3/etc/profile.d/conda.sh
conda activate ganresearch

# Set environment variables

# Same Tune setting the Python script also sets at startup.
export TUNE_DISABLE_AUTO_CALLBACK_LOGGERS="1"
# Makes CUDA kernel launches synchronous (debugging aid).
export CUDA_LAUNCH_BLOCKING="1"
#export PL_TORCH_DISTRIBUTED_BACKEND="gloo"

# __doc_head_address_start__

# Getting the node names
# `scontrol show hostnames` prints one hostname per line; mapfile stores each
# line as one array element instead of relying on unquoted word splitting.
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
mapfile -t nodes_array <<< "$nodes"

# The first node of the allocation hosts the Ray head.
head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)

# `hostname --ip-address` can print several space-separated addresses (for
# example an IPv6 and an IPv4 one). Using that string as-is would produce an
# invalid "$ip_head", so keep exactly one address: the first one that is not
# IPv6 (contains no ':'), falling back to the first address reported.
read -r -a head_node_addrs <<< "$head_node_ip"
if ((${#head_node_addrs[@]} > 0)); then
    head_node_ip=${head_node_addrs[0]}
    for addr in "${head_node_addrs[@]}"; do
        if [[ "$addr" != *:* ]]; then
            head_node_ip=$addr
            break
        fi
    done
fi

# __doc_head_ray_start__
port=6379
ip_head=$head_node_ip:$port
export ip_head
echo "IP Head: $ip_head"

# Start the Ray head in the background; --block keeps the srun step alive for
# the lifetime of the job.
echo "Starting HEAD at $head_node"
srun --nodes=1 --ntasks=1 --gres=gpu:2 -w "$head_node" \
    ray start --head --node-ip-address="$head_node_ip" --port=$port \
    --num-cpus "${SLURM_CPUS_PER_TASK}" --num-gpus "${SLURM_GPUS_PER_TASK}" \
    --dashboard-host "$head_node_ip" --block &

# Give the head a moment to come up before anything tries to connect to it
# (Ray's Slurm template includes the same pause).
sleep 10
# __doc_head_ray_end__

# __doc_worker_ray_start__

# Every node of the allocation except the head node runs one Ray worker.
worker_num=$((SLURM_JOB_NUM_NODES - 1))

idx=1
while ((idx <= worker_num)); do
    worker_node=${nodes_array[$idx]}
    echo "Starting WORKER $idx at $worker_node"
    # Launched in the background; --block keeps the step alive while the
    # worker stays attached to the head at $ip_head.
    srun --nodes=1 --ntasks=1 --gres=gpu:2 -w "$worker_node" \
        ray start --address "$ip_head" \
        --num-cpus "${SLURM_CPUS_PER_TASK}" --num-gpus "${SLURM_GPUS_PER_TASK}" --block &
    sleep 1
    idx=$((idx + 1))
done
# __doc_worker_ray_end__

# __doc_script_start__

# Run the tuning driver in the foreground of the batch script; the Ray head
# and worker steps above were started in the background with `&`.
# NOTE(review): test_tune.py is presumably the Python script shown earlier in
# this issue -- confirm.
python test_tune.py
amogkam commented 3 years ago

What's the behavior you're seeing when it stalls/times out? Is this an actual TimeoutError, or is it just hanging with no progress being made? It would be great if you can send what your stdout looks like when you're seeing this happening.

import-antigravity commented 3 years ago

That's the strange thing: it just hangs without starting any tuning trials.

import-antigravity commented 3 years ago

What's the behavior you're seeing when it stalls/times out? Is this an actual TimeoutError, or is it just hanging with no progress being made? It would be great if you can send what your stdout looks like when you're seeing this happening.

2021-09-24 12:32:06,187 INFO services.py:1263 -- View the Ray dashboard at http://...:8265
[2021-09-24 12:32:06,805 I 508 508] global_state_accessor.cc:332: This node has an IP address of ..., while we can not found the matched Raylet address. This maybe come from when you connect the Ray cluster with a different IP address or connect a container.
[2021-09-24 12:32:06,877 I 29597 29597] global_state_accessor.cc:332: This node has an IP address of ..., while we can not found the matched Raylet address. This maybe come from when you connect the Ray cluster with a different IP address or connect a container.
Starting WORKER 4 at classt05
[2021-09-24 12:32:07,821 I 27445 27445] global_state_accessor.cc:332: This node has an IP address of ..., while we can not found the matched Raylet address. This maybe come from when you connect the Ray cluster with a different IP address or connect a container.
[2021-09-24 12:32:07,855 I 31635 31635] global_state_accessor.cc:332: This node has an IP address of ..., while we can not found the matched Raylet address. This maybe come from when you connect the Ray cluster with a different IP address or connect a container.
DEBUG:root:NCCL: True
DEBUG:ray.worker:Automatically increasing RLIMIT_NOFILE to max value of 131072
2021-09-24 12:32:10,875 INFO worker.py:825 -- Connecting to existing Ray cluster at address: ...:6379
[2021-09-24 12:32:10,882 I 22186 22186] logging.cc:186: Set ray log level from environment variable RAY_BACKEND_LOG_LEVEL to -1
2021-09-24 12:32:11,055 WARNING tune.py:506 -- Tune detects GPUs, but no trials are using GPUs. To enable trials to use GPUs, set tune.run(resources_per_trial={'gpu': 1}...) which allows Tune to expose 1 GPU to each trial. You can also override `Trainable.default_resource_request` if using the Trainable API.
slurmstepd: error: _is_a_lwp: open() /proc/1373/status failed: No such file or directory
slurmstepd: error: *** STEP 16419161.4 ON classt04 CANCELLED AT 2021-09-26T00:32:17 DUE TO TIME LIMIT ***
slurmstepd: error: *** STEP 16419161.2 ON classt02 CANCELLED AT 2021-09-26T00:32:17 DUE TO TIME LIMIT ***
slurmstepd: error: *** STEP 16419161.5 ON classt05 CANCELLED AT 2021-09-26T00:32:17 DUE TO TIME LIMIT ***
slurmstepd: error: *** STEP 16419161.1 ON classt01 CANCELLED AT 2021-09-26T00:32:17 DUE TO TIME LIMIT ***
slurmstepd: error: *** STEP 16419161.3 ON classt03 CANCELLED AT 2021-09-26T00:32:17 DUE TO TIME LIMIT ***
srun: Job step aborted: Waiting up to 32 seconds for job step to finish.
slurmstepd: error: *** JOB 16419161 ON classt01 CANCELLED AT 2021-09-26T00:32:17 DUE TO TIME LIMIT ***
srun: Job step aborted: Waiting up to 32 seconds for job step to finish.
srun: Job step aborted: Waiting up to 32 seconds for job step to finish.
srun: Job step aborted: Waiting up to 32 seconds for job step to finish.
srun: Job step aborted: Waiting up to 32 seconds for job step to finish.