ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Core] Pending node assignment for an actor that is assigned to an existing placement group #42683

Open justinvyu opened 10 months ago

justinvyu commented 10 months ago

What happened + What you expected to happen

A single actor is stuck pending node assignment, causing the script to hang forever, even though the resources are available and the placement group has already been reserved.

Autoscaler status at the time of the hang (screenshot from 2024-01-24):
Usage:
0.0/64.0 CPU
0.9999999999999999/4.0 GPU (0.9999999999999999 used of 1.9 reserved in placement groups)
121.07GiB/256.00GiB memory (121.07GiB used of 121.07GiB reserved in placement groups)
0B/73.03GiB object_store_memory
Demands:
{'GPU': 0.9}: 1+ pending tasks/actors (1+ using placement groups)
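
For reference, the same resource picture can be pulled programmatically from a second process attached to the cluster; a minimal sketch, assuming the cluster from the status above is still running:

import ray
from ray.util import placement_group_table

ray.init(address="auto")  # attach to the running cluster

# Total vs. currently available resources (mirrors the Usage section above).
print(ray.cluster_resources())
print(ray.available_resources())

# State of every placement group: bundle specs and reservation status.
print(placement_group_table())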

Versions / Dependencies

2.9.1

Reproduction script

Compute config:

import ray
ray.init()

num_workers = 2
pg = ray.util.placement_group([{'memory': 130e9, 'GPU': 0.1}] + [{'GPU': 0.9}] * num_workers)
# Workaround is to ask for accelerator_type on the 0th bundle instead of GPU
# pg = ray.util.placement_group([{'memory': 130e9, 'accelerator_type:A10G': 0.1}] + [{'GPU': 0.9}] * num_workers)
ray.get(pg.ready())

# Also uncomment below for the workaround.
# @ray.remote(num_cpus=0, resources={'accelerator_type:A10G': 0.1}, memory=130e9)
@ray.remote(num_cpus=0, num_gpus=0.1, memory=130e9)
class Coordinator:
    def run(self):
        print("coordinator hello")

@ray.remote(num_cpus=0, num_gpus=0.9)
class Worker:
    def run(self):
        print("worker hello")

from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

coord = Coordinator.options(scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg, placement_group_bundle_index=0)).remote()
workers = [
    Worker.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg, placement_group_bundle_index=1 + i),
    ).remote()
    for i in range(num_workers)
]

ray.get([coord.run.remote()] + [worker.run.remote() for worker in workers])
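
While the script above hangs, the stuck actor can be confirmed from a second process via the state API; a minimal sketch, assuming Ray 2.9 and the same cluster:

import ray
from ray.util.state import list_actors

ray.init(address="auto")  # attach to the cluster the repro is running on

# The hanging actor stays in a pending state (e.g. PENDING_CREATION) even
# though pg.ready() already returned in the repro above.
for actor in list_actors():
    print(actor.class_name, actor.state)

The ray list actors CLI command shows the same information.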

Leads:

Issue Severity

Medium: It is a significant difficulty but I can work around it.

matthewdeng commented 10 months ago

Adding onto this, I tried a few things (e.g. replacing GPU with CPU) and I think it has to do with fractional GPUs?

Here's a repro I put together with some more debugging verbosity:

import os
os.environ["RAY_DEDUP_LOGS"] = "0"

import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from ray.util import placement_group

class Worker:
    def __init__(self, index, num_gpus):
        self.index = index
        self.num_gpus = num_gpus

    def run(self):
        import os
        print(f"CUDA_VISIBLE_DEVICES: {os.environ['CUDA_VISIBLE_DEVICES']}")

    def __repr__(self):
        return str(f"Worker[index={self.index}, num_gpus={self.num_gpus}]")

def create_worker(placement_group, index, num_gpus):
    return (
        ray.remote(Worker)
        .options(
            num_cpus=0,
            num_gpus=num_gpus,
            scheduling_strategy=PlacementGroupSchedulingStrategy(
                placement_group=placement_group, placement_group_bundle_index=index
            ),
        )
        .remote(index, num_gpus)
    )

ray.init()

gpus = [0.2, 0.8, 0.8]  # Hangs
# gpus = [0.8, 0.2, 0.8] # Succeeds
# gpus = [0.8, 0.8, 0.2] # Succeeds

pg = placement_group([{"GPU": gpu} for gpu in gpus])
ray.get(pg.ready())

workers = [create_worker(pg, i, gpu) for (i, gpu) in enumerate(gpus)]
ray.get([worker.run.remote() for worker in workers])

Hangs:

(Worker[index=0, num_gpus=0.2] pid=76147) CUDA_VISIBLE_DEVICES: 1
(Worker[index=1, num_gpus=0.8] pid=76148) CUDA_VISIBLE_DEVICES: 0

Successful:

(Worker[index=0, num_gpus=0.8] pid=77638) CUDA_VISIBLE_DEVICES: 1
(Worker[index=1, num_gpus=0.2] pid=77639) CUDA_VISIBLE_DEVICES: 0
(Worker[index=2, num_gpus=0.8] pid=77640) CUDA_VISIBLE_DEVICES: 0

(Worker[index=0, num_gpus=0.8] pid=72735) CUDA_VISIBLE_DEVICES: 1
(Worker[index=1, num_gpus=0.8] pid=72736) CUDA_VISIBLE_DEVICES: 0
(Worker[index=2, num_gpus=0.2] pid=72737) CUDA_VISIBLE_DEVICES: 0

Not sure if it's pure coincidence, but in the hanging case the "smaller" actor (num_gpus=0.2) is scheduled with CUDA_VISIBLE_DEVICES=1, while in the successful cases it gets scheduled with CUDA_VISIBLE_DEVICES=0 alongside a "larger" actor (num_gpus=0.8).
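
To check that assignment directly instead of inferring it from log order, here's a minimal probe sketch (the GpuProbe actor is hypothetical, not part of the repro above, and it only uses two bundles so it doesn't itself hit the hang). ray.get_gpu_ids() returns the GPU IDs Ray reserved for the calling actor, and placement_group_table(pg) shows which node each bundle was reserved on:

import os
import ray
from ray.util import placement_group, placement_group_table
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

@ray.remote(num_cpus=0)
class GpuProbe:
    def report(self):
        # GPU IDs Ray reserved for this actor; CUDA_VISIBLE_DEVICES is derived
        # from the same assignment.
        return ray.get_gpu_ids(), os.environ.get("CUDA_VISIBLE_DEVICES")

ray.init()
gpus = [0.2, 0.8]
pg = placement_group([{"GPU": gpu} for gpu in gpus])
ray.get(pg.ready())
print(placement_group_table(pg))  # bundle specs and per-bundle node assignment

probes = [
    GpuProbe.options(
        num_gpus=gpu,
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg, placement_group_bundle_index=i
        ),
    ).remote()
    for i, gpu in enumerate(gpus)
]
print(ray.get([p.report.remote() for p in probes]))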

anyscalesam commented 9 months ago

@rkooo567 is this related to #42821?