ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.22k stars 5.62k forks source link

[core][aDAG] Multi ref output doesn't work with asyncio.gather #47684

Open rkooo567 opened 1 week ago

rkooo567 commented 1 week ago

What happened + What you expected to happen

When we have multi output refs, we want to batch wait the result. For the sync case, I verified ray.get works. But for async case, asyncio.gather doesn't seem to work.

Versions / Dependencies

master

Reproduction script

import asyncio
import ray
from ray.dag import InputNode, MultiOutputNode

async def main():
    @ray.remote
    class A:
        def f(self, i):
            return i

    a = A.remote()
    b = A.remote()

    with InputNode() as inp:
        x = a.f.bind(inp)
        y = b.f.bind(inp)
        dag =  MultiOutputNode([x, y])

    adag = dag.experimental_compile(enable_asyncio=True)
    refs = await adag.execute_async(1)
    outputs = []
    # works
    # for ref in refs:
    #     outputs.append(await ref)
    # doesn't work.
    outputs = await asyncio.gather(*refs)
    print(outputs)

asyncio.run(main())

Issue Severity

None

jeffreyjeffreywang commented 1 week ago

Need to support ray.wait on CompiledDAGFuture and a list of CompiledDAGFuture as well. @rkooo567, is this blocking you from adopting the new execute_async API in vLLM? I'm trying to assess the priority of this issue.

rkooo567 commented 1 week ago

In vLLM we don't need it (we only await on the first ref). It is mainly for beta release (aiming the mid Oct).

Also one quick note is that we don't support ray.wait on futures in Ray (we use asyncio.gather instead).

Also other thing to mention is that ray.wait may be more useful when you have multiple dags and wait concurrently.

jeffreyjeffreywang commented 1 week ago

Got it, will prioritize this over the deserialization issue.

stephanie-wang commented 6 days ago

Hey @jeffreyjeffreywang are you working on this issue? Is it okay if I assign you?

jeffreyjeffreywang commented 6 days ago

@stephanie-wang Yep, I'm working on this. Feel free to assign this to me.

Documenting my progress as well:

stephanie-wang commented 6 days ago

Okay awesome, thanks for the update!

For now I think it'd be good to push a small PR with the asyncio fix, and we can deal with the zero-copy issue separately. Also, you can check if the same issue appears with non-asyncio execution; if it does then it's probably a duplicate of another issue.

jeffreyjeffreywang commented 6 days ago

I'm worried about the hang issue which will always occur when passing in numpy arrays under my current implementation (with the lock). It also breaks existing tests. The hang issue is specifically for async execution. To be candid, my current implementation might not be exactly correct.

stephanie-wang commented 6 days ago

I'm worried about the hang issue which will always occur when passing in numpy arrays under my current implementation (with the lock). It also breaks existing tests. The hang issue is specifically for async execution. To be candid, my current implementation might not be exactly correct.

Ah gotcha, thanks! Sounds likely that there is a leaked reference in python then. gc.get_referrers might be useful.

jeffreyjeffreywang commented 6 days ago

Nice, thank you @stephanie-wang, will do a bit more debugging and update this thread.