ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

TorchTrainer distributed training is not running properly #45825

Open huangzhenghz opened 1 month ago

huangzhenghz commented 1 month ago

What happened + What you expected to happen

Bug: I copied this example from the official Ray website and added placement_strategy="SPREAD" so that training would be distributed across two computers. The program runs fine on a single computer, but when I connect two computers on a LAN with ray start --head and ray start --address and then run it, the program gets stuck at "Setting up process group for: env:// [rank=0, world_size=2]", and a timeout error is reported after 10 minutes.

PS: I don't know whether this script should be run only on the head node or whether it also needs to be run on the worker node. In any case, running the same code on the worker node just causes the worker node to get stuck at "Setting up process group for: env:// [rank=0, world_size=2]" as well.
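For reference, the cluster was brought up roughly like this (the head address matches the one in the log below; all other ray start flags were left at their defaults):

# On the head node (172.17.130.246):
ray start --head

# On the second (worker) node:
ray start --address=172.17.130.246:6379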

Log:
2024-06-10 14:10:23,587 INFO worker.py:1564 -- Connecting to existing Ray cluster at address: 172.17.130.246:6379...
2024-06-10 14:10:23,591 INFO worker.py:1740 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265

View detailed results here: /home/cooper/ray_results/TorchTrainer_2024-06-10_14-10-23
To visualize your results with TensorBoard, run: tensorboard --logdir /tmp/ray/session_2024-06-10_14-04-38_857705_4700/artifacts/2024-06-10_14-10-23/TorchTrainer_2024-06-10_14-10-23/driver_artifacts
2024-06-10 14:10:23,669 INFO data_parallel_trainer.py:340 -- GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set use_gpu to True in your scaling config.

Training started with configuration:
╭─────────────────────────────────────╮
│ Training config                     │
├─────────────────────────────────────┤
│ train_loop_config/batch_size     32 │
│ train_loop_config/lr           0.01 │
│ train_loop_config/num_epochs     10 │
╰─────────────────────────────────────╯
(TorchTrainer pid=9544) GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set use_gpu to True in your scaling config.
(RayTrainWorker pid=9616) Setting up process group for: env:// [rank=0, world_size=2]

After 10 minutes, an error occurred:

(RayTrainWorker pid=3738, ip=172.17.130.64) [W socket.cpp:432] [c10d] While waitForInput, poolFD failed with (errno: 0 - Undefined error: 0).
(RayTrainWorker pid=9616) [W socket.cpp:432] [c10d] While waitForInput, poolFD failed with (errno: 0 - Success).
2024-06-10 14:40:30,679 ERROR tune_controller.py:1331 -- Trial task failed for trial TorchTrainer_1fa3e_00000
Traceback (most recent call last):
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/air/execution/_internal/event_manager.py", line 110, in resolve_future
    result = ray.get(future)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/_private/worker.py", line 2623, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/_private/worker.py", line 861, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(DistStoreError): ray::_Inner.train() (pid=9544, ip=172.17.130.246, actor_id=13a44d0d2f3b811d00db9b3403000000, repr=TorchTrainer)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 331, in train
    raise skipped from exception_cause(skipped)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/air/_internal/util.py", line 98, in run
    self._ret = self._target(*self._args, **self._kwargs)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/tune/trainable/function_trainable.py", line 45, in <lambda>
    training_func=lambda: self._trainable_func(self.config),
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/base_trainer.py", line 799, in _trainable_func
    super()._trainable_func(self._merged_config)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/tune/trainable/function_trainable.py", line 248, in _trainable_func
    output = fn()
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/base_trainer.py", line 107, in _train_coordinator_fn
    trainer.training_loop()
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/data_parallel_trainer.py", line 458, in training_loop
    backend_executor.start()
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/_internal/backend_executor.py", line 190, in start
    self._backend.on_start(self.worker_group, self._backend_config)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/torch/config.py", line 197, in on_start
    ray.get(setup_futures)
ray.exceptions.RayTaskError(DistStoreError): ray::_RayTrainWorkerexecute._setup_torch_process_group() (pid=3738, ip=172.17.130.64, actor_id=e0aa208ed5070b0d705a881303000000, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x10ceb6ef0>)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/_internal/worker_group.py", line 33, in execute
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/_internal/worker_group.py", line 30, in execute
  File "/Users/zhenghuang/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/torch/config.py", line 112, in _setup_torch_process_group
    dist.init_process_group(
  File "/Users/zhenghuang/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/torch/distributed/c10d_logger.py", line 75, in wrapper
    return func(*args, **kwargs)
  File "/Users/zhenghuang/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/torch/distributed/c10d_logger.py", line 89, in wrapper
    func_return = func(*args, **kwargs)
  File "/Users/zhenghuang/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py", line 1312, in init_process_group
    default_pg, _ = _new_process_group_helper(
  File "/Users/zhenghuang/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py", line 1509, in _new_process_group_helper
    backend_class = ProcessGroupGloo(backend_prefix_store, group_rank, group_size, timeout=timeout)
torch.distributed.DistStoreError: Socket Timeout
2024-06-10 14:40:30,685 WARNING experiment_state.py:205 -- Experiment state snapshotting has been triggered multiple times in the last 5.0 seconds. A snapshot is forced if CheckpointConfig(num_to_keep) is set, and a trial has checkpointed >= num_to_keep times since the last snapshot. You may want to consider increasing the CheckpointConfig(num_to_keep) or decreasing the frequency of saving checkpoints. You can suppress this error by setting the environment variable TUNE_WARN_EXCESSIVE_EXPERIMENT_CHECKPOINT_SYNC_THRESHOLD_S to a smaller value than the current threshold (5.0).
2024-06-10 14:40:30,686 INFO tune.py:1007 -- Wrote the latest version of all result files and experiment state to '/home/cooper/ray_results/TorchTrainer_2024-06-10_14-10-23' in 0.0016s.
2024-06-10 14:40:30,688 ERROR tune.py:1035 -- Trials did not complete: [TorchTrainer_1fa3e_00000]
Training errored after 0 iterations at 2024-06-10 14:40:30. Total running time: 30min 7s
Error file: /tmp/ray/session_2024-06-10_14-04-38_857705_4700/artifacts/2024-06-10_14-10-23/TorchTrainer_2024-06-10_14-10-23/driver_artifacts/TorchTrainer_1fa3e_00000_0_2024-06-10_14-10-23/error.txt
ray.exceptions.RayTaskError(DistStoreError): ray::_Inner.train() (pid=9544, ip=172.17.130.246, actor_id=13a44d0d2f3b811d00db9b3403000000, repr=TorchTrainer)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 331, in train
    raise skipped from exception_cause(skipped)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/air/_internal/util.py", line 98, in run
    self._ret = self._target(*self._args, **self._kwargs)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/tune/trainable/function_trainable.py", line 45, in <lambda>
    training_func=lambda: self._trainable_func(self.config),
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/base_trainer.py", line 799, in _trainable_func
    super()._trainable_func(self._merged_config)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/tune/trainable/function_trainable.py", line 248, in _trainable_func
    output = fn()
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/base_trainer.py", line 107, in _train_coordinator_fn
    trainer.training_loop()
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/data_parallel_trainer.py", line 458, in training_loop
    backend_executor.start()
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/_internal/backend_executor.py", line 190, in start
    self._backend.on_start(self.worker_group, self._backend_config)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/torch/config.py", line 197, in on_start
    ray.get(setup_futures)
ray.exceptions.RayTaskError(DistStoreError): ray::_RayTrainWorkerexecute._setup_torch_process_group() (pid=3738, ip=172.17.130.64, actor_id=e0aa208ed5070b0d705a881303000000, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x10ceb6ef0>)
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/_internal/worker_group.py", line 33, in execute
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/_internal/worker_group.py", line 30, in execute
  File "/Users/zhenghuang/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/torch/config.py", line 112, in _setup_torch_process_group
    dist.init_process_group(
  File "/Users/zhenghuang/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/torch/distributed/c10d_logger.py", line 75, in wrapper
    return func(*args, **kwargs)
  File "/Users/zhenghuang/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/torch/distributed/c10d_logger.py", line 89, in wrapper
    func_return = func(*args, **kwargs)
  File "/Users/zhenghuang/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py", line 1312, in init_process_group
    default_pg, _ = _new_process_group_helper(
  File "/Users/zhenghuang/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py", line 1509, in _new_process_group_helper
    backend_class = ProcessGroupGloo(backend_prefix_store, group_rank, group_size, timeout=timeout)
torch.distributed.DistStoreError: Socket Timeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/cooper/AI/PythonProject/Distributed-Transformer/ray-example.py", line 101, in <module>
    result = trainer.fit()
  File "/home/cooper/.virtualenvs/Distributed-Transformer/lib/python3.10/site-packages/ray/train/base_trainer.py", line 638, in fit
    raise TrainingFailedError(
ray.train.base_trainer.TrainingFailedError: The Ray Train run failed. Please inspect the previous error messages for a cause. After fixing the issue (assuming that the error is not caused by your own application logic, but rather an error such as OOM), you can restart the run from scratch or continue this run. To continue this run, you can use: trainer = TorchTrainer.restore("/home/cooper/ray_results/TorchTrainer_2024-06-10_14-10-23"). To start a new run that will retry on training failures, set train.RunConfig(failure_config=train.FailureConfig(max_failures)) in the Trainer's run_config with max_failures > 0, or max_failures = -1 for unlimited retries.
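For reference, the retry and restore options that the final error message points to would look roughly like this (a sketch only; the max_failures value is arbitrary):

# Option 1: continue this exact run from its results directory.
restored_trainer = TorchTrainer.restore("/home/cooper/ray_results/TorchTrainer_2024-06-10_14-10-23")

# Option 2: start a new run that automatically retries on training failures.
run_config = RunConfig(
    checkpoint_config=CheckpointConfig(num_to_keep=1),
    failure_config=ray.train.FailureConfig(max_failures=3),  # -1 for unlimited retries
)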

Expected: Distributed training across two computers.

Versions / Dependencies

python==3.10.14

accelerate==0.30.1 aiohttp==3.9.5 aiohttp-cors==0.7.0 aiosignal==1.3.1 annotated-types==0.7.0 async-timeout==4.0.3 attrs==23.2.0 cachetools==5.3.3 certifi==2024.2.2 charset-normalizer==3.3.2 click==8.1.7 colorful==0.5.6 datasets==2.19.1 dill==0.3.8 distlib==0.3.8 evaluate==0.4.2 filelock==3.14.0 frozenlist==1.4.1 fsspec==2024.3.1 google-api-core==2.19.0 google-auth==2.30.0 googleapis-common-protos==1.63.1 grpcio==1.64.1 huggingface-hub==0.23.2 idna==3.7 Jinja2==3.1.4 joblib==1.4.2 jsonschema==4.22.0 jsonschema-specifications==2023.12.1 linkify-it-py==2.0.3 markdown-it-py==3.0.0 MarkupSafe==2.1.5 mdit-py-plugins==0.4.1 mdurl==0.1.2 memray==1.12.0 mpmath==1.3.0 msgpack==1.0.8 multidict==6.0.5 multiprocess==0.70.16 networkx==3.3 numpy==1.26.4 nvidia-cublas-cu12==12.1.3.1 nvidia-cuda-cupti-cu12==12.1.105 nvidia-cuda-nvrtc-cu12==12.1.105 nvidia-cuda-runtime-cu12==12.1.105 nvidia-cudnn-cu12==8.9.2.26 nvidia-cufft-cu12==11.0.2.54 nvidia-curand-cu12==10.3.2.106 nvidia-cusolver-cu12==11.4.5.107 nvidia-cusparse-cu12==12.1.0.106 nvidia-nccl-cu12==2.20.5 nvidia-nvjitlink-cu12==12.5.40 nvidia-nvtx-cu12==12.1.105 opencensus==0.11.4 opencensus-context==0.1.3 packaging==24.0 pandas==2.2.2 platformdirs==4.2.2 prometheus_client==0.20.0 proto-plus==1.23.0 protobuf==4.25.3 psutil==5.9.8 py-spy==0.3.14 pyarrow==16.1.0 pyarrow-hotfix==0.6 pyasn1==0.6.0 pyasn1_modules==0.4.0 pydantic==2.7.3 pydantic_core==2.18.4 Pygments==2.18.0 python-dateutil==2.9.0.post0 pytz==2024.1 PyYAML==6.0.1 ray==2.23.0 referencing==0.35.1 regex==2024.5.15 requests==2.32.3 rich==13.7.1 rpds-py==0.18.1 rsa==4.9 safetensors==0.4.3 scikit-learn==1.5.0 scipy==1.13.1 six==1.16.0 smart-open==7.0.4 sympy==1.12.1 tensorboardX==2.6.2.2 textual==0.65.2 threadpoolctl==3.5.0 tokenizers==0.19.1 torch==2.3.0 tqdm==4.66.4 transformers==4.41.2 triton==2.3.0 typing_extensions==4.12.0 tzdata==2024.1 uc-micro-py==1.0.3 urllib3==2.2.1 virtualenv==20.26.2 wrapt==1.16.0 xxhash==3.4.1 yarl==1.9.4

Reproduction script

import os
import tempfile

import torch
from torch import nn
from torch.nn.parallel import DistributedDataParallel

import ray
from ray.train import Checkpoint, CheckpointConfig, RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer, TorchConfig

ray.init()

# If using GPUs, set this to True.
use_gpu = False
# Number of processes to run training on.
num_workers = 2

# Define your network structure.
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(1, 32)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(32, 1)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))

# Training loop.
def train_loop_per_worker(config):

    # Read configurations.
    lr = config["lr"]
    batch_size = config["batch_size"]
    num_epochs = config["num_epochs"]

    # Fetch training dataset.
    train_dataset_shard = ray.train.get_dataset_shard("train")

    # Instantiate and prepare model for training.
    model = NeuralNetwork()
    model = ray.train.torch.prepare_model(model)

    # Define loss and optimizer.
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    # Create data loader.
    dataloader = train_dataset_shard.iter_torch_batches(
        batch_size=batch_size, dtypes=torch.float
    )

    # Train multiple epochs.
    for epoch in range(num_epochs):

        # Train epoch.
        for batch in dataloader:
            output = model(batch["input"])
            loss = loss_fn(output, batch["label"])
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Create checkpoint.
        base_model = (model.module
            if isinstance(model, DistributedDataParallel) else model)
        checkpoint_dir = tempfile.mkdtemp()
        torch.save(
            {"model_state_dict": base_model.state_dict()},
            os.path.join(checkpoint_dir, "model.pt"),
        )
        checkpoint = Checkpoint.from_directory(checkpoint_dir)

        # Report metrics and checkpoint.
        ray.train.report({"loss": loss.item()}, checkpoint=checkpoint)

# Define configurations.
train_loop_config = {"num_epochs": 10, "lr": 0.01, "batch_size": 32}
scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu, placement_strategy="SPREAD")
run_config = RunConfig(checkpoint_config=CheckpointConfig(num_to_keep=1))

# Define datasets.
train_dataset = ray.data.from_items(
    [{"input": [x], "label": [2 * x + 1]} for x in range(20)]
)
datasets = {"train": train_dataset}

# Initialize the Trainer.
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    torch_config=TorchConfig(backend="gloo"),
    train_loop_config=train_loop_config,
    scaling_config=scaling_config,
    run_config=run_config,
    datasets=datasets
)

# Train the model.
result = trainer.fit()

# Inspect the results.
final_loss = result.metrics["loss"]

Issue Severity

High: It blocks me from completing my task.

justinvyu commented 1 week ago

@huangzhenghz Could you describe the cluster setup a bit more? How are the machines configured to talk to each other?

Is Ray able to schedule tasks onto both machines correctly? Is it only the torch distributed process group setup that hangs, and does it hang even when you run on a single node?
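As a quick sanity check (a rough sketch; the task body is just illustrative), you can verify that Ray itself can place work on both machines before torch gets involved:

import socket
import ray

ray.init(address="auto")

# One dict per node currently in the cluster, with its resources and liveness.
print(ray.nodes())

@ray.remote
def which_node():
    # Report the hostname of the node this task actually ran on.
    return socket.gethostname()

# Launch more tasks than a single machine has CPUs; if both nodes are usable,
# hostnames from both machines should show up in the result.
print(set(ray.get([which_node.remote() for _ in range(32)])))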

You may also want to set use_gpu=True since you're using GPU machines. This sets the torch communication backend to nccl and you may see some different behavior.
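Concretely, that would be something like the following (a sketch of the relevant changes to your reproduction script; with use_gpu=True the torch backend should default to nccl, so the explicit TorchConfig is likely optional):

use_gpu = True

scaling_config = ScalingConfig(num_workers=2, use_gpu=use_gpu, placement_strategy="SPREAD")
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    torch_config=TorchConfig(backend="nccl"),
    train_loop_config=train_loop_config,
    scaling_config=scaling_config,
    run_config=run_config,
    datasets=datasets,
)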

huangzhenghz commented 4 days ago

@justinvyu Thanks for your response.

All I've done is set up the cluster as I described, connecting the two computers through ray start --head on one and ray start --address on another. Is there anything else I need to do?

When I run this cluster at home on a local area network, it doesn't work well; my network speed is below 100 Mbps. But when I rent two servers with a good network connection, the problem seems to go away. Does Ray fail to communicate properly in a poor network environment?
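If the slow network is the cause, would increasing the process group setup timeout be a reasonable workaround? Something like this is what I have in mind (just a guess on my part):

# Give the gloo process group more time to rendezvous on a slow network
# (the default timeout_s appears to be 1800 seconds).
torch_config = TorchConfig(backend="gloo", timeout_s=3600)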