
[Core] `ray.cancel` deadlock #47649

Closed: Catch-Bull closed this 1 week ago

Catch-Bull commented 1 month ago

What happened + What you expected to happen

`ray.cancel` may deadlock, primarily because the implementation of `NormalTaskSubmitter::CancelTask` is risky: while holding `NormalTaskSubmitter::mu_`, it calls into `TaskManager` functions that take `TaskManager::mu_` and may also take `TaskManager::object_ref_stream_ops_mu_`. Meanwhile, `ResolveDependencies` registers callbacks in the `MemoryStore` that try to acquire `NormalTaskSubmitter::mu_`, and some `TaskManager` functions call `MemoryStore::Put` (which fires those callbacks) while holding `TaskManager::object_ref_stream_ops_mu_`. The two threads therefore acquire `NormalTaskSubmitter::mu_` and `TaskManager::object_ref_stream_ops_mu_` in opposite orders, which deadlocks.
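For intuition, here is a minimal, self-contained Python sketch of the lock-ordering pattern described above. It is not Ray code; `submitter_mu`, `stream_ops_mu`, and the two thread functions are illustrative stand-ins for the C++ mutexes and call paths involved.

```python
import threading
import time

submitter_mu = threading.Lock()   # stand-in for NormalTaskSubmitter::mu_
stream_ops_mu = threading.Lock()  # stand-in for TaskManager::object_ref_stream_ops_mu_

def cancel_path():
    # "Thread 1": CancelTask holds the submitter lock, then calls into
    # TaskManager code that wants the stream-ops lock.
    with submitter_mu:
        time.sleep(0.1)
        with stream_ops_mu:
            pass

def generator_return_path():
    # "Thread 2": generator-return handling holds the stream-ops lock, then
    # MemoryStore::Put runs a dependency-resolved callback that wants the
    # submitter lock.
    with stream_ops_mu:
        time.sleep(0.1)
        with submitter_mu:
            pass

t1 = threading.Thread(target=cancel_path, daemon=True)
t2 = threading.Thread(target=generator_return_path, daemon=True)
t1.start(); t2.start()
t1.join(timeout=2); t2.join(timeout=2)
print("deadlocked:", t1.is_alive() and t2.is_alive())
```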

Versions / Dependencies

ray: https://github.com/Catch-Bull/ray/tree/ray_cancel_deadlock. To make the specific timing reproducible, this branch adds one commit on top of ray master 1dd8d60bcbbf74b0d22ea4447a787a33817ff20b.

Reproduction script

```python
import json
import os
import signal
import subprocess
import time

import numpy as np
import ray

head_port = str(6379)
# 75MB
object_store_size = str(75 * 1024**2)

head_cmd = [
    "ray", "start", "--head", "--port", head_port,
    "--resources", json.dumps({"head": 10}),
    "--object-store-memory", object_store_size,
]
node1_cmd = [
    "ray", "start", "--address", f"127.0.0.1:{head_port}",
    "--resources", json.dumps({"node1": 10}),
    "--object-store-memory", object_store_size,
]

def run_subprocess(cmd, e=None):
    env = dict(**os.environ)
    if e:
        env.update(e)
    p = subprocess.Popen(cmd, shell=False, env=env)
    stdout, stderr = p.communicate()
    print("stdout:", stdout)
    print("stderr:", stderr)
    return p.returncode == 0

assert run_subprocess(head_cmd)

# make sure the driver is on the head node
ray.init("auto")
assert run_subprocess(node1_cmd)

@ray.remote
def foo():
    return os.getppid()

def get_raylet_pid_by_resources(resources_name):
    return ray.get(foo.options(resources={resources_name: 0.1}).remote())

raylet1_pid = get_raylet_pid_by_resources("node1")

@ray.remote(resources={"node1": 0.1}, max_retries=-1)
def func():
    time.sleep(10)
    plasma_obj = np.random.rand(1024000)
    yield plasma_obj

@ray.remote(resources={"node1": 0.1})
def func_out(x):
    return x.sum()

print("submit task...")
ref = func.remote()
print("try next...")
ref_1 = next(ref)
print("next done.")

os.kill(raylet1_pid, signal.SIGKILL)

# make sure ref_1 begins reconstruction
time.sleep(20)

print("submit task...")
ref_out = func_out.remote(ref_1)
print("submit done.")

assert run_subprocess(node1_cmd)
time.sleep(35)

print("try run ray.cancel")
ray.cancel(ref)
print("ray.cancel, done")
```


It hangs with the following stacks:
- thread 1:
![image](https://github.com/user-attachments/assets/1b20dcbc-0978-4b31-b245-1bf392756a72)
- thread 2:
![image](https://github.com/user-attachments/assets/d6e9c7b2-f04a-4abb-97a5-e4a8c1b98c14)
The threads deadlock on `NormalTaskSubmitter::mu_`.

### Issue Severity

None
fyrestone commented 1 month ago

I have a related issue: https://github.com/ray-project/ray/issues/46157

rynewang commented 1 month ago
| Thread | Call Stack | Holding Lock | Acquiring Lock |
| --- | --- | --- | --- |
| Thread 1 | `TaskManager::MarkEndOfStream` | | `TaskManager::object_ref_stream_ops_mu_` |
| | `TaskManager::MarkTaskCanceled` | | |
| | `NormalTaskSubmitter::CancelTask` | `NormalTaskSubmitter::mu_` | |
| | `CoreWorker::CancelTask` | | |

and

| Thread | Call Stack | Holding Lock | Acquiring Lock |
| --- | --- | --- | --- |
| Thread 2 | lambda1 at `NormalTaskSubmitter::SubmitTask` (on_dependencies_resolved) | | `NormalTaskSubmitter::mu_` |
| | lambda1 at `LocalDependencyResolver::ResolveDependencies` (src/ray/core_worker/transport/dependency_resolver.cc:100) | `LocalDependencyResolver::mu_` | |
| | `CoreWorkerMemoryStore::Put` | | |
| | `TaskManager::HandleTaskReturn` | Excludes `TaskManager::mu_` | |
| | `TaskManager::HandleReportGeneratorItemReturns` | `TaskManager::object_ref_stream_ops_mu_` | |

So in thread 2, `CoreWorkerMemoryStore::Put` invokes the get callback registered via `GetAsync` by `LocalDependencyResolver::ResolveDependencies`. I think this `GetAsync` should be truly async: the callback should be posted as a separate asio task and should not run while any locks are held.
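To illustrate the direction (Python sketch only; the real change is in the C++ `CoreWorkerMemoryStore`, and `AsyncCallbackStore`, `get_async`, and `put` are hypothetical names): a `Put` that hands registered callbacks to an executor instead of invoking them inline never fires them while the caller's locks are held.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class AsyncCallbackStore:
    """Illustrative only; not Ray's CoreWorkerMemoryStore."""

    def __init__(self):
        self._mu = threading.Lock()
        self._callbacks = {}  # object_id -> callbacks registered via get_async
        self._executor = ThreadPoolExecutor(max_workers=1)

    def get_async(self, object_id, callback):
        # Register interest in an object (GetAsync-style pattern).
        with self._mu:
            self._callbacks.setdefault(object_id, []).append(callback)

    def put(self, object_id, value):
        # Pop callbacks under the store lock, but run them on the executor,
        # so they execute after put() returns and without any caller lock held.
        with self._mu:
            callbacks = self._callbacks.pop(object_id, [])
        for cb in callbacks:
            self._executor.submit(cb, value)
```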

Regarding #47650, I also agree we can reduce the scope of `mu_` in `CancelTask` before calling `MarkTaskCanceled`. However, I am a bit concerned about the Releasable mutex, especially that we acquire it once and release it inside a for loop.
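For reference, the general shape of "reduce the scope of `mu_`" is to drop the lock before calling into another component. A hedged Python sketch, with hypothetical names (`SubmitterSketch`, `_submitted`, `mark_task_canceled`), not Ray's actual `NormalTaskSubmitter`:

```python
import threading

class SubmitterSketch:
    """Illustrative only; not Ray's NormalTaskSubmitter."""

    def __init__(self, task_manager):
        self._mu = threading.Lock()
        self._submitted = {}  # hypothetical per-task bookkeeping
        self._task_manager = task_manager

    def cancel_task(self, task_id):
        # Touch submitter state only while holding the submitter lock.
        with self._mu:
            entry = self._submitted.pop(task_id, None)
        # The lock is released here, so the call below can take the task
        # manager's locks without creating a lock-order cycle.
        if entry is not None:
            self._task_manager.mark_task_canceled(task_id)
```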

I made a PR https://github.com/ray-project/ray/pull/47833 to make `CoreWorkerMemoryStore` callbacks truly asynchronous. @Catch-Bull can you take a look and see whether it solves this deadlock? Thanks!

rynewang commented 1 month ago

I think this should also solve https://github.com/ray-project/ray/pull/47650#discussion_r1758714604