ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.83k stars 5.75k forks source link

[Core] The actors got distributed to just a few nodes even with spread scheduling #27577

Open jianoaix opened 2 years ago

jianoaix commented 2 years ago

Run this on master, in a cluster with 20 nodes:

import ray
import time

@ray.remote
class TestConsumingActor:
    def __init__(self, rank):
        self._rank = rank

    def consume(self, split):
        pass

num_workers = 20
splits = list(range(num_workers))

consumers = [
    TestConsumingActor.options(scheduling_strategy="SPREAD").remote(i)
    for i in range(num_workers)
]

future = [consumers[i].consume.remote(s) for i, s in enumerate(splits)]
ray.get(future)

can see the distribution of actors got concentrated on a few nodes (3 nodes each with 5 actors in this case -- sometimes it's even more concentrated than this):

Screen Shot 2022-08-05 at 1 35 14 PM

Note: if we add cpu requirement to actor, it worked.

@ray.remote(num_cpus=1)
class TestConsumingActor:
    def __init__(self, rank):
        self._rank = rank

    def consume(self, split):
        pass

@scv119

wuisawesome commented 2 years ago

I was looking through a lot of this code last night and I think there's a bug in our resource accounting logic and this is actually the same underlying issue as https://github.com/ray-project/ray/issues/26751.

In particular I think this needs to do PlacementResources if we're scheduling an actor. https://github.com/ray-project/ray/blob/master/src/ray/raylet/scheduling/cluster_task_manager.cc#L327