ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Core] ray.util.Queue hangs when used in a concurrent.futures callback #38063

Open Darkdragon84 opened 12 months ago

Darkdragon84 commented 12 months ago

What happened + What you expected to happen

I am trying to trigger a callback function when a Ray task completes. To do so, I convert the ObjectRef returned by an actor method's remote call into a concurrent.futures.Future and register a done callback via add_done_callback(). However, when the callback tries to put something into a ray.util.Queue, it hangs as soon as the callback is executed. The queue has no max size, and it hangs even with an explicit timeout in put() and with put_nowait().

I would expect the queue to work in such a callback just as it does anywhere else.

Could this be related to asyncio.queues.Queue not being thread-safe?
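For reference, the asyncio documentation does state that asyncio.Queue is not thread-safe; the usual way to feed one from another thread is to schedule the put on the loop that owns it with loop.call_soon_threadsafe. The sketch below is plain asyncio with no Ray involved. It only illustrates that general rule and does not show that this is what happens inside ray.util.Queue.

```python
import asyncio
import threading


async def main() -> list:
    loop = asyncio.get_running_loop()
    q: asyncio.Queue = asyncio.Queue()

    def producer() -> None:
        # Runs in a separate OS thread. Calling q.put_nowait() directly here
        # would be unsafe, so ask the owning loop to do the put on its thread.
        for i in range(3):
            loop.call_soon_threadsafe(q.put_nowait, i)

    t = threading.Thread(target=producer)
    t.start()
    # Consume on the event-loop thread; items arrive in scheduling order.
    items = [await q.get() for _ in range(3)]
    t.join()
    return items


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints [0, 1, 2]
```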

Versions / Dependencies

Ubuntu 22.04, Python 3.10, ray 2.6.1

Reproduction script

import time
from concurrent.futures import Future
from functools import partial
from typing import Any

import ray
from ray.util.queue import Queue

def add_to_queue(queue: Queue, obj: Any, future: Future):
    print(f"adding {obj} to queue for future {future}")
    queue.put(obj, timeout=1)
    print("done")

@ray.remote
class A:
    def __init__(self, x):
        self.x = x

    def add(self, y):
        return self.x + y

def main():
    ray.init()
    queue = Queue()
    a = A.remote(2)
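    # Pre-bind the queue and the object; add_done_callback supplies the future.
    done_fun = partial(add_to_queue, queue, a)
    queue.put(a)
    print(f"queue size before fetching result: {queue.qsize()}")
    # Convert the ObjectRef into a concurrent.futures.Future and attach the callback.
    future: Future = a.add.remote(3).future()
    future.add_done_callback(done_fun)
    print("submitted job")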
    time.sleep(2)
    print(f"fetching result for {future}")
    print(f"fetched result: {future.result()}")
    print(f"queue size after fetching: {queue.qsize()}")

if __name__ == '__main__':
    main()
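For context on the wiring in the repro: concurrent.futures.Future.add_done_callback calls the callable with the future as its only argument, which is why add_to_queue takes the future last while the queue and object are pre-bound with functools.partial. A stdlib-only sketch of the same shape, assuming a plain list in place of the Ray queue and a bare Future (the docs reserve constructing one directly for testing, which is what this is):

```python
from concurrent.futures import Future
from functools import partial
from typing import Any, List


def record(sink: List[Any], obj: Any, future: Future) -> None:
    # Same signature shape as add_to_queue: pre-bound args first,
    # then the completed future supplied by add_done_callback.
    sink.append((obj, future.result()))


sink: List[Any] = []
fut: Future = Future()
fut.add_done_callback(partial(record, sink, "actor-handle"))
fut.set_result(5)  # completing the future runs the callback in this thread
print(sink)  # prints [('actor-handle', 5)]
```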

The output I get is:

queue size before fetching result: 1
submitted job
adding Actor(A, fa990cccd1527a99761d418a01000000) to queue for future <Future at 0x7f22b159f4c0 state=finished returned int>
fetching result for <Future at 0x7f22b159f4c0 state=finished returned int>
fetched result: 5

whereas I would expect:

queue size before fetching result: 1
submitted job
adding Actor(A, fa990cccd1527a99761d418a01000000) to queue for future <Future at 0x7f22b159f4c0 state=finished returned int>
done
fetching result for <Future at 0x7f22b159f4c0 state=finished returned int>
fetched result: 5
queue size after fetching: 2

Issue Severity

High: It blocks me from completing my task.

jjyao commented 11 months ago

@rickyyx, can you try to reproduce this and assign it a priority?

rkooo567 commented 11 months ago

The Ray queue implementation is also not thread-safe. Can you check whether the callback is running in a different thread?
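Per the Python documentation, a done callback always runs in a thread of the process that added it: if the future is already done, it runs immediately in the caller's thread; otherwise it runs in whichever thread completes the future. A stdlib sketch of the second case using a ThreadPoolExecutor (Ray's own completion thread is not modeled here):

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor

main_tid = threading.get_ident()
callback_tid = []
callback_ran = threading.Event()


def on_done(future: Future) -> None:
    # Record which thread actually executes the callback.
    callback_tid.append(threading.get_ident())
    callback_ran.set()


with ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(time.sleep, 0.5)  # still pending when the callback is added
    fut.add_done_callback(on_done)
    callback_ran.wait(timeout=5)

print(callback_tid[0] != main_tid)  # True: the worker thread ran the callback
```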

Darkdragon84 commented 11 months ago

Hi! Sorry for the late reply. I changed the prints to logging calls so that the process and thread IDs are shown. I also added logging to the actor method A.add to check whether the task actually executes (it does, as the printed future's state already suggests). Indeed, the callback seems to run in a different thread of the main process:

2023-08-31 13:01:13,606 INFO worker.py:1621 -- Started a local Ray instance.
2023-08-31 13:01:14,984|pid=70664|tid=139890904958016|main|INFO: queue size before fetching result: 1
2023-08-31 13:01:14,984|pid=70664|tid=139890904958016|main|INFO: submitted job
2023-08-31 13:01:14,986|pid=70664|tid=139890163443264|main|INFO: adding Actor(A, 03f116f29b484715b18a454b01000000) to queue for future <Future at 0x7f3add0339d0 state=finished returned int>
(A pid=71054) 2023-08-31 13:01:14,985|pid=71054|tid=139827396707392|A|INFO: adding 3 to 2
(A pid=71054) 2023-08-31 13:01:14,985|pid=71054|tid=139827396707392|A|INFO: returning 5
2023-08-31 13:01:16,985|pid=70664|tid=139890904958016|main|INFO: fetching result for <Future at 0x7f3add0339d0 state=finished returned int>
2023-08-31 13:01:16,985|pid=70664|tid=139890904958016|main|INFO: fetched result: 5
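Log lines of this shape can be produced with the standard logging module, whose LogRecord exposes process and thread attributes (thread is the value of threading.get_ident()). The exact configuration used above isn't shown, so the format string and the logger name "main" below are assumptions inferred from the output:

```python
import io
import logging

# Assumed format reproducing lines such as
# "2023-08-31 13:01:14,984|pid=70664|tid=139890904958016|main|INFO: ..."
FORMAT = "%(asctime)s|pid=%(process)d|tid=%(thread)d|%(name)s|%(levelname)s: %(message)s"

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter(FORMAT))

logger = logging.getLogger("main")  # logger name assumed from the output
logger.setLevel(logging.INFO)
logger.addHandler(handler)

logger.info("submitted job")
print(stream.getvalue(), end="")
```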

You can see that basically all logs from the main process (70664) come from the main thread (139890904958016); only the log from within the callback comes from a different thread (139890163443264). If that's the issue, how can we remedy it? Is it possible to wrap a thread-safe queue implementation in ray.util.Queue?
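One possible workaround, offered as an untested sketch rather than a confirmed fix: keep the done callback non-blocking by having it only push into the standard library's thread-safe queue.Queue, and let a thread you control (here the main thread) drain that buffer and perform the blocking put on the ray.util.Queue. To keep the sketch runnable without Ray, forward is a hypothetical stand-in for ray_queue.put and a ThreadPoolExecutor stands in for Ray tasks.

```python
import queue
from concurrent.futures import Future, ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, List

# Thread-safe hand-off buffer; put_nowait() on an unbounded queue never blocks.
handoff: "queue.Queue[Any]" = queue.Queue()


def enqueue_on_done(obj: Any, future: Future) -> None:
    # Runs in whatever thread completes the future: do no blocking work here.
    handoff.put_nowait((obj, future.result()))


def drain(forward: Callable[[Any], None], expected: int, timeout: float = 5.0) -> None:
    # Runs in a thread we own. A blocking call such as ray_queue.put(item)
    # would go here; `forward` is a hypothetical stand-in for it.
    for _ in range(expected):
        forward(handoff.get(timeout=timeout))


received: List[Any] = []
with ThreadPoolExecutor(max_workers=2) as pool:
    for i in range(3):
        fut = pool.submit(lambda x: x + 2, i)
        fut.add_done_callback(partial(enqueue_on_done, f"task-{i}"))
    drain(received.append, expected=3)

print(sorted(received))  # prints [('task-0', 2), ('task-1', 3), ('task-2', 4)]
```

The design choice is to limit the callback to a non-blocking local put, so no Ray round trip happens on the thread that delivered the result; whether this avoids the hang in Ray 2.6.1 would still need to be confirmed.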