ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Core] Worker leak #30731

Open tianyicui-tsy opened 2 years ago

tianyicui-tsy commented 2 years ago

What happened + What you expected to happen

When a task work depends on both fail (which immediately raises an exception) and inf_loop (which runs an infinite loop), canceling work doesn't recursively cancel inf_loop. The result is a worker that cannot be canceled or killed. What's more, shutting down the Ray job doesn't stop the inf_loop worker either, so there is no way to stop it (in practice it would be an arbitrarily long-running task) and reclaim its resources. So I believe it's accurate to describe its state as leaked.

Versions / Dependencies

I tried both Ray 2.1.0 and the nightly build, with Python 3.10.6 on Ubuntu 22.04.

Reproduction script

If I start a Ray cluster locally with ray start --head and run the script below with RAY_ADDRESS=auto python test.py, I see that even after the script finishes (i.e. after the job shuts down), the inf_loop worker is still running and one fewer CPU is available according to ray.available_resources() from another job.

import ray
import time

ray.init()

def cpu():
    return ray.available_resources()['CPU']

@ray.remote
def inf_loop():
    while True:
        time.sleep(0.1)

@ray.remote
def fail():
    raise ValueError("fail")

@ray.remote
def work(*, include_fail):
    if include_fail:
        ray.get([inf_loop.remote(), fail.remote()])
    else:
        ray.get([inf_loop.remote()])

n_cpu = cpu()
assert n_cpu > 1

# when include_fail=False, canceling the work task also recursively cancels inf_loop, as expected
o = work.remote(include_fail=False)
time.sleep(1)
assert cpu() == n_cpu - 1
ray.cancel(o, force=True)
time.sleep(1)
assert cpu() == n_cpu

# when include_fail=True, canceling the work task doesn't cancel inf_loop
o = work.remote(include_fail=True)
time.sleep(1)
ray.cancel(o, force=True)
time.sleep(1)
assert cpu() == n_cpu # this fails

# even if we shut down this job, the worker running inf_loop is still alive,
# so it looks like the worker is leaked.

Issue Severity

High: It blocks me from completing my task.

tianyicui-tsy commented 1 year ago

Just saw this was pushed back to Ray 2.3. Is it possible to shed some light on the rationale behind the decision?

I'm not sure how often people run into this issue, but I hit it the first time I tried to cancel some tasks in Ray. And if my description of the bug is accurate, I'd argue the severity can be quite high: workers aren't released even after the Ray job stops.

Thanks and appreciate the quick triage of my issue!

rkooo567 commented 1 year ago

@jjyao will do some initial investigation

vitsai commented 1 year ago

Hi @tianyicui-tsy, we are working on several fixes for this. In the meantime, you can work around this by adding a try-except block in the fail() task that returns the exception instead of raising it.
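The suggested workaround, sketched without Ray for clarity (plain functions stand in for remote tasks, and the list stands in for ray.get on a list of refs); the names here are illustrative, not from the Ray API:

```python
def fail():
    # Workaround: catch the exception inside the task and return it,
    # instead of letting it propagate and abort the parent's gather
    # (which is what strands the sibling inf_loop task on cancel).
    try:
        raise ValueError("fail")
    except ValueError as e:
        return e

def work():
    # Stand-in for: results = ray.get([inf_loop.remote(), fail.remote()])
    results = [fail()]
    for r in results:
        if isinstance(r, Exception):
            # The caller now has to detect returned exceptions itself.
            return f"handled: {r}"
    return "ok"
```

Under this pattern, work completes normally and no task in the gather is aborted, so cancellation propagates as in the include_fail=False case.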

tianyicui-tsy commented 1 year ago

Thank you @vitsai, it's great to hear that fixes are being worked on. Really appreciate that. Just want to point out that while your suggested workaround works for this simplified example, it's not really feasible in our production environment. In our production code the tasks that raise exceptions are much more complicated than fail(). It would be infeasible to change all of them to return exceptions; for example, we'd need to change every place that uses the results of these tasks to first check whether an exception was returned.
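To illustrate why the workaround is invasive: even if the per-task edit is shrunk with a helper decorator (a hypothetical sketch, not anything Ray provides), every call site that consumes results still has to check whether an exception object came back instead of a value, which is the change that doesn't scale across a large codebase:

```python
import functools

def return_exceptions(fn):
    # Hypothetical helper: convert a raised exception into a return
    # value so a failing task doesn't abort the parent's gather.
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception as e:
            return e
    return wrapper

@return_exceptions
def fail():
    raise ValueError("fail")

# The burden moves to every consumer of the result:
result = fail()
if isinstance(result, Exception):
    pass  # each call site needs error handling like this
```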