ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[core] ray.kill doesn't guarantee resources are cleaned up #34917

Open d4l3k opened 1 year ago

d4l3k commented 1 year ago

What happened + What you expected to happen

We recently ran into an issue where Ray actors weren't being cleaned up in time, leading to CPU OOMs when we retried those actors.

The root of the problem seems to be that ray.kill doesn't wait for the actor process to be cleaned up before returning, and there's currently no way to detect when the process has been cleaned up (i.e. no equivalent of the wait syscall).

Some projects try to detect this with an extra remote call; from my testing that's better, but it still races. Ex: https://github.com/mars-project/mars/blob/19aa2d1b04d4db88e947c79096102bb615f8a13c/mars/oscar/backends/ray/utils.py#L159-L161
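
For illustration, a minimal sketch of the race (the Worker actor and the psutil check below are illustrative, not taken from the linked project):

import os
import psutil
import ray

ray.init()

@ray.remote
class Worker:
    def pid(self) -> int:
        return os.getpid()

worker = Worker.remote()
pid = ray.get(worker.pid.remote())

ray.kill(worker)
# ray.kill returns immediately; the actor process can still be alive here,
# holding on to its memory until the raylet finishes tearing it down.
try:
    print(psutil.Process(pid).status())  # often still "running" or "sleeping"
except psutil.NoSuchProcess:
    print("already gone")  # the lucky case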

Versions / Dependencies

ray[default]==1.11.0

Python 3.7

Reproduction script

import ray

# max_retries, num_workers, and Worker stand in for the application's own
# retry budget, worker count, and actor class.
for retry in range(max_retries):
    try:
        workers = []
        for _ in range(num_workers):
            workers.append(Worker.remote())
        # Wait for the actors to come up; ray.get on the result of a remote
        # call (rather than on the handles themselves) surfaces startup failures.
        ray.get([w.__ray_ready__.remote() for w in workers])

        break
    except Exception:
        # ray.kill returns before the actor processes are actually gone, so
        # the next retry can OOM while the old actors still hold memory.
        for worker in workers:
            ray.kill(worker)

Issue Severity

High: It blocks me from completing my task.

d4l3k commented 1 year ago

Unit test:

from unittest.mock import MagicMock

import psutil
import pytest
import ray


def test_safe_ray_kill_trainers() -> None:
    ray.shutdown()
    ray.init()

    trainers = [
        DummyTrainer.remote(0, 0, "", "", MagicMock())  # type: ignore[attr-defined]
        for i in range(2)
    ]

    # check pids are live
    procs = [psutil.Process(ray.get(trainer.pid.remote())) for trainer in trainers]
    for proc in procs:
        assert proc.status() in (psutil.STATUS_RUNNING, psutil.STATUS_SLEEPING), proc

    # kills and waits until trainer.sleep errors
    safe_ray_kill_trainers(trainers)

    for trainer in trainers:
        with pytest.raises(ray.exceptions.RayActorError, match=r".*killed.*"):
            ray.get(trainer.sleep.remote(0))

    # check pids are dead
    for proc in procs:
        assert proc.status() in (psutil.STATUS_DEAD, psutil.STATUS_ZOMBIE), proc
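
safe_ray_kill_trainers itself isn't shown in this thread; a minimal sketch consistent with the test's comment ("kills and waits until trainer.sleep errors") could look like this. Note that it only waits until the actor is marked dead, not until the OS process has exited, which is exactly the gap this issue is about:

import ray


def safe_ray_kill_trainers(trainers) -> None:
    # Issue the kills first so the teardowns overlap.
    for trainer in trainers:
        ray.kill(trainer)
    # Then block until calls on each trainer fail with RayActorError.
    for trainer in trainers:
        while True:
            try:
                ray.get(trainer.sleep.remote(0))
            except ray.exceptions.RayActorError:
                break
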
rkooo567 commented 1 year ago

ray.kill is an asynchronous API. If you'd like to make it blocking, try this as a workaround:

for worker in workers:
    ray.kill(worker)
    while True:
        try:
            # ray.kill is asynchronous, so it is possible the worker is still
            # alive right after calling ray.kill. Retry until the call fails.
            ray.get(worker.__ray_ready__.remote())
        except ray.exceptions.RayActorError:
            break
        time.sleep(0.01)

We will consider adding blocking=True to the ray.kill API, but it may take some time until it is supported.

d4l3k commented 1 year ago

@rkooo567 we're doing something very similar, but it still races: the RayActorError is returned before the actor is completely cleaned up. There's no way to guarantee the memory is freed without an explicit synchronization syscall. Things like kernel-locked pages and device drivers can make cleanup take longer.
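
On a single node, one way to get a real synchronization point is to wait on the OS process itself, the same way the unit test above checks pids with psutil. A hedged sketch, assuming the actor exposes a pid method (as DummyTrainer does) and runs on the same node as the caller:

import psutil
import ray


def kill_and_wait(actor, timeout_s: float = 30.0) -> None:
    # Grab the actor's OS pid before killing it.
    pid = ray.get(actor.pid.remote())
    ray.kill(actor)
    try:
        # psutil.Process.wait also works for non-child processes: it polls
        # until the process no longer exists (or raises TimeoutExpired).
        psutil.Process(pid).wait(timeout=timeout_s)
    except psutil.NoSuchProcess:
        pass  # the process was already gone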

rkooo567 commented 1 year ago

Hmm I see. A race is still possible, but the window should be very small because that error occurs only after we ensure the kill RPC has been initiated. Does your application have a strict resource requirement where all the previous actors have to be killed before you create new ones?