ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io

[CORE][CLUSTER] Ray Autoscaler Overprovisioning Resources on AWS #46588

Open glindstr opened 1 month ago

glindstr commented 1 month ago

What happened + What you expected to happen

Scenario: AWS EC2 Cluster using Ray Autoscaler

Expected: When 256 {CPU: 1} tasks are submitted in quick succession to an idle 256-CPU cluster, I expect the cluster to meet the demand with its current resources, without the Ray autoscaler provisioning additional nodes.

What happened: Creating X {CPU: 1} tasks on an idle cluster with X CPUs causes the autoscaler to provision additional CPU resources when X is sufficiently large. When runtime_env is specified on ray.init(), the X needed to trigger it seems to be smaller, possibly because the tasks take longer to schedule; however, I've found the runtime_env isn't strictly necessary. I can reliably reproduce the problem with X = 256, and unreliably with lower values like 32 or 64.

Thank you for investigating.

Versions / Dependencies

Python 3.10.12, Ray 2.32.0, Boto3

Reproduction script

Set up the venv and a test directory.

The test directory contains: setup.py, run.py, cluster.yaml

setup.py

import ray
from ray.autoscaler.sdk import request_resources

ray.init(address="ray://x.x.x.x:10001")
request_resources(num_cpus=256)

run.py

import time
import os
import ray

@ray.remote
def test(t):
    time.sleep(t)   

ray.init(address="ray://x.x.x.x:10001", runtime_env= {'working_dir': os.getcwd()})

inputs = [30 for _ in range(256)]
refs = [test.remote(x) for x in inputs]

results = ray.get(refs)

cluster.yaml


cluster_name: aws-test

provider:
  region: us-west-2
  type: aws
  use_internal_ips: true

auth:
  ssh_private_key: ...
  ssh_user: ubuntu

max_workers: 10
idle_timeout_minutes: 3.0

available_node_types:
  ray.head.default:
    resources:
      CPU: 0
    node_config:
      InstanceType: m7a.xlarge
      ImageId: ami-0d3b80bc1bf64610c
      KeyName: ...
      PrivateIpAddress: x.x.x.x
      SubnetIds:
      - subnet-...
  ray.worker.default:
    min_workers: 0
    max_workers: 10
    resources: {}
    node_config:
      InstanceType: m7g.metal
      ImageId: ami-0aad8abdabbe88b62
      KeyName: ...
      SubnetIds:
      - subnet-...
      - subnet-...
      - subnet-...
      - subnet-...

setup_commands:
  - python3 -m pip install -U pip && pip install -U "ray[default]==2.32.0" boto3

Activate the virtual environment and navigate to the test directory, then:

- ray up cluster.yaml (monitor using ray status --address=...)
- python setup.py, then wait until all 256 CPUs are provisioned
- python run.py

Issue Severity

Low: It annoys or frustrates me.

rynewang commented 1 month ago

Hi, can you share more info?

  1. When you have 256 CPUs available and 256 tasks running, how many extra nodes does it provision?
  2. If you have fewer than 256 tasks, say 128 up to 255, does it provision the extras?
  3. What's the use case behind this?
glindstr commented 1 month ago

Hi @rynewang. Thanks for looking at this. :pray:

  1. When you have 256 CPUs available and 256 tasks running, how many extra nodes does it provision?

In almost every case I've seen it will spin up just 1 additional node -- the smallest node.

  2. If you have fewer than 256 tasks, say 128 up to 255, does it provision the extras?

I found it is possible to trigger overprovisioning with fewer tasks than capacity, though I haven't pinned down the exact threshold. In my tests 230 of 256 won't trigger it, but 240 of 256 can. As the margin grows, the reliability of triggering the bug decreases.

  3. What's the use case behind this?

I have task runners. Each task runner needs to execute thousands of unique jobs, which often take 10s-1min each. To manage resources, the task runner limits the number of concurrent tasks using ray.wait, following the API guidance (see the sketch below). So, with one task runner, it queues up to 256 tasks concurrently, with the remainder waiting on the client or actor; as ray.wait reports finished tasks, it submits more, up to the 256 limit. However, the autoscaler then provisions an extra machine, say with 64 CPUs, which expands the cluster from 256 to 320 CPUs, but the task runner continues to limit concurrency to 256 tasks. So the newly provisioned machine effectively sits there idle.
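
For reference, the task runner follows the standard ray.wait back-pressure pattern, roughly like the sketch below. This is a simplified stand-in rather than my actual code; job, payloads, and the hard-coded 256 limit are placeholders.

import time
import ray

ray.init(address="ray://x.x.x.x:10001")

@ray.remote
def job(t):
    # Stand-in for one of the real jobs (typically 10s-1min each).
    time.sleep(t)
    return t

MAX_IN_FLIGHT = 256                    # matches the 256-CPU cluster
payloads = [30 for _ in range(1000)]   # thousands of jobs in practice

in_flight = []
results = []
for payload in payloads:
    if len(in_flight) >= MAX_IN_FLIGHT:
        # Block until at least one task finishes before submitting another,
        # so no more than 256 tasks are ever pending or running at once.
        done, in_flight = ray.wait(in_flight, num_returns=1)
        results.extend(ray.get(done))
    in_flight.append(job.remote(payload))

results.extend(ray.get(in_flight))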

Additional notes: I thought this might have been related to having 0 CPU resources set on the head node, as configured in the cluster.yaml. However, I provisioned a cluster with 2 CPUs on the head and the same problem persists when submitting 258 jobs on 258 CPUs.

The bug can be triggered without ray.wait, so the task runner code isn't what's causing the issue.

In the example code I provided, I'm initializing with a runtime environment, ray.init(address="ray://x.x.x.x:10001", runtime_env={'working_dir': os.getcwd()}), which reliably triggers the bug every time. Initializing without a runtime env, ray.init(address="ray://x.x.x.x:10001"), still produces the bug, but unreliably. The rate at which the tasks queue is noticeably slower with the runtime environment, as seen by watching ray status. I believe the Ray autoscaler refreshes every 5 seconds by default. My hunch is that with the runtime environment set, it always takes longer than 5 seconds to schedule 256 tasks on this hardware, and I wonder if the issue is related to that (see the timing sketch below).
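
A rough way to check that hunch from the client side is to time the submission loop in run.py, as in the sketch below. The 5-second figure is my understanding of the autoscaler's default update interval, not something I've confirmed in the source.

import os
import time
import ray

ray.init(address="ray://x.x.x.x:10001", runtime_env={'working_dir': os.getcwd()})

@ray.remote
def test(t):
    time.sleep(t)

start = time.time()
refs = [test.remote(30) for _ in range(256)]
elapsed = time.time() - start
# If this regularly exceeds ~5 seconds, the autoscaler sees several snapshots
# of partially submitted demand while earlier tasks are already running.
print(f"submitted 256 tasks in {elapsed:.1f}s")

ray.get(refs)

Running it with and without the runtime_env argument should show whether the difference in submission time lines up with how reliably the bug reproduces.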

At 512 CPUs, when the task scheduling presumably takes longer than 10s (two autoscaler periods), it still only provisions 1 additional machine (all machines are 64-CPU).