ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Serve] Add a timeout parameter for scheduling ray tasks to replicas #40995

Open kyle-v6x opened 10 months ago

kyle-v6x commented 10 months ago

Description

Currently, requests waiting to be scheduled to a replica will retry indefinitely with a given backoff sequence. While we can set an HTTP timeout in the HTTP settings, the unbounded retries can result in a cluster falling too far behind incoming requests and critically failing all requests until it scales appropriately.

It would be nice to set an additional timeout for each deployment representing the maximum time a request can spend waiting to be scheduled to a replica. We could then set this value to (HTTP_TIMEOUT - PROCESSING_TIME) and ensure that we are not wasting any time on requests that are going to time out anyway.

I'm also curious if anyone has implemented this without a separate dispatcher deployment.

https://github.com/ray-project/ray/blob/a6b6898a5a367850b398950ada18a7913cb0c387/python/ray/serve/_private/router.py#L290
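
To make the proposal concrete, a rough sketch of how this could look on a deployment. scheduling_timeout_s is a placeholder name for the proposed option, not an existing Serve parameter, and the values are illustrative:

from ray import serve

HTTP_TIMEOUT_S = 15             # request timeout enforced at the HTTP proxy
EXPECTED_PROCESSING_TIME_S = 5  # typical inference time for one request

@serve.deployment(
    autoscaling_config={"min_replicas": 1, "max_replicas": 8},
    # Hypothetical parameter (does not exist today): the maximum time a request
    # may wait to be assigned to a replica before failing fast, chosen as
    # HTTP_TIMEOUT - PROCESSING_TIME so no scheduler time is spent on requests
    # that will time out at the proxy anyway.
    # scheduling_timeout_s=HTTP_TIMEOUT_S - EXPECTED_PROCESSING_TIME_S,
)
class Model:
    async def __call__(self, request) -> dict:
        return {"result": "ok"}

app = Model.bind()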

Use case

Machine learning inference server with processing times measured in seconds. There is a failure mode where the load increases faster than the cold-start times can cope with, and the server gets stuck working on requests it never finishes.

anyscalesam commented 4 months ago

Chatted with @akshay-anyscale; this is a pretty good idea. @kyle-v6x would you be willing to contribute back here?

kyle-v6x commented 4 months ago

@anyscalesam Sure! My only concern is that as of the 2.10 release there is a new parameter (max_queued_requests) which allows us to shed load based on a maximum replica queue size. In practice, this queue-size-based approach probably works for non-time-sensitive jobs, but is quite awkward for anything coming from an HTTP request. It can achieve the same thing in practice in two ways, but each has a significant issue:

Currently, we use the second option with max_ongoing_requests set to some low value like 3 or 4. It's not hard to imagine, however, how this could fail given a sudden rise in requests and replicas.

Personally, I think it would be nice to add something like replica_scheduler_timeout to deployments as well. I'm actually quite curious how others use max_queued_requests in production deployments? (cc. @edoakes )
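
For context, both of the knobs discussed above are set on the deployment itself; a minimal sketch with illustrative values (parameter names as of Ray 2.10+):

from ray import serve

@serve.deployment(
    # Keep per-replica concurrency low so requests queue at the router rather
    # than piling up on a busy replica.
    max_ongoing_requests=4,
    # Shed load once too many requests are waiting for a replica; excess
    # requests are rejected instead of retrying indefinitely.
    max_queued_requests=32,
    autoscaling_config={"min_replicas": 1, "max_replicas": 8},
)
class InferenceModel:
    async def __call__(self, request) -> dict:
        return {"result": "ok"}

app = InferenceModel.bind()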

edoakes commented 4 months ago

> @anyscalesam Sure! My only concern is that as of the 2.10 release there is a new parameter (max_queued_requests) which allows us to shed load based on a maximum replica queue size.

I don't quite understand why this is a concern for your feature request. max_queued_requests is not really designed to solve the problem you've raised here. It's a stability failsafe to avoid overloading the scheduler: better to actively shed load than end up in a bad state timing out all requests due to excessive load.

As a side note, I would not recommend setting max_queued_requests to 1.

Adding an assignment timeout makes sense. If you're open to contributing it I'd be happy to point you in the right direction.

kyle-v6x commented 4 months ago

@edoakes Thanks for clarifying. I got a little too tunnel-visioned on our own use case, but I see how max_queued_requests could be useful when you aren't directly controlling the timeout of requests to a backend service.

And yes, totally agree regarding setting max_queued_requests to 1. I just wanted to highlight that there is currently no ideal solution, and I don't want to add a whole new deployment parameter if we can somehow fix both failure cases with one.

I'll be away for a few days but happy to work on a solution as soon as I return. Would you recommend adding it as an additional deployment parameter then?

kyle-v6x commented 4 months ago

One more note. We tried the following pattern, but I haven't dug into whether the returned reference really means that the task has been assigned to a replica. Before:

ref = await asyncio.wait_for(handle.remote(), timeout=8)

Now:

response = handle.remote()
await asyncio.wait_for(response._to_object_ref(), timeout=8)
edoakes commented 4 months ago
response = handle.remote()
await asyncio.wait_for(response._to_object_ref(), timeout=8)

I was going to suggest that you try this pattern. _to_object_ref() returning does indeed mean that the request has been scheduled.

There is one issue here though -- asyncio.wait_for will cancel the underlying task if it doesn't return within the timeout, but cancelling _to_object_ref() does not actually cancel the underlying request. You'd need to modify this to call response.cancel() upon timeout.

How about you start with that, see if it works for your use case, then we can discuss if/how to add first class support?
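
Roughly, the pattern I'm suggesting, as a sketch (the wrapper name and timeout value are just for illustration, not part of the Serve API):

import asyncio

from ray.serve.handle import DeploymentHandle

async def call_with_assignment_timeout(handle: DeploymentHandle, timeout_s: float = 8.0):
    response = handle.remote()
    try:
        # _to_object_ref() (private API) resolves once the request has been
        # scheduled onto a replica.
        ref = await asyncio.wait_for(response._to_object_ref(), timeout=timeout_s)
    except asyncio.TimeoutError:
        # wait_for cancels the local coroutine but not the in-flight Serve
        # request, so cancel it explicitly before giving up.
        response.cancel()
        raise
    return await ref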

kyle-v6x commented 4 months ago

@edoakes Got it. I'll start there when I return. Thanks for the input!

kyle-v6x commented 3 months ago

Finally got around to doing some testing.

I tried the following:

import asyncio
import time

response_handle = None
try:
    response_handle = handle.remote()  # This is a batched method in practice

    start = time.time()
    # This should return once the request has been scheduled to a replica
    response_ref = await asyncio.wait_for(response_handle._to_object_ref(), timeout=n)
    print(f"Scheduling time: {time.time() - start}")

    start = time.time()
    result = await response_ref  # This should return once completed
    print(f"Processing time: {time.time() - start}")

except TimeoutError:
    if response_handle is not None:
        response_handle.cancel()

The issue is that the _to_object_ref call seems to wait for the final result. This is with ray==2.23.0. This doesn't seem to be intended, perhaps a bug? I'll continue looking into it but would appreciate some advice.

Stack-Attack commented 2 months ago

Got some time to look into this today. Turns out the above strategy is broken for ray[serve]>=2.10.

Digging into it more:

Since 2.10, Python replicas always return an ObjectRefGenerator rather than an ObjectRef. This was seemingly done so that replicas could return the status of the internal queue as the first generator item and the unary result as the second.

In order to deal with the new generator return, the _to_object_ref call was modified to conditionally retrieve the result from the generator. This second await on __anext__() waits for the completion of the work rather than just for the ObjectRef. As I haven't built Ray locally yet, I haven't been able to track down the precise source.

Hopefully if I have time in the next few weeks I can build locally and get to the source of the issue. Removing the await from __anext__() works for most cases, but then the return value is not an ObjectRef, so it's not a real solution.

More notes: The ObjectRef is iterated here, and created from a nearby call.

Rejections are handled here.

Stack-Attack commented 2 months ago

Finally a script to replicate the issue:

import asyncio
from ray import serve
from ray.serve.handle import DeploymentHandle
import time
from fastapi import FastAPI

app = FastAPI()

async def _to_object_ref_or_gen(
        self,
        hold=False
):
    """
    This replicates the behaviour locally for easier debugging.
    """
    obj_ref_or_gen = await asyncio.wrap_future(self._object_ref_future)
    if hold:
        obj_ref_or_gen = await obj_ref_or_gen.__anext__() # This call blocks until the result
    else:
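        # Without the await this returns the __anext__() awaitable, not an ObjectRef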
        obj_ref_or_gen = obj_ref_or_gen.__anext__()

    self._object_ref_or_gen = obj_ref_or_gen

    return self._object_ref_or_gen

@serve.deployment(
    ray_actor_options={"num_cpus": 0},
    num_replicas=1
)
@serve.ingress(app)
class Dispatcher:
    def __init__(self, foo_handler: DeploymentHandle):
        self.foo_handler = foo_handler

    @app.post("/")
    async def entry(self, hold: bool):
        handle = None
        metrics = {}
        try:
            start = time.time()
            handle = self.foo_handler.remote()
            metrics["call_time"], start = time.time() - start, time.time()
            ref = await asyncio.wait_for(_to_object_ref_or_gen(handle, hold), timeout=30)
            metrics["scheduling_time"], start = time.time() - start, time.time()
            result = await ref
            metrics["worker_time"] = time.time() - start
        except TimeoutError:
            if handle is not None:
                handle.cancel()
            raise TimeoutError("Scheduler timeout error. All workers seemingly full.")
        finally:
            print(f"\n\nMetrics: {metrics}\n\n")
        return result

@serve.deployment(
    ray_actor_options={"num_cpus": 0},
    num_replicas=1
)
class Foo:
    def __call__(self):
        time.sleep(10)
        return True

foo = Foo.bind()
service = Dispatcher.bind(foo_handler=foo)

if __name__ == "__main__":
    from ray.cluster_utils import Cluster

    cluster = Cluster(
        initialize_head=True,
        head_node_args={
            "num_cpus": 4,
            "num_gpus": 0,
            "resources": {"head": 1},
            "dashboard_host": "0.0.0.0",
        },
    )
    worker_node = cluster.add_node(
        num_cpus=4,
        num_gpus=2,
        resources={"worker": 1},
    )

    cluster.wait_for_nodes(2)
    deployment = serve.run(service)

    print("\n\nTesting programmatically...\n\n")
    deployment.entry.remote(hold=False).result()
    deployment.entry.remote(hold=True).result()

    input("\n\nTest script completed. Press Enter to shutdown.\n\n")
    serve.shutdown()

(ServeReplica:default:Dispatcher pid=12543) Metrics: {'call_time': 0.008048772811889648, 'scheduling_time': 0.00822901725769043, 'worker_time': 10.042553186416626}
(ServeReplica:default:Dispatcher pid=12543) Metrics: {'call_time': 0.00012302398681640625, 'scheduling_time': 10.04919719696045, 'worker_time': 0.00021195411682128906}

Stack-Attack commented 2 months ago

Aha. Looks like the source of the issue might be in the ObjectRefGenerator code itself, but I can't confirm until I have time to build and test locally.

https://github.com/ray-project/ray/blob/7874da9ae009d8ed89a583bb1a1f8a73f025f03e/python/ray/_raylet.pyx#L511

(cc. @stephanie-wang)