ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[core][aDAG] asyncio run hangs upon shutdown #47685

Open rkooo567 opened 1 week ago

rkooo567 commented 1 week ago

What happened + What you expected to happen

When I run the script below, it hangs. I don't know the root cause yet; it is probably related to shutdown.

Versions / Dependencies

master

Reproduction script

import asyncio
import ray
from ray.dag import InputNode, MultiOutputNode

async def main():
    @ray.remote
    class A:
        def f(self, i):
            return i

    a = A.remote()
    b = A.remote()

    with InputNode() as inp:
        x = a.f.bind(inp)
        y = b.f.bind(inp)
        dag = MultiOutputNode([x, y])

    adag = dag.experimental_compile(enable_asyncio=True)
    refs = await adag.execute_async(1)
    outputs = []
    # Awaiting each ref one at a time works.
    for ref in refs:
        outputs.append(await ref)
    # Awaiting all refs at once with asyncio.gather doesn't work.
    # outputs = await asyncio.gather(*refs)
    print(outputs)

asyncio.run(main())

Issue Severity

None

jeffreyjeffreywang commented 1 week ago

It seems like the program doesn't hang anymore if adag.teardown() is called after print(outputs).
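
A minimal sketch of that workaround, wrapping execution in try/finally so teardown() runs even if awaiting a result raises. StubCompiledDag is a hypothetical stand-in so the snippet runs without a Ray cluster; it only mimics the execute_async and teardown calls from the reproduction script and is not Ray's implementation.

```python
import asyncio


class StubCompiledDag:
    """Hypothetical stand-in for the compiled aDAG (not the Ray class)."""

    def __init__(self):
        self.torn_down = False

    async def execute_async(self, value):
        # Mimic the repro: return one awaitable per DAG output (two here).
        loop = asyncio.get_running_loop()
        futures = [loop.create_future() for _ in range(2)]
        for fut in futures:
            fut.set_result(value)
        return futures

    def teardown(self):
        self.torn_down = True


async def run_once(adag, value):
    try:
        refs = await adag.execute_async(value)
        return [await ref for ref in refs]
    finally:
        # Explicit teardown, so shutdown does not rely on the destructor.
        adag.teardown()


adag = StubCompiledDag()
outputs = asyncio.run(run_once(adag, 1))
print(outputs)
```

In the reproduction script, the same try/finally would go around the execute_async call and the awaits inside main().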

jeffreyjeffreywang commented 1 week ago

Perhaps we can tear down the aDAG implicitly when it goes out of scope, which would be more intuitive for the user.

rkooo567 commented 1 week ago

Actually, teardown is already called in the destructor of the CompiledDag class. I don't know why it is not being triggered properly.
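
For context, one general way a Python destructor can run later than expected is a reference cycle. This is not confirmed as the cause in this issue. The sketch below assumes CPython and uses a hypothetical Resource class, not Ray code. While the object is part of a cycle, dropping the last name does not trigger __del__; it runs only once the cycle collector gets to it.

```python
import gc

events = []


class Resource:
    """Hypothetical object whose destructor performs teardown."""

    def __init__(self):
        # Self-reference creates a cycle, so the refcount never hits zero.
        self.self_ref = self

    def __del__(self):
        events.append("teardown")


gc.disable()  # keep the timing deterministic for this demo
r = Resource()
del r  # the name is gone, but the cycle keeps the object alive
ran_immediately = bool(events)
gc.collect()  # the cycle collector finalizes the object
ran_after_collect = bool(events)
gc.enable()
print(ran_immediately, ran_after_collect)
```

If something similar applies here, an explicit teardown() call avoids depending on when the destructor runs.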

rkooo567 commented 1 week ago

Potential fix here: https://github.com/ray-project/ray/issues/47685#issuecomment-2354380540. Waiting for CI to see if it passes all tests.