ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[tune] tune.run does not scale workers #14884

Closed AmeerHajAli closed 3 years ago

AmeerHajAli commented 3 years ago

I am using `tune.run()` on the rayproject/ray-ml:nightly-gpu Docker image. I run `ray up config.yaml -y`, then `ray attach config.yaml`, and then `python unity3d_env_local.py`. The cluster never scales up workers, and I get the following output:

== Status ==
Memory usage on this node: 2.1/59.9 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/4 CPUs, 0/1 GPUs, 0.0/35.54 GiB heap, 0.0/17.77 GiB objects (0.0/1.0 accelerator_type:K80)
Result logdir: /home/ray/ray_results/PPO
Number of trials: 1/1 (1 PENDING)
+-------------------------+----------+-------+
| Trial name              | status   | loc   |
|-------------------------+----------+-------|
| PPO_unity3d_ddfdf_00000 | PENDING  |       |
+-------------------------+----------+-------+

My code (inside the `unity3d_env_local.py` file):

"""
Example of running an RLlib Trainer against a locally running Unity3D editor
instance (available as Unity3DEnv inside RLlib).
For a distributed cloud setup example with Unity,
see `examples/serving/unity3d_[server|client].py`
To run this script against a local Unity3D engine:
1) Install Unity3D and `pip install mlagents`.
2) Open the Unity3D Editor and load an example scene from the following
   ml-agents pip package location:
   `.../ml-agents/Project/Assets/ML-Agents/Examples/`
   This script supports the `3DBall`, `3DBallHard`, `SoccerStrikersVsGoalie`,
   `Tennis`, and `Walker` examples.
   Specify the game you chose on your command line via e.g. `--env 3DBall`.
   Feel free to add more supported examples here.
3) Then run this script (you will have to press Play in your Unity editor
   at some point to start the game and the learning process):
$ python unity3d_env_local.py --env 3DBall --stop-reward [..] [--torch]?
"""

import argparse
import os

import ray
from ray import tune
from ray.rllib.env.wrappers.unity3d_env import Unity3DEnv
from ray.rllib.utils.test_utils import check_learning_achieved

parser = argparse.ArgumentParser()
parser.add_argument(
    "--env",
    type=str,
    default="Tennis",
    choices=[
        "3DBall",
        "3DBallHard",
        "GridFoodCollector",
        "Pyramids",
        "SoccerStrikersVsGoalie",
        "Sorter",
        "Tennis",
        "VisualHallway",
        "Walker",
    ],
    help="The name of the Env to run in the Unity3D editor: `3DBall(Hard)?|"
    "Pyramids|GridFoodCollector|SoccerStrikersVsGoalie|Sorter|Tennis|"
    "VisualHallway|Walker` (feel free to add more and PR!)")
parser.add_argument(
    "--file-name",
    type=str,
    default="~/tennis/tennis_linux.x86_64",
    help="The Unity3d binary (compiled) game, e.g. "
    "'/home/ubuntu/soccer_strikers_vs_goalie_linux.x86_64'. Use `None` for "
    "a currently running Unity3D editor.")
parser.add_argument(
    "--from-checkpoint",
    type=str,
    default=None,
    help="Full path to a checkpoint file for restoring a previously saved "
    "Trainer state.")
parser.add_argument("--num-workers", type=int, default=0)
parser.add_argument("--as-test", action="store_true")
parser.add_argument("--stop-iters", type=int, default=9999)
parser.add_argument("--stop-reward", type=float, default=9999.0)
parser.add_argument("--stop-timesteps", type=int, default=10000000)
parser.add_argument(
    "--horizon",
    type=int,
    default=3000,
    help="The max. number of `step()`s for any episode (per agent) before "
    "it'll be reset again automatically.")
parser.add_argument("--torch", action="store_true")

if __name__ == "__main__":
    ray.init()

    args = parser.parse_args()

    tune.register_env(
        "unity3d",
        lambda c: Unity3DEnv(
            file_name=c["file_name"],
            no_graphics=(args.env != "VisualHallway" and
                         c["file_name"] is not None),
            episode_horizon=c["episode_horizon"],
        ))

    # Get policies (different agent types; "behaviors" in MLAgents) and
    # the mappings from individual agents to Policies.
    policies, policy_mapping_fn = \
        Unity3DEnv.get_policy_configs_for_game(args.env)

    config = {
        "env": "unity3d",
        "env_config": {
            "file_name": args.file_name,
            "episode_horizon": args.horizon,
        },
        # The original example forces a single worker when running against the
        # editor (only one Unity instance); overridden to 100 here to test scaling.
        "num_workers": 100,  # args.num_workers if args.file_name else 0,
        # Other settings.
        "lr": 0.0003,
        "lambda": 0.95,
        "gamma": 0.99,
        "sgd_minibatch_size": 256,
        "train_batch_size": 4000,
        # The original example uses GPUs iff the `RLLIB_NUM_GPUS` env var is
        # set to > 0; overridden here to always use 1 GPU.
        "num_gpus": 1,  # int(os.environ.get("RLLIB_NUM_GPUS", "0")),
        "num_sgd_iter": 20,
        "rollout_fragment_length": 200,
        "clip_param": 0.2,
        # Multi-agent setup for the particular env.
        "multiagent": {
            "policies": policies,
            "policy_mapping_fn": policy_mapping_fn,
        },
        "model": {
            "fcnet_hiddens": [512, 512],
        },
        "framework": "torch", #"tf" if args.env != "Pyramids" else "torch",
        "no_done_at_end": True,
    }
    # Switch on Curiosity based exploration for Pyramids env
    # (not solvable otherwise).
    if args.env == "Pyramids":
        config["exploration_config"] = {
            "type": "Curiosity",
            "eta": 0.1,
            "lr": 0.001,
            # No actual feature net: map directly from observations to feature
            # vector (linearly).
            "feature_net_config": {
                "fcnet_hiddens": [],
                "fcnet_activation": "relu",
            },
            "sub_exploration": {
                "type": "StochasticSampling",
            },
            "forward_net_activation": "relu",
            "inverse_net_activation": "relu",
        }
    elif args.env == "GridFoodCollector":
        config["model"] = {
            "conv_filters": [[16, [4, 4], 2], [32, [4, 4], 2],
                             [256, [10, 10], 1]],
        }
    elif args.env == "Sorter":
        config["model"]["use_attention"] = True

    stop = {
        "training_iteration": args.stop_iters,
        "timesteps_total": args.stop_timesteps,
        "episode_reward_mean": args.stop_reward,
    }

    # Run the experiment.
    results = tune.run(
        "PPO",
        config=config,
        stop=stop,
        verbose=3,
        checkpoint_freq=5,
        checkpoint_at_end=True,
        restore=args.from_checkpoint)

    # And check the results.
    if args.as_test:
        check_learning_achieved(results, args.stop_reward)

    ray.shutdown()

My cluster YAML (`config.yaml`):

# A unique identifier for the head node and workers of this cluster.
cluster_name: ameer_rllib_default

# The maximum number of worker nodes to launch in addition to the head
# node.
max_workers: 100

# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if a task requires adding more nodes, the autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed * currently_running_nodes.
# This number should be > 0.
upscaling_speed: 100.0
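# (Worked example of the formula above: with upscaling_speed: 100.0 and only the
# head node running, the autoscaler may add roughly 100.0 * 1 = 100 nodes in a
# single step.)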

# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
docker:
    image: "rayproject/ray-ml:nightly-gpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
    # image: rayproject/ray:latest-gpu   # use this one if you don't need ML dependencies, it's faster to pull
    container_name: "ray_container"
    # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
    # if no cached version is present.
    pull_before_run: True
    run_options: []  # Extra options to pass into "docker run"

    # Example of running a GPU head with CPU workers
    # head_image: "rayproject/ray-ml:latest-gpu"
    # Allow Ray to automatically detect GPUs

    # worker_image: "rayproject/ray-ml:latest-cpu"
    # worker_run_options: []

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5

# Cloud-provider specific configuration.
provider:
    type: aws
    region: us-west-2
    # Availability zone(s), comma-separated, that nodes may be launched in.
    # Nodes are currently spread between zones by a round-robin approach,
    # however this implementation detail should not be relied upon.
    availability_zone: us-west-2a,us-west-2b
    # Whether to allow node reuse. If set to False, nodes will be terminated
    # instead of stopped.
    cache_stopped_nodes: True # If not present, the default is True.

# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: ubuntu
# By default Ray creates a new private keypair, but you can also use your own.
# If you do so, make sure to also set "KeyName" in the head and worker node
# configurations below.
#    ssh_private_key: /path/to/your/key.pem

# Tell the autoscaler the allowed node types and the resources they provide.
# The key is the name of the node type, which is just for debugging purposes.
# The node config specifies the launch config and physical instance type.
available_node_types:
    ray.head.default:
        # The minimum number of worker nodes of this type to launch.
        # This number should be >= 0.
        min_workers: 0
        # The maximum number of worker nodes of this type to launch.
        # This takes precedence over min_workers.
        max_workers: 0
        # The node type's CPU and GPU resources are auto-detected based on AWS instance type.
        # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
        # You can also set custom resources.
        # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
        # resources: {"CPU": 1, "GPU": 1, "custom": 5}
        resources: {}
        # Provider-specific config for this node type, e.g. instance type. By default
        # Ray will auto-configure unspecified fields such as SubnetId and KeyName.
        # For more documentation on available fields, see:
        # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
        node_config:
            InstanceType: p2.xlarge
            ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
            # You can provision additional disk space with a conf as follows
            BlockDeviceMappings:
                - DeviceName: /dev/sda1
                  Ebs:
                      VolumeSize: 100
            # Additional options in the boto docs.
    ray.worker.default:
        # The minimum number of worker nodes of this type to launch.
        # This number should be >= 0.
        min_workers: 0
        # The maximum number of worker nodes of this type to launch.
        # This takes precedence over min_workers.
        max_workers: 100
        # The node type's CPU and GPU resources are auto-detected based on AWS instance type.
        # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
        # You can also set custom resources.
        # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
        # resources: {"CPU": 1, "GPU": 1, "custom": 5}
        resources: {}
        # Provider-specific config for this node type, e.g. instance type. By default
        # Ray will auto-configure unspecified fields such as SubnetId and KeyName.
        # For more documentation on available fields, see:
        # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
        node_config:
            InstanceType: m5.large
            ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
            # Run workers on spot by default. Comment this out to use on-demand.
            InstanceMarketOptions:
                MarketType: spot
                # Additional options can be found in the boto docs, e.g.
                #   SpotOptions:
                #       MaxPrice: MAX_HOURLY_PRICE
            # Additional options in the boto docs.

# Specify the node type of the head node (as configured above).
head_node_type: ray.head.default

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts:
    "~/tennis": "./tennis"
    "~/unity3d_env_local.py": "./unity3d_env_local.py"

# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []

# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False

# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
    - "**/.git"
    - "**/.git/**"

# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter:
    - ".gitignore"

# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands: []

# List of shell commands to run to set up nodes.
setup_commands: []
    # Note: if you're developing Ray, you probably want to create a Docker image that
    # has your Ray repo pre-cloned. Then, you can replace the pip installs
    # below with a git checkout <your_sha> (and possibly a recompile).
    # Uncomment the following line if you want to run the nightly version of ray (as opposed to the latest)
    # - pip install -U "ray[full] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl"

# Custom commands that will be run on the head node after common setup.
head_setup_commands: []

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076

head_node: {}
worker_nodes: {}

The file mounts include the `unity3d_env_local.py` pasted above and the following file, unzipped into the `tennis` directory locally: https://drive.google.com/file/d/1qBn_T0ukNj-w6ggza_xl1mXqVhYQjHyK/view?usp=sharing

I tried using placement groups directly, like this, and it works, so I am not sure what is going on:

# This example works (this seems to be the way Tune uses placement groups)
from ray.util import placement_group
import ray
ray.init(address="auto")
pg = placement_group([{"CPU": 1, "GPU": 1}] + [{"CPU": 1}] * 100, name="__test_")
ray.get(pg.ready())
# ...
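
For comparison, a rough sketch of what Tune itself would be asked for by the PPO config above (`num_workers=100`, `num_gpus=1`), assuming the nightly build exposes `tune.PlacementGroupFactory`; the exact bundle shapes may differ by RLlib version:

from ray import tune

# One 1-CPU/1-GPU bundle for the trainer plus one 1-CPU bundle per rollout
# worker, mirroring the raw placement group above.
pgf = tune.PlacementGroupFactory([{"CPU": 1, "GPU": 1}] + [{"CPU": 1}] * 100)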

We have a blog post waiting on this to work; can you please help?

AmeerHajAli commented 3 years ago

CC @sven1977 @richardliaw @javi-redondo

richardliaw commented 3 years ago

I'll mark this as a release blocker for now, though @krfricke please feel free to deprio once you've narrowed the cause!

krfricke commented 3 years ago

I'll use this issue as a log.

With a simple example, autoscaling seems to work fine:

import time

import ray
from ray import tune

def train(config):
    time.sleep(30)
    return 4

ray.init(address="auto")
tune.run(
    train,
    resources_per_trial=tune.PlacementGroupFactory([{"CPU": 2}] * 4))

Output:

(autoscaler +5s) Tip: use `ray status` to view detailed autoscaling status. To disable autoscaler event messages, you can set AUTOSCALER_EVENTS=0.  
(autoscaler +5s) Adding 3 nodes of type cpu_2_spot.    

(cluster.yaml)

cluster_name: ray-tune-autoscaling-test

max_workers: 20
upscaling_speed: 20

idle_timeout_minutes: 0

docker:
    image: rayproject/ray:nightly
    container_name: ray_container
    pull_before_run: true

provider:
    type: aws
    region: us-west-2
    availability_zone: us-west-2a
    cache_stopped_nodes: false

available_node_types:
    cpu_2_ondemand:
        node_config:
            InstanceType: m5.large
        resources: {"CPU": 2}
        min_workers: 0
        max_workers: 0
    cpu_2_spot:
        node_config:
            InstanceType: m5.large
            InstanceMarketOptions:
                MarketType: spot
        resources: {"CPU": 2}
        min_workers: 0
        max_workers: 20

auth:
    ssh_user: ubuntu

head_node_type: cpu_2_ondemand
worker_default_node_type: cpu_2_spot

file_mounts: {
  "/autoscaling": "./"
}

krfricke commented 3 years ago

A simple PPO run on CartPole works as well:

import ray
from ray import tune

config = {
    "env": "CartPole-v0",
    "num_workers": 10
}

ray.init(address="auto")
tune.run(
    "PPO",
    stop={"training_iteration": 10_000},
    config=config)
Output:

(autoscaler +3s) Tip: use `ray status` to view detailed autoscaling status. To disable autoscaler event messages, you can set AUTOSCALER_EVENTS=0.
(autoscaler +3s) Adding 5 nodes of type cpu_2_spot.

krfricke commented 3 years ago

So the culprit was actually just the `ray.init()` call, which did not connect to the existing Ray instance but instead started a new (non-autoscaling) local cluster.

Changing this to `ray.init(address="auto")` solves the problem:

(autoscaler +8s) Tip: use `ray status` to view detailed autoscaling status. To disable autoscaler event messages, you can set AUTOSCALER_EVENTS=0.
(autoscaler +8s) Adding 49 nodes of type ray.worker.default.
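
For reference, the required change in `unity3d_env_local.py` is just the init call (a minimal sketch; everything else stays as posted above):

import ray

if __name__ == "__main__":
    # Connect to the cluster started by `ray up` (head node + autoscaler)
    # instead of starting a fresh, non-autoscaling local Ray instance.
    ray.init(address="auto")
    # ... register the env, build the config, and call tune.run() as before ...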

I'll close this issue for now. Let me know if the problem persists with the change.

By the way, shouldn't `ray.init()` automatically connect to an existing cluster?