ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Serve] DeploymentResponse._to_object_ref() blocks until final results from actor #46893

Open kyle-v6x opened 1 month ago

kyle-v6x commented 1 month ago

What happened + What you expected to happen

Since 2.10.0, the DeploymentResponse._to_object_ref() functions await the final result of the task, rather than returning the ObjectRef of the running task as soon as it is scheduled. This effectively prevents any async work from being done while previous tasks are running (i.e. passing the result by ref).

You can find more details here.

Versions / Dependencies

This is an issue with Ray[Serve]>=2.10.0

Reproduction script

import asyncio
from ray import serve
from ray.serve.handle import DeploymentHandle
import time
from fastapi import FastAPI

app = FastAPI()

async def _to_object_ref_or_gen(
        self,
        hold=False
):
    """
    This replicates the behaviour locally for easier debugging.
    """
    obj_ref_or_gen = await asyncio.wrap_future(self._object_ref_future)
    if hold:
        obj_ref_or_gen = await obj_ref_or_gen.__anext__() # This call blocks until the result
    else:
        obj_ref_or_gen = obj_ref_or_gen.__anext__()

    self._object_ref_or_gen = obj_ref_or_gen

    return self._object_ref_or_gen

@serve.deployment(
    ray_actor_options={"num_cpus": 0},
    num_replicas=1
)
@serve.ingress(app)
class Dispatcher:
    def __init__(self, foo_handler: DeploymentHandle):
        self.foo_handler = foo_handler

    @app.post("/")
    async def entry(self, hold: bool):
        handle = None
        metrics = {}
        try:
            start = time.time()
            handle = self.foo_handler.remote()
            metrics["call_time"], start = time.time() - start, time.time()
            ref = await asyncio.wait_for(_to_object_ref_or_gen(handle, hold), timeout=30)
            metrics["scheduling_time"], start = time.time() - start, time.time()
            result = await ref
            metrics["worker_time"] = time.time() - start
        except asyncio.TimeoutError:  # before Python 3.11 this is not the builtin TimeoutError
            if handle is not None:
                handle.cancel()
            raise TimeoutError("Scheduler timeout error. All workers seemingly full.")
        finally:
            print(f"\n\nMetrics: {metrics}\n\n")
        return result

@serve.deployment(
    ray_actor_options={"num_cpus": 0},
    num_replicas=1
)
class Foo:
    def __call__(self):
        time.sleep(10)
        return True

foo = Foo.bind()
service = Dispatcher.bind(foo_handler=foo)

if __name__ == "__main__":
    from ray.cluster_utils import Cluster

    cluster = Cluster(
        initialize_head=True,
        head_node_args={
            "num_cpus": 4,
            "num_gpus": 0,
            "resources": {"head": 1},
            "dashboard_host": "0.0.0.0",
        },
    )
    worker_node = cluster.add_node(
        num_cpus=4,
        num_gpus=2,
        resources={"worker": 1},
    )

    cluster.wait_for_nodes(2)
    deployment = serve.run(service)

    print("\n\nTesting programmatically...\n\n")
    deployment.entry.remote(hold=False).result()
    deployment.entry.remote(hold=True).result()

    input("\n\nTest script completed. Press Enter to shutdown.\n\n")
    serve.shutdown()

Issue Severity

High: It blocks me from completing my task.

shrekris-anyscale commented 1 month ago

I was able to reproduce the issue too:

Repro

```python
# File name: repro.py
import asyncio
import time

from ray import serve


@serve.deployment(max_ongoing_requests=5)
async def sleepy():
    await asyncio.sleep(3)
    return "Finished!"


app = sleepy.bind()


async def experiment():
    handle = serve.run(app)

    print("Starting experiment.")

    t0 = time.perf_counter()
    deployment_response = handle.remote()
    t1 = time.perf_counter()
    print(f"Finished remote call: {(t1 - t0):.4f}s")

    t2 = time.perf_counter()
    assignment_ref = deployment_response._to_object_ref()
    t3 = time.perf_counter()
    print(f"Created assignment ref: {(t3 - t2):.4f}s")

    t4 = time.perf_counter()
    result_ref = await assignment_ref
    t5 = time.perf_counter()
    print(f"Awaited assignment ref: {(t5 - t4):.4f}s")

    t6 = time.perf_counter()
    result = await result_ref
    t7 = time.perf_counter()
    print(f"Awaited result ref: {(t7 - t6):.4f}s")
    print(f"Final results: {result}")


if __name__ == "__main__":
    asyncio.run(experiment())
```

```
% python repro.py
...
Starting experiment.
Finished remote call: 0.0023s
Created assignment ref: 0.0000s
Awaited assignment ref: 3.0111s
Awaited result ref: 0.0005s
Final results: Finished!
```

Awaiting the assignment_ref in the repro seems to also wait for the request to finish. Instead, it should return as soon as the request has been assigned to a replica.
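For comparison, here is a minimal sketch of the expected two-stage timing, modelled with plain asyncio rather than Ray. The names `submit`, `do_work` and `measure` are hypothetical, and an `asyncio.Task` only stands in for the ObjectRef; this is not how Serve implements assignment.

```python
import asyncio
import time


async def do_work(work_seconds: float) -> str:
    """Hypothetical stand-in for the request running on a replica."""
    await asyncio.sleep(work_seconds)
    return "Finished!"


async def submit(work_seconds: float) -> asyncio.Task:
    """Hypothetical stand-in for request assignment.

    Returns as soon as the work has been started, handing back a Task
    (playing the role of the ObjectRef) instead of the finished result.
    """
    await asyncio.sleep(0)  # pretend to pick a replica
    return asyncio.create_task(do_work(work_seconds))


async def measure(work_seconds: float = 0.2) -> tuple[float, float, str]:
    t0 = time.perf_counter()
    ref = await submit(work_seconds)  # stage 1: assignment only
    assign_time = time.perf_counter() - t0

    t1 = time.perf_counter()
    result = await ref  # stage 2: wait for the actual result
    result_time = time.perf_counter() - t1
    return assign_time, result_time, result


if __name__ == "__main__":
    assign_time, result_time, result = asyncio.run(measure())
    print(f"assignment: {assign_time:.4f}s, result: {result_time:.4f}s, {result}")
```

In this model almost all of the wait lands in the second await, which is the opposite of the timings printed by the repro.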

zcin commented 1 month ago

> This effectively prevents any Async task from being computed while previous tasks are running (i.e passing by ref).

Hi @kyle-v6x, can you say more about this, and about what you are trying to use the _to_object_ref() API for?

kyle-v6x commented 1 month ago

@zcin In order to pass the result of a Serve replica to an actor asynchronously, we need to retrieve the ObjectRef and send it to the actor, since the actor has no concept of a DeploymentResponse. This is essentially the use case explained in the documentation here.
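A rough sketch of that pattern, again modelled with plain asyncio instead of Ray: the dispatcher hands a still-pending reference to a downstream consumer and is immediately free again. `replica_call`, `Consumer` and `dispatch` are hypothetical names, and the future only plays the role of the ObjectRef.

```python
import asyncio


async def replica_call() -> int:
    """Hypothetical slow deployment call."""
    await asyncio.sleep(0.1)
    return 42


class Consumer:
    """Hypothetical downstream actor that only understands plain awaitables."""

    async def consume(self, ref: "asyncio.Future[int]") -> int:
        value = await ref  # the consumer, not the dispatcher, waits here
        return value + 1


async def dispatch() -> tuple[bool, int]:
    ref = asyncio.ensure_future(replica_call())  # pending "ObjectRef"
    downstream = asyncio.ensure_future(Consumer().consume(ref))
    await asyncio.sleep(0)  # yield once so both tasks get started
    # The dispatcher regains control while the result is still pending.
    dispatcher_was_free = not ref.done()
    return dispatcher_was_free, await downstream


if __name__ == "__main__":
    print(asyncio.run(dispatch()))
```

If obtaining the reference itself blocks until the result is ready, the dispatcher never gets this window, which is the problem described in this issue.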

Further, if you want to run a task fully async (I know this is not officially supported yet), it makes sense to retrieve and return the task_id for tracking. Since we can't obtain the ObjectRef asynchronously, we can't get the task_id.
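To illustrate the tracking idea, here is a small asyncio-only sketch in which an ID is handed back as soon as the work is submitted and the result is fetched later. `Tracker`, `slow_job` and `demo` are hypothetical, and the ID is a random UUID rather than a real Ray task ID.

```python
import asyncio
import uuid


class Tracker:
    """Hypothetical registry mapping tracking IDs to in-flight tasks."""

    def __init__(self) -> None:
        self._tasks: dict[str, asyncio.Task] = {}

    def submit(self, coro) -> str:
        # Must be called from inside a running event loop.
        task_id = uuid.uuid4().hex
        self._tasks[task_id] = asyncio.ensure_future(coro)
        return task_id  # available before the work has finished

    def status(self, task_id: str) -> str:
        return "done" if self._tasks[task_id].done() else "running"

    async def result(self, task_id: str):
        return await self._tasks[task_id]


async def slow_job() -> bool:
    await asyncio.sleep(0.1)
    return True


async def demo() -> tuple[str, bool, str]:
    tracker = Tracker()
    task_id = tracker.submit(slow_job())
    before = tracker.status(task_id)  # queried while the job is in flight
    value = await tracker.result(task_id)
    after = tracker.status(task_id)
    return before, value, after


if __name__ == "__main__":
    print(asyncio.run(demo()))
```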

I'll link this one more time for visibility. I may have time next week to finally build locally and work on a solution, but I can't guarantee it!

zcin commented 1 month ago

Hi @kyle-v6x, your use case makes sense to me, and you're correct that this currently isn't supported in Serve, because the second object ref won't be retrieved until the request finishes. We are currently working on a fix!

GeneDer commented 2 weeks ago

Fixed by https://github.com/ray-project/ray/pull/47209

GeneDer commented 2 weeks ago

Sorry, not resolved yet. This depends on https://github.com/ray-project/ray/issues/46934