ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[core] `ray.get` on a mixed list of dead and alive (but hanging) tasks does not immediately raise `RayActorError` #47204

Open hongpeng-guo opened 2 months ago

hongpeng-guo commented 2 months ago

What happened + What you expected to happen

We recently ran into a problem when calling methods on a dead actor (dead because the underlying node was killed). We expected an exception to be raised immediately. Instead, the code just hangs. The behavior is a bit flaky, but it hangs in most cases. This might be a regression, as the same code raised as expected in early July.

Mini repro appended below.

Versions / Dependencies

2.34

Reproduction script

import ray
from typing import Callable

@ray.remote(num_cpus=1)
class TestClass:
    def execute(self, fn: Callable[..., None]) -> None:
        return fn()

    def exit(self):
        ray.actor.exit_actor()

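def dummy_func():
    print(100)

actors = [TestClass.remote() for _ in range(10)]
# Ask every actor to exit, then submit a task to each (now dead) actor.
for actor in actors:
    actor.exit.remote()
tasks = [actor.execute.remote(dummy_func) for actor in actors]
# Expected: raises immediately. Observed: hangs in most runs.
ray.get(tasks)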

Issue Severity

High: It blocks me from completing my task.

rkooo567 commented 2 months ago

import ray
from typing import Callable

@ray.remote(num_cpus=1)
class TestClass:
    def execute(self, fn: Callable[..., None]) -> None:
        return fn()

    def exit(self):
        ray.actor.exit_actor()

def dummy_func():
    import time
    print(100)
    time.sleep(100)

actors = [TestClass.remote() for _ in range(10)]
# Wait until all actors are up.
ray.get([actor.__ray_ready__.remote() for actor in actors])
refs = []
# Make only the first two actors exit; the rest stay alive.
for actor in actors[:2]:
    refs.append(actor.exit.remote())
    # ray.kill(actor)
try:
    ray.get(refs)
except Exception:
    pass

tasks = [actor.execute.remote(dummy_func) for actor in actors]
# Does not raise for the dead actors until the alive actors' tasks finish.
ray.get(tasks)

The main issue is that if an actor has crashed and you call ray.get on tasks from crashed and uncrashed actors together, ray.get doesn't raise an exception until the uncrashed actors' tasks have finished. We can easily get around this in the Train layer (by using ray.wait), and
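
A minimal sketch of the ray.wait workaround mentioned above, not the actual Train-layer change. The helper name get_fail_fast and its wait_fn/get_fn parameters are hypothetical and exist only for illustration; by default it falls back to ray.wait and ray.get. It assumes ray.wait reports a dead actor's task as ready, so fetching refs one at a time surfaces the error without waiting on the hanging tasks.

```python
def get_fail_fast(refs, wait_fn=None, get_fn=None):
    """Fetch results for refs, raising as soon as any single task fails.

    Hypothetical helper: wait_fn and get_fn default to ray.wait and
    ray.get, and are injectable so the loop can be tried without a cluster.
    """
    if wait_fn is None or get_fn is None:
        import ray  # imported lazily, only when the Ray defaults are needed
        wait_fn = wait_fn or ray.wait
        get_fn = get_fn or ray.get

    results = {}
    pending = list(refs)
    while pending:
        # Block until at least one ref is ready (finished or failed).
        ready, pending = wait_fn(pending, num_returns=1)
        for ref in ready:
            # get on a single ready ref re-raises that task's error,
            # e.g. RayActorError for a task submitted to a dead actor.
            results[ref] = get_fn(ref)
    # Return results in the caller's original order.
    return [results[ref] for ref in refs]
```

In the repro above, this would replace the final ray.get(tasks) with get_fail_fast(tasks). Note that the still-running tasks are not cancelled when the error propagates.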

rkooo567 commented 2 months ago

Unassigning myself for now, as it is mitigated.

justinvyu commented 2 months ago

See here for a more consistent reproduction: https://github.com/anyscale/runtime/pull/929/files#diff-1913713e052df41064554b30df8e0f47abef67dee769c9de607e7728ef2e4d40R397

rkooo567 commented 2 months ago

It is not a blocker, but let's fix this soon. The semantics are very bad for fault-tolerant cases.

rynewang commented 1 week ago

Expected behavior: any pending tasks of the crashed actor raise ActorDiedError on ray.get(obj) or ray.get([obj, other_objs]). The call should not hang.