ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[RFC] Async request support in Ray Serve #32292

Open edoakes opened 1 year ago

edoakes commented 1 year ago

TL;DR - Proposal for an API to support launching expensive computations in Serve (e.g., model fine-tuning, long-running inference) using an asynchronous request API

Problem statement

With the rise of generative models, the Serve team has seen growing interest in supporting "expensive computations" in Serve. For example, users have asked to launch Stable Diffusion fine-tuning jobs and long-running inference tasks that run not for seconds, but for several minutes to an hour. These tasks are often too long to run as a stateless inference request, but too short to justify launching an entirely new Ray job / cluster.

As a workaround, users often connect other queueing systems, such as Celery, to Ray. The purpose of this RFC is to gather feedback on APIs for handling such workloads natively in Serve without needing a full-blown queueing system.

Previous proposals

A previous RFC proposed using Ray Workflows as a wholesale replacement for queueing systems. This solution works, but is heavyweight and relies on Workflows, a relatively new library: https://github.com/ray-project/ray/issues/21161

Below are two alternative proposals that aim to provide a simpler API.

Proposal 1 -- add async requests API to Serve

Add an "async_request" decorator to Serve deployments. For methods decorated with it, Serve will generate queueing preamble/postamble logic and APIs to enable listing, resuming, and checking the status of async requests. The API would look something like the following:

@serve.deployment
class FineTuningApp:
    @serve.async_request(
        max_queued=1000,
        max_running=10,
        priority=2,
        idempotency_key="request_id",
    )
    def fine_tune_request(self, request_id: str, num_epochs: int, dataset: str) -> str:
        # User-implemented checkpoint and recovery.
        if checkpoint_exists(request_id):
            model, epoch = restore_from_checkpoint(request_id)
        else:
            model, epoch = new_model(), 0
        for i in range(epoch, num_epochs):
            train_one_epoch(model, dataset)
            save_checkpoint(model, i, request_id)
        return model

Here are examples of generated API methods for managing async requests:

POST /app/fine_tune_request?request_id=12345&dataset=foo1&num_epochs=5
POST /app/fine_tune_request/cancel?request_id=12345
GET /app/fine_tune_request/status?request_id=12345
GET /app/fine_tune_request/result?request_id=12345
GET /app/fine_tune_request/list?filter_status=RUNNING&limit=1000
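
To make the decorator parameters concrete, here is a minimal in-memory sketch of the bookkeeping that the generated queueing logic might perform. It is not Serve code: `AsyncRequestQueue`, `Status`, `QueueFull`, and every method name are hypothetical, `priority` is left out, and nothing is persisted. It only illustrates how `max_queued`, `max_running`, and an idempotency key could interact.

```python
import enum
from collections import deque
from typing import Any, Deque, Dict, List, Optional


class Status(str, enum.Enum):
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    CANCELLED = "CANCELLED"


class QueueFull(Exception):
    """Raised when max_queued requests are already waiting."""


class AsyncRequestQueue:
    """Hypothetical stand-in for the queueing logic Serve would generate."""

    def __init__(self, max_queued: int, max_running: int) -> None:
        self.max_queued = max_queued
        self.max_running = max_running
        self._pending: Deque[str] = deque()
        self._status: Dict[str, Status] = {}
        self._args: Dict[str, Dict[str, Any]] = {}
        self._results: Dict[str, Any] = {}

    def submit(self, key: str, **kwargs: Any) -> Status:
        # Idempotency: resubmitting a known key returns its current status.
        if key in self._status:
            return self._status[key]
        if len(self._pending) >= self.max_queued:
            raise QueueFull(key)
        self._pending.append(key)
        self._status[key] = Status.QUEUED
        self._args[key] = kwargs
        return Status.QUEUED

    def cancel(self, key: str) -> bool:
        # Only queued requests can be cancelled in this sketch.
        if self._status.get(key) is Status.QUEUED:
            self._pending.remove(key)
            self._status[key] = Status.CANCELLED
            return True
        return False

    def start_next(self) -> List[str]:
        """Promote queued requests to RUNNING without exceeding max_running."""
        running = sum(s is Status.RUNNING for s in self._status.values())
        started = []
        while self._pending and running < self.max_running:
            key = self._pending.popleft()
            self._status[key] = Status.RUNNING
            started.append(key)
            running += 1
        return started

    def complete(self, key: str, result: Any) -> None:
        self._status[key] = Status.SUCCEEDED
        self._results[key] = result

    def status(self, key: str) -> Status:
        return self._status[key]

    def result(self, key: str) -> Any:
        return self._results[key]

    def list_requests(
        self, filter_status: Optional[Status] = None, limit: int = 1000
    ) -> List[str]:
        keys = [
            k
            for k, s in self._status.items()
            if filter_status is None or s is filter_status
        ]
        return keys[:limit]
```

In this sketch, `submit`, `cancel`, `status`, `result`, and `list_requests` correspond loosely to the five generated endpoints listed above; a real implementation would also need durable storage and a worker loop that calls `start_next`.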

Fault tolerance: Serve would persist the queue of requests in its coordinator actor / persistent storage. When recovering from a cluster failure, Serve can load the previously running async requests and restart them, and each request can then resume from any checkpoint it has taken.

Pros:

Cons:

Proposal 2 -- create a simplified TaskQueue API backed by Ray Workflows

Instead of extending Serve's API, create a separate TaskQueue API that users can use to manually create a Serve handler implementing the management methods. For example, the example above could instead be implemented as:

# Define the processing function separately from the handler.
def fine_tune_request(request_id: str, num_epochs: int, dataset: str) -> str:
    # User-implemented checkpoint and recovery.
    if checkpoint_exists(request_id):
        model, epoch = restore_from_checkpoint(request_id)
    else:
        model, epoch = new_model(), 0
    for i in range(epoch, num_epochs):
        train_one_epoch(model, dataset)
        save_checkpoint(model, i, request_id)
    return model

@serve.deployment
class FineTuningApp:
    def __init__(self):
        TaskQueue.create_if_not_exists("my_queue")
        TaskQueue.resume_all("my_queue")

    def get_or_create_request(self, request_id: str, num_epochs: int, dataset: str):
        TaskQueue.submit_task(
            "my_queue",
            fine_tune_request,
            request_id,
            args=[request_id, num_epochs, dataset],
        )
        return "ok"

    def get_status(self, request_id: str):
        return TaskQueue.get_status("my_queue", request_id)

    def get_result(self, request_id: str):
        return TaskQueue.get_result("my_queue", request_id)

    def cancel_task(self, request_id: str):
        return TaskQueue.cancel("my_queue", request_id)

    def list_tasks(self):
        return TaskQueue.list("my_queue")

Fault tolerance: this can be implemented in a similar way to Proposal 1.
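
To show the call shapes used in the handler above, here is a small in-memory sketch of a `TaskQueue` built on a thread pool. It is purely illustrative: no such class exists in Ray today, the status strings and the `max_running` and `timeout` parameters are assumptions, and `resume_all` is omitted because nothing is persisted here, whereas the proposed version would be durable and backed by Ray Workflows.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional, Sequence


class TaskQueue:
    """Hypothetical in-memory TaskQueue; the RFC's version would be durable."""

    _queues: Dict[str, "TaskQueue"] = {}

    def __init__(self, max_running: int = 2) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_running)
        self._futures: Dict[str, Future] = {}

    @classmethod
    def create_if_not_exists(cls, name: str) -> None:
        if name not in cls._queues:
            cls._queues[name] = cls()

    @classmethod
    def submit_task(
        cls, name: str, fn: Callable[..., Any], task_id: str, args: Sequence[Any] = ()
    ) -> None:
        queue = cls._queues[name]
        # Resubmitting an existing task_id is a no-op (get-or-create semantics).
        if task_id not in queue._futures:
            queue._futures[task_id] = queue._pool.submit(fn, *args)

    @classmethod
    def get_status(cls, name: str, task_id: str) -> str:
        future = cls._queues[name]._futures[task_id]
        if future.cancelled():
            return "CANCELLED"
        if future.done():
            return "FAILED" if future.exception() else "SUCCEEDED"
        return "RUNNING" if future.running() else "PENDING"

    @classmethod
    def get_result(cls, name: str, task_id: str, timeout: Optional[float] = None) -> Any:
        # Blocks until the task finishes or the timeout expires.
        return cls._queues[name]._futures[task_id].result(timeout=timeout)

    @classmethod
    def cancel(cls, name: str, task_id: str) -> bool:
        # A Future can only be cancelled before it starts running.
        return cls._queues[name]._futures[task_id].cancel()

    @classmethod
    def list(cls, name: str) -> List[str]:
        return sorted(cls._queues[name]._futures)
```

Using the task ID as the dictionary key is what gives `get_or_create_request` its idempotent behavior in this sketch: a second submission with the same `request_id` returns without starting new work.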

Pros:

Cons:

ahmadshadid commented 1 year ago

Greetings @edoakes, I strongly endorse Proposal 2. To elaborate: our team is developing a quant algorithm that trains and executes trades in real time, and Proposal 2 aligns well with our objectives. Once it is implemented, we will be able to initialize fully functional trading workflows, each focused on a single asset, using the Ray Serve framework. Whenever the market becomes liquid, we will simply request an additional workflow for each new stock we would like to trade (1000+ stocks). Proposal 2 presents a more streamlined and efficient solution for our use case.

ericl commented 1 year ago

Thanks @WBORSA. Could you elaborate on the advantages and disadvantages of (2) versus (1) for your use case?

richardliaw commented 1 year ago

@danielbichuetti saw that you reacted to the above post -- did you have any feedback/thoughts to share?

Ericxgao commented 1 year ago

Hello, we use the Ray job submission queue quite often to spin up quick inference jobs and hack in new features. While it is quick, being able to persist jobs across Ray restarts (or rerun failed jobs) seems very important, and Serve seems to offer that.

If it's possible to run separate Python scripts as a Serve job, that would be great.

danielbichuetti commented 1 year ago

@richardliaw Hello!

When @edoakes mentioned that Celery is being used because Ray lacks this functionality, that matched our company's experience. It is difficult to compare the code quality of Ray and Celery, and while Celery is not a bad solution, Ray is far superior.

I am personally testing and using Ray Workflows in some scenarios, yet the queue concept is sometimes needed, and it can be quite laborious to establish an effective queue system that integrates well with Ray. Thus, the addition of this feature would be immensely beneficial to us, as we have occasional tasks that take anywhere between 3 and 8 hours.

IMHO, Proposal 1 would be more advantageous to both current and new users, and would reduce allocation issues. The DAG concerns, at least for us, don't apply.

richardliaw commented 1 year ago

As mentioned by @joshuaalbert, seems like https://github.com/ray-project/ray/issues/26642 is highly related as well

Joshuaalbert commented 1 year ago

Hi! I really support this! A colleague and I were at the Summit 2022 and had some meetings with the team to see if they could support our use case, which turns out to be the same use case as the now-popular generative models like ChatGPT.

Our team at Touch has implemented its own solution on top of Serve for this use case, and the system has gone through 3 iterations of improvement. We're happy to share what we've tried and learned, and we strongly support any dev effort to make this a mainline feature.

Our use case

Our use case is similar to the "chatbot" use case, where requests are made on behalf of a user and must be stateful. The required latency is low enough that passing state objects around would add intractable overhead. Regarding async vs. sync handling, most of our work can be done synchronously, but not all. Having the option of both types of request handling, while also allowing stateful routing of requests, would be ideal.

Breaking down each use case

I think Serve should be able to do any combination of these two options.

  1. Statefulness -- Does the use case require state that persists between successive requests?
  2. Async/Sync Request Handling -- Does the use case require synchronous or asynchronous request handling?

The answers above create a 2x2 matrix:

+-----------------+------------+----------------+
|        X        |  Stateful  |  Stateless     |
+-----------------+------------+----------------+
| Async. requests | Use Case A | Use Case C     |
| Sync. requests  | Use Case B | Standard Serve |
+-----------------+------------+----------------+

Examples of use cases:

We need A and B at Touch. I think most teams that need stateful Serve will need both A and B. Typically you first develop a concept with stateful synchronous response handling (B), and then only after demonstrating correctness add the complexity of stateful asynchronous handling (A).

We also use C, where we have multiple asynchronous processes that simply perform things on a scheduled basis (similar to how ServeController works) which need access to Ray. We have at least 4 such scheduled operations.

What we've learned

  1. The best approach we've found is to route requests to specific key-mapped actors, and for those actors to manage exclusive session objects that hold all the state for a request series. The key is to never violate that exclusivity: a state change made while handling one request series must never affect the state of another request series.
  2. Take care of session lifetime management from the start. Incorporate the notion of a timeout, after which, if there has been no activity, you safely terminate the session. We do this with an async "pruning" process that manages the routing table (similar to ServeController).
  3. Expect and handle zombie sessions that persist in the routing table.
  4. After an actor has handled some number of request series, let it go to pasture to die peacefully once all the sessions have expired. This practice was initially conceived of by Oracle and is really valuable in maintaining health in production.
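
The four points above can be sketched as a small routing table. This is not Touch's implementation: `Session`, `SessionTable`, `max_sessions`, and `should_retire` are hypothetical names, and the clock is injectable only so the timeout logic can be exercised without sleeping.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class Session:
    key: str
    state: Dict[str, Any] = field(default_factory=dict)
    last_active: float = 0.0


class SessionTable:
    """Hypothetical per-actor table mapping a key to one exclusive session."""

    def __init__(
        self,
        timeout_s: float,
        max_sessions: int,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.timeout_s = timeout_s
        self.max_sessions = max_sessions
        self._clock = clock
        self._sessions: Dict[str, Session] = {}
        self._served = 0  # request series this actor has ever accepted

    def get(self, key: str) -> Session:
        """Return the session for key, creating it on first use."""
        session = self._sessions.get(key)
        if session is None:
            if self._served >= self.max_sessions:
                raise RuntimeError("actor is retiring; route new keys elsewhere")
            session = self._sessions[key] = Session(key)
            self._served += 1
        session.last_active = self._clock()
        return session

    def prune(self) -> List[str]:
        """Drop sessions idle longer than timeout_s, including zombies."""
        now = self._clock()
        expired = [
            k for k, s in self._sessions.items() if now - s.last_active > self.timeout_s
        ]
        for k in expired:
            del self._sessions[k]
        return expired

    @property
    def should_retire(self) -> bool:
        # Retire only after the quota is used up and every session has expired.
        return self._served >= self.max_sessions and not self._sessions
```

A periodic task would call `prune()` and shut the actor down once `should_retire` becomes true, so an actor never exits while a live session still depends on it.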

richardliaw commented 1 year ago

Miha's comment on Slack:

We had a use case that was implemented similar to Approach 1, using Serve and Core tasks.

Feedback: I prefer approach 1, because it's a much cleaner story and less exotic concept than 2.

Also, assuming requests will route through ServeController, it might be nice to be able to spin down the Serve replicas when the queue is empty, using Serve autoscaler, and spin them back up once the queue gets requests.

thatcort commented 1 year ago

A few thoughts:

Finally, @edoakes could you please elaborate on why Workflows are too heavyweight? Do you mean the API is too cumbersome or that the durability guarantees add too much processing overhead?

ahmadshadid commented 1 year ago

If we go with Proposal 2, wouldn't it be more scalable in the long term? I mean, going with that would even allow you to init multiple Ray Serve instances. Correct me if I am wrong.

allemp commented 9 months ago

I'd be interested in a solution that would allow on-demand fine-tuning of models, or even on-demand training of small models. For example:

  1. User makes a train_model(params) request to Serve
  2. Ray starts training, validating and tuning the model
  3. User makes a predict(x) request to Serve

This would be useful when creating web applications where the user can use the UI to make small changes to the ML pipeline (selecting features, filtering training data, choosing thresholds etc.) where pre-computing each combination of metaparameters would be infeasible.

kyle-v6x commented 7 months ago

We're working on a similar system to fine-tune some small models for users. Our current solution:

  1. Run a dispatcher deployment which accepts requests to begin training.
  2. Dispatcher verifies the request and then calls a new actor with the required training code.
  3. The actor's task is called asynchronously.
  4. We return the TaskID and make calls to ray_state.get_task for scheduling status.

In actuality, the task is tied to an S3 object associated with the fine-tuning run, which we poll for persistent status.

Roughly:

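import ray
from fastapi import FastAPI, Request
from ray import serve

app = FastAPI()

@ray.remote(num_gpus=1.0, num_cpus=4.0)
class Trainer:
    def train(self):
        # Long-running task here.
        # Save results to S3, etc.
        pass

@serve.deployment()
@serve.ingress(app)
class Dispatcher:
    @app.post("/train")
    async def call_trainer(self, http_request: Request):
        body = await http_request.json()  # request parameters (unused in this sketch)

        # Start a new actor and launch training without waiting for it.
        trainer = Trainer.remote()
        ref = trainer.train.remote()

        # Return the task ID so the caller can look up scheduling status.
        return str(ref.task_id())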

The major issue here is the persistence is reliant on something like S3, and importantly, Serve will downscale Dispatcher replicas with active training runs (as they have already returned), resulting in Ray ending the Train call before completion. So the Dispatcher cannot be autoscaled safely.

Both suggestions above work, but I prefer the second option, as we have a deployment graph for pre-processing and option 2 seems easier to work around. It's also more similar to the workflow of Ray Core.

zhe-thoughts commented 7 months ago

@kyle-v6x this is a very interesting scenario. I wonder if you have 30 minutes for a Zoom call to discuss it further. If so, please email zhz at anyscale.com. Thanks!

thatcort commented 7 months ago

We implemented async Serve calls using the existing Workflows API. The Workflow provides a reference value that can be shared with the caller and used to look up the state of the request. It seems to be working well. The only missing piece is rate limiting. I'm curious why the previous proposal to build on top of Workflows was abandoned?

Jainish-S commented 4 months ago

How do you handle multiple training requests?

arita37 commented 4 months ago

@kyle-v6x :

Relying on a data lake or disk for task persistence is a good idea. Pros: no need for an extra DB server, ...
Cons: latency and concurrency issues, but for long batch jobs that is less of a problem.

I think an all-in-one solution is very difficult, since requirements differ so much.

kyle-v6x commented 1 month ago

How do you handle multiple training requests?

Sorry for the late reply.

Since we're using the Ray Cluster Launcher, new training nodes are automatically added to handle however many requests come in. If none are available, the requests are queued in the Ray task queue. Note that you still have to handle potential Ray failures yourself, as the Ray queue will not.