ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Tune|Air|Train] `wandb` errors out with `PB2` scheduler #43855

Open simonsays1980 opened 8 months ago

simonsays1980 commented 8 months ago

What happened + What you expected to happen

What happened

I ran a hyperparameter search with PB2 using Tune and RLlib. I logged results to wandb via the WandbLoggerCallback. The experiment was run on GCP using Ray's autoscaler with the 2.10.0.d8b3d6-py39-gpu image. While PB2 worked seamlessly, the wandb logging errored out quite early in the experiment (iteration 14 of 500) with the following error:

2024-03-09 04:02:33,890 WARNING syncer.py:406 -- Last sync command failed with the following error:
Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/train/_internal/syncer.py", line 404, in _launch_sync_process
    self.wait()
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/train/_internal/syncer.py", line 474, in wait
    raise e
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/train/_internal/syncer.py", line 472, in wait
    self._sync_process.wait(timeout=timeout or self.sync_timeout)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/train/_internal/syncer.py", line 173, in wait
    raise exception
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/train/_internal/syncer.py", line 136, in entrypoint
    result = self._fn(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/train/_internal/storage.py", line 212, in _upload_to_fs_path
    _pyarrow_fs_copy_files(local_path, fs_path, destination_filesystem=fs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/train/_internal/storage.py", line 110, in _pyarrow_fs_copy_files
    return pyarrow.fs.copy_files(
  File "/home/ray/anaconda3/lib/python3.9/site-packages/pyarrow/fs.py", line 269, in copy_files
    _copy_files_selector(source_fs, source_sel,
  File "pyarrow/_fs.pyx", line 1616, in pyarrow._fs._copy_files_selector
  File "pyarrow/error.pxi", line 113, in pyarrow.lib.check_status
FileNotFoundError: [Errno 2] Failed to open local file '/tmp/ray/session_2024-03-09_03-41-15_467742_274/artifacts/2024-03-09_03-44-00/pco_with_pb2_hps_search/driver_artifacts/pco_ant_velocity_gym_v1_52603_00001_1_2024-03-09_03-44-02/wandb/latest-run/files/wandb-metadata.json'. Detail: [errno 2] No such file or directory

Could this be related to PB2 pausing and restarting runs? At least in wandb I see only 14 iterations per run. Is there maybe a wandb config parameter that should be set when using a hyperparameter search scheduler in Ray (something like `resume="must"`)?
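For reference, this is how I would try to pass such a setting (just a sketch: I am assuming here that extra keyword arguments given to WandbLoggerCallback are forwarded to wandb.init(), and I do not know whether resume is the right knob for trials that PB2 pauses and restarts):

from ray.air.integrations.wandb import WandbLoggerCallback

# Assumption: extra kwargs are passed through to wandb.init().
wandb_callback = WandbLoggerCallback(
    api_key_file="data/wandb/wandb_api_key.txt",
    project="rllib-ppo",
    log_config=True,
    resume="allow",  # or resume="must" together with a stable, trial-specific id
)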

The file that is not found is actually there:

cat /tmp/ray/session_2024-03-09_03-41-15_467742_274/artifacts/2024-03-09_03-44-00/pco_with_pb2_hps_search/driver_artifacts/pco_ant_velocity_gym_v1_52603_00001_1_2024-03-09_03-44-02/wandb/latest-run/files/wandb-metadata.json 
{
    "os": "Linux-5.10.0-26-cloud-amd64-x86_64-with-glibc2.31",
    "python": "3.9.18",
    "heartbeatAt": "2024-03-09T14:59:16.726840",
    "startedAt": "2024-03-09T14:59:16.111787",
    "docker": null,
    "gpu": "Tesla T4",
    "gpu_count": 2,
    "cpu_count": 24,
    "cuda": null,
    "args": [
        "--node-ip-address=10.138.0.53",
        "--node-manager-port=42179",
        "--object-store-name=/tmp/ray/session_2024-03-09_03-41-15_467742_274/sockets/plasma_store",
        "--raylet-name=/tmp/ray/session_2024-03-09_03-41-15_467742_274/sockets/raylet",
        "--redis-address=None",
        "--metrics-agent-port=54804",
        "--runtime-env-agent-port=35482",
        "--logging-rotate-bytes=536870912",
        "--logging-rotate-backup-count=5",
        "--runtime-env-agent-port=35482",
        "--gcs-address=10.138.0.53:6379",
        "--session-name=session_2024-03-09_03-41-15_467742_274",
        "--temp-dir=/tmp/ray",
        "--webui=127.0.0.1:8265",
        "--cluster-id=a7b1878618389111bc531244485277675fedc3996750cd185f99784e",
        "--startup-token=547",
        "--worker-launch-time-ms=1709996339129",
        "--runtime-env-hash=802302935"
    ],
    "state": "running",
    "program": "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/workers/default_worker.py",
    "host": "ray-ray-eplus-gpu-head-4a2facf5-compute",
    "username": "ray",
    "executable": "/home/ray/anaconda3/bin/python"
}

Could it be that the write process is so slow that the file is not yet available when the sync process tries to read it?
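To rule out a plain path or permission problem, the same copy that Ray's syncer performs could be run by hand with pyarrow (a sketch; the debug-manual-copy prefix is just a placeholder, the bucket is the one from my storage_path):

import pyarrow.fs as pafs

# The local directory from the traceback above.
local_dir = (
    "/tmp/ray/session_2024-03-09_03-41-15_467742_274/artifacts/2024-03-09_03-44-00/"
    "pco_with_pb2_hps_search/driver_artifacts/"
    "pco_ant_velocity_gym_v1_52603_00001_1_2024-03-09_03-44-02/wandb/latest-run/files"
)

# Same pyarrow call that fails inside ray/train/_internal/storage.py.
pafs.copy_files(
    local_dir,
    "ray-results-2022-09-01/debug-manual-copy",  # <bucket>/<prefix>, without the gs:// scheme
    destination_filesystem=pafs.GcsFileSystem(),
)

If this succeeds while the background sync keeps failing, that would point to a race, e.g. wandb rotating or removing files under wandb/latest-run while the syncer walks the directory.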

What you expected to happen

I expected wandb to log the iterations for each of my 10 runs in sequence (from 1 to 500) without erroring out. More generally, all information needed for logging should be available to wandb when a scheduler is used in Ray Tune, so that the user does not have to figure out whether a different wandb configuration is required together with a scheduler.

Versions / Dependencies

Ubuntu 20.04, Python 3.9, Ray image 2.10.0.d8b3d6-py39-gpu

Reproduction script

import random

from ray.air.integrations.wandb import WandbLoggerCallback
from ray import train, tune

from ray.rllib.algorithms.ppo.ppo import PPOConfig
from ray.rllib.connectors.env_to_module.mean_std_filter import MeanStdFilter
from ray.rllib.env.single_agent_env_runner import SingleAgentEnvRunner
from ray.tune.schedulers.pb2 import PB2

pb2_scheduler = PB2(
    time_attr="training_iteration",
    metric="episode_reward_mean",
    mode="max",
    perturbation_interval=15,
    # Copy the weights of the top quantile of trials to the bottom quantile.
    quantile_fraction=0.25,
    hyperparam_bounds={
        "lr": [1e-5, 1e-3],
        "gamma": [0.95, 0.99],
        "lambda": [0.9, 1.0],
        "entropy_coeff": [0.0, 1e-5],
        "clip_param": [0.1, 0.4],
        "vf_loss_coeff": [0.01, 1.0],
    },
)

config = (
    PPOConfig()
    .environment(
        env="CartPole-v1",
    )
    .framework(
        framework="torch",
    )
    .rollouts(
        rollout_fragment_length=10000,
        num_envs_per_worker=1,
        num_rollout_workers=2,
        ignore_worker_failures=True,
        recreate_failed_workers=True,
        env_runner_cls=SingleAgentEnvRunner,
        env_to_module_connector=(lambda env: MeanStdFilter(multi_agent=False)),
    )
    .resources(
        num_gpus_per_learner_worker=0.2,
    )
    .experimental(
        _enable_new_api_stack=True,
    )
    .reporting(
        metrics_num_episodes_for_smoothing=50,
    )
    .training(
        lr=tune.sample_from(lambda spec: random.uniform(1e-5, 1e-2)),
        gamma=tune.sample_from(lambda spec: random.uniform(0.95, 0.99)),
        lambda_=tune.sample_from(lambda spec: random.uniform(0.9, 1.0)),
        entropy_coeff=tune.sample_from(lambda spec: random.uniform(0.0, 1e-5)),
        use_kl_loss=True,
        kl_target=0.02,
        clip_param=tune.sample_from(lambda spec: random.uniform(0.1, 0.4)),
        vf_clip_param=float("inf"),
        vf_loss_coeff=tune.sample_from(lambda spec: random.uniform(0.01, 1.0)),
        train_batch_size_per_learner=20000,
        mini_batch_size_per_learner=64,
        num_sgd_iter=10,
        model={
            "fcnet_activation": "tanh",
            "fcnet_hiddens": [64, 64],
        },
    )
    .debugging(
        log_level="DEBUG",
        seed=0
    )
)

if tune.Tuner.can_restore(path="gs://ray-results-2022-09-01/"):

    tuner = tune.Tuner.restore(
        path="gs://ray-results-2022-09-01/",
        trainable="PPO",
        param_space=config,
        resume_errored=True,
        resume_unfinished=True,
    )

else:
    tuner = tune.Tuner(
        "PPO",
        param_space=config,
        tune_config=tune.TuneConfig(
            num_samples=10,
            scheduler=pb2_scheduler,
        ),
        run_config=train.RunConfig(
            storage_path="gs://ray-results-2022-09-01/",
            sync_config=train.SyncConfig(
                # Note: Cloud Storage accounts for most of the costs here.
                # Syncing less frequently can help to keep costs down.
                sync_period=600,
            ),
            checkpoint_config=train.CheckpointConfig(
                checkpoint_at_end=True,
                # These settings are also intended to reduce Cloud Storage costs.
                checkpoint_frequency=5,
                num_to_keep=5,
            ),
            stop={"training_iteration": 500},
            name="ppo_with_pb2_hps_search",
            callbacks=[
                WandbLoggerCallback(
                    api_key_file="data/wandb/wandb_api_key.txt",
                    project="rllib-ppo",
                    log_config=True,
                )
            ],
        ),
    )

tuner.fit()

Here is the cluster YAML used:

# A unique identifier for the head node and workers of this cluster.
cluster_name: ray-gpu

# The maximum number of worker nodes to launch in addition to the head
# node.
max_workers: 0

# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then the autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 1.0

# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
docker:
    image: "rayproject/ray-ml:2.10.0.d8b3d6-py39-gpu"
    #image: "rayproject/ray-ml:2.2.0-py39-gpu"
    # image: rayproject/ray:latest-gpu   # use this one if you don't need ML dependencies, it's faster to pull
    container_name: "ray_nvidia_docker" # e.g. ray_docker

    # # Example of running a GPU head with CPU workers
    # head_image: "rayproject/ray-ml:latest-gpu"

    # worker_image: "rayproject/ray-ml:latest"

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5

# Cloud-provider specific configuration.
provider:
    type: gcp
    region: us-west1
    availability_zone: us-west1-a
    project_id: <PROJECT_ID> # Globally unique project id

# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: ubuntu
# By default Ray creates a new private keypair, but you can also use your own.
# If you do so, make sure to also set "KeyName" in the head and worker node
# configurations below. This requires that you have added the key into the
# project wide meta-data.
#    ssh_private_key: /path/to/your/key.pem

# Tell the autoscaler the allowed node types and the resources they provide.
# The key is the name of the node type, which is just for debugging purposes.
# The node config specifies the launch config and physical instance type.
available_node_types:
    ray_head_gpu:
        # The resources provided by this node type.
        resources: {"CPU": 48, "GPU": 2}
        # Provider-specific config for the head node, e.g. instance type. By default
        # Ray will auto-configure unspecified fields such as subnets and ssh-keys.
        # For more documentation on available fields, see:
        # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
        node_config:
            machineType: n1-custom-48-319488
            disks:
              - boot: true
                autoDelete: true
                type: PERSISTENT
                initializeParams:
                  diskSizeGb: 2000
                  # See https://cloud.google.com/compute/docs/images for more images
                  sourceImage: projects/ml-images/global/images/c0-deeplearning-common-cu121-v20231209-debian-11
            # Make sure to set scheduling->onHostMaintenance to TERMINATE when GPUs are present
            guestAccelerators:
              - acceleratorType: nvidia-tesla-t4
                acceleratorCount: 2
            metadata:
              items:
                - key: install-nvidia-driver
                  value: "True"
            scheduling:
              - onHostMaintenance: TERMINATE
            serviceAccounts:
              - email: ray-autoscaler-sa-v1@PROJECT_ID.iam.gserviceaccount.com
                scopes:
                - https://www.googleapis.com/auth/cloud-platform

    ray_worker_gpu:
        # The minimum number of nodes of this type to launch.
        # This number should be >= 0.
        min_workers: 0
        # The maximum number of worker nodes of this type to launch.
        # This takes precedence over min_workers.
        max_workers: 2
        # The resources provided by this node type.
        resources: {"CPU": 2, "GPU": 1}
        # Provider-specific config for the head node, e.g. instance type. By default
        # Ray will auto-configure unspecified fields such as subnets and ssh-keys.
        # For more documentation on available fields, see:
        # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
        node_config:
            machineType: n1-standard-2
            disks:
              - boot: true
                autoDelete: true
                type: PERSISTENT
                initializeParams:
                  diskSizeGb: 50
                  # See https://cloud.google.com/compute/docs/images for more images
                  sourceImage: projects/ml-images/global/images/c0-deeplearning-common-cu121-v20231209-debian-11
            # Make sure to set scheduling->onHostMaintenance to TERMINATE when GPUs are present
            guestAccelerators:
              - acceleratorType: nvidia-tesla-t4
                acceleratorCount: 1
            metadata:
              items:
                - key: install-nvidia-driver
                  value: "True"
            # Run workers on preemptible instances by default.
            # Comment this out to use on-demand.
            scheduling:
              - preemptible: true
              - onHostMaintenance: TERMINATE

# Specify the node type of the head node (as configured above).
head_node_type: ray_head_gpu

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
#    "/path1/on/remote/machine": "/path1/on/local/machine",
#    "/path2/on/remote/machine": "/path2/on/local/machine",
}

# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False

# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
    - "**/.git"
    - "**/.git/**"

# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter:
    - ".gitignore"

initialization_commands:
    # Wait until nvidia drivers are installed
    - >-
      timeout 300 bash -c "
          command -v nvidia-smi && nvidia-smi
          until [ \$? -eq 0 ]; do
              command -v nvidia-smi && nvidia-smi
          done"
    - gcloud auth configure-docker us-docker.pkg.dev --quiet

# List of shell commands to run to set up nodes.
# NOTE: rayproject/ray-ml:latest has ray latest bundled
setup_commands: 
  - python -m pip install gcsfs

    # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp36-cp36m-manylinux2014_x86_64.whl
    # - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl"

# Custom commands that will be run on the head node after common setup.
head_setup_commands:
  - pip install google-api-python-client==1.7.8

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
# Make sure to include the dashboard.
head_start_ray_commands:
    - ray stop
    - >-
      ulimit -n 65536;
      ray start
      --head
      --port=6379
      --object-manager-port=8076
      --autoscaling-config=~/ray_bootstrap_config.yaml
      --include-dashboard=true

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - >-
      ulimit -n 65536;
      ray start
      --address=$RAY_HEAD_IP:6379
      --object-manager-port=8076

Issue Severity

Medium: It is a significant difficulty but I can work around it.

justinvyu commented 8 months ago

@simonsays1980 Does this error get logged very consistently throughout the run? I'm thinking this might be a race condition: wandb deleting some file while that file is being synced to cloud storage.

Does the run eventually succeed? The syncing should get retried throughout the run.
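If it is that kind of race, one workaround you could test is to keep the live wandb run files out of the trial directory that gets synced, e.g. by pointing wandb at a separate directory. This is just a sketch and assumes that the extra keyword arguments of WandbLoggerCallback are forwarded to wandb.init() and not overridden internally; /tmp/wandb_runs is only an example path:

from ray.air.integrations.wandb import WandbLoggerCallback

wandb_callback = WandbLoggerCallback(
    api_key_file="data/wandb/wandb_api_key.txt",
    project="rllib-ppo",
    log_config=True,
    dir="/tmp/wandb_runs",  # example path outside driver_artifacts, intended for wandb.init(dir=...)
)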

simonsays1980 commented 8 months ago

@simonsays1980 Does this error get logged very consistently throughout the run? I'm thinking this might be a race condition: wandb deleting some file while that file is being synced to cloud storage.

Does the run eventually succeed? The syncing should get retried throughout the run.

Hi @justinvyu, thanks for taking a look. It does get logged consistently during the run. I later had to stop the experiment because many _QueueActors and WandBActors kept starting and stopping again - it looked like a mess. Regrettably, I have no worker logs stored, but I think it should be reproducible given the cluster YAML and the run script above. The runs did not succeed because of all this stopping and restarting: it used up the memory (156GB) and left no space for the runs, so I got an OOM.
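To keep that actor churn from exhausting the head node memory, one thing I could try next is capping the number of trials running at once (a sketch; max_concurrent_trials=4 is an arbitrary example value and of course changes how many PB2 population members run in parallel):

tuner = tune.Tuner(
    "PPO",
    param_space=config,
    tune_config=tune.TuneConfig(
        num_samples=10,
        scheduler=pb2_scheduler,
        # Example: run at most 4 trials at a time to limit memory pressure
        # from the per-trial logging actors.
        max_concurrent_trials=4,
    ),
    # run_config as in the reproduction script above.
)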

I then ran another experiment with doubled resources (24 -> 48 CPUs, 156GB -> 312GB, 2 NVIDIA T4s) and turned wandb off. That run did not succeed either: after ~100 iterations the GPUs were no longer detected (see #43866).