ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[RFC] distributed Python coroutines #21793

Closed: mwtian closed this issue 2 years ago

mwtian commented 2 years ago

Overview

Python coroutines are Python functions that can be suspended and resumed. They are built on the generator machinery, and this includes native coroutines declared with the async / await syntax. By using https://github.com/llllllllll/cloudpickle-generators to serialize generators, and Ray ObjectRefs as distributed futures, we can build a runtime that executes Python coroutines across Ray nodes, potentially with checkpointing.
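The suspend/resume protocol this proposal relies on can be observed with the standard library alone. The sketch below uses made-up names (running_total, answer) to step a generator and a native coroutine with send(). It also shows that the standard pickle module cannot serialize a suspended generator, which is the gap cloudpickle-generators is meant to fill here.

```python
import pickle

def running_total():
    """A generator that suspends at each yield and resumes on send()."""
    total = 0
    while True:
        total += yield total  # suspend here; resume with the sent value

gen = running_total()
first = next(gen)      # runs to the first yield, producing 0
second = gen.send(5)   # resumes with 5, producing 5
third = gen.send(10)   # resumes with 10, producing 15

async def answer():
    return 42

# A native coroutine is driven through the same send() protocol;
# its return value is delivered via StopIteration.
coro = answer()
try:
    coro.send(None)
except StopIteration as stop:
    returned = stop.value

# The standard pickle module refuses to serialize a suspended generator.
try:
    pickle.dumps(gen)
    pickle_failed = False
except TypeError:
    pickle_failed = True

print(first, second, third, returned, pickle_failed)
```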

[Diagram: Ray runtime executing the coroutine f() (screenshot, 2022-01-21)]

In the diagram above, Ray would run the coroutine as follows:

1) Coroutine f() first yields at await load_from_s3(). Assuming this is an async function that does not use Ray, its result exists only on the local node, so there is no correct way to serialize the coroutine and resume it elsewhere at this point. The Ray runtime steps the coroutine on the local asyncio event loop.
2) Next, f() yields at await classifier_actor.remote(images). Assuming this is a Ray remote method call, the resulting ObjectRef can be resolved anywhere in the Ray cluster, so the runtime may serialize the coroutine, send the serialized data to a different node, checkpoint it, and deserialize it there.
3) Finally, f() returns its result to Ray, which either returns it to the coroutine's caller or persists it.
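The stepping logic in these three steps can be sketched as a hand-written driver loop. This is a minimal illustration under stated assumptions, not Ray's implementation: Local, Remote and run are hypothetical names, Remote merely stands in for an ObjectRef, and the driver only records where migrating the coroutine would be safe instead of actually pickling and shipping it.

```python
class Local:
    """Hypothetical awaitable whose result exists only in this process."""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        # Hand this object to the driver; the driver sends the result back.
        return (yield self)


class Remote(Local):
    """Hypothetical stand-in for a Ray ObjectRef: resolvable on any node."""


def run(coro):
    """Step a coroutine by hand; return (result, migratable await indices)."""
    migratable = []
    to_send = None
    step = 0
    while True:
        try:
            awaited = coro.send(to_send)
        except StopIteration as stop:
            return stop.value, migratable
        if not isinstance(awaited, Local):
            raise TypeError(f"unsupported awaitable: {awaited!r}")
        if isinstance(awaited, Remote):
            # A real runtime could serialize the suspended coroutine here,
            # checkpoint it, or ship it to another node.
            migratable.append(step)
        # This sketch always resumes locally with the awaited value.
        to_send = awaited.value
        step += 1


async def f():
    images = await Local(["img1.png", "img2.png"])  # like load_from_s3()
    labels = await Remote(len(images))  # like classifier_actor.remote(images)
    return labels


result, points = run(f())
print(result, points)
```

Only the second await is reported as a migration point, matching step 2 above: the first await's result is bound to the local process.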

Potential Use Cases

Specifying Workflow in Python

Workflows can be written as ordinary Python coroutines instead of with a special API or DSL. A hypothetical trip-booking workflow could look like this:

async def book_trip(request_id: str):
    # Compensating actions for bookings that already succeeded.
    # cancel is assumed to be a @ray.remote task as well.
    undo = []
    failed = False
    try:
        car_reservation_id = await book_car.remote(request_id)
        undo.append(lambda: cancel.remote(car_reservation_id))

        hotel_reservation_id = await book_hotel.remote(request_id)
        undo.append(lambda: cancel.remote(hotel_reservation_id))

        flight_reservation_id = await book_flight.remote(request_id)
        undo.append(lambda: cancel.remote(flight_reservation_id))
    except Exception:
        failed = True
    if failed:
        print("Canceling finished tasks ...")
        # Undo completed bookings in reverse order.
        for callback in reversed(undo):
            await callback()
        return
    print(f"Booking finished: {car_reservation_id} {hotel_reservation_id} "
          f"{flight_reservation_id}")

book_car, book_hotel and book_flight are Ray tasks defined with @ray.remote. The Ray workflow runtime can turn the book_trip() coroutine above into a persistent workflow by checkpointing it at each await, so that the workflow does not re-run tasks that already succeeded and retries tasks that failed.
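The guarantee described above (skip tasks that already succeeded, retry the rest) can be illustrated without Ray using a different, simpler technique: re-running the coroutine and replaying completed steps from a result log. This is a hedged sketch, not the mechanism the RFC proposes, which checkpoints the suspended coroutine itself. Replayer, book and the in-memory log list are hypothetical; a real workflow would store the log durably.

```python
import asyncio


class Replayer:
    """Hypothetical helper that routes each awaited step through a result log."""

    def __init__(self, log):
        self.log = log      # results of steps that already succeeded
        self.position = 0   # index of the next step in this run

    async def step(self, fn, *args):
        if self.position < len(self.log):
            # Step finished in an earlier run: reuse its checkpointed result.
            result = self.log[self.position]
        else:
            result = await fn(*args)
            self.log.append(result)  # checkpoint only after success
        self.position += 1
        return result


calls = []
hotel_down = True


async def book(kind, request_id):
    """Hypothetical booking task; the hotel service fails on its first call."""
    global hotel_down
    calls.append(kind)
    if kind == "hotel" and hotel_down:
        hotel_down = False
        raise RuntimeError("hotel service unavailable")
    return f"{kind}-{request_id}"


async def book_trip(request_id, log):
    r = Replayer(log)
    car = await r.step(book, "car", request_id)
    hotel = await r.step(book, "hotel", request_id)
    flight = await r.step(book, "flight", request_id)
    return car, hotel, flight


log = []
try:
    asyncio.run(book_trip("r1", log))  # first attempt fails at the hotel step
except RuntimeError:
    pass
result = asyncio.run(book_trip("r1", log))  # retry reuses the car result
print(result, calls)
```

Replay only works if the coroutine issues the same steps in the same order on every run. Checkpointing the suspended coroutine, as proposed here, avoids re-executing the coroutine body but requires generator serialization.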

Running tasks in parallel, e.g. with asyncio.gather(), adds complexity: the runtime may have to checkpoint at every .remote() call rather than only at each await. This is still being investigated.
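The difficulty with asyncio.gather() is that several calls are launched before the single await that collects them. The plain-asyncio sketch below (no Ray; task is a hypothetical stand-in for a remote call) records the order of events. All three calls start before any of them finishes, so the only await-level checkpoint comes after all three complete, and a crash in between would lose the record of calls that had already succeeded.

```python
import asyncio

events = []


async def task(name):
    """Hypothetical stand-in for a remote call."""
    events.append(f"start {name}")
    await asyncio.sleep(0)  # yield to the event loop so other tasks can run
    events.append(f"end {name}")
    return name


async def trip():
    # One await covers three concurrently running calls.
    return await asyncio.gather(task("car"), task("hotel"), task("flight"))


names = asyncio.run(trip())
print(names)
print(events)
```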

Optimizing Request Processing Flow

Suppose there are Ray actors for specific tasks, e.g. english_speech_to_text, search and english_text_to_speech, and we want to combine them into a voice search feature (a request flows through english_speech_to_text -> search -> english_text_to_speech). Usually each Ray actor has to be made aware of the next actor so that it can continue processing the request, which means adding voice-search-specific logic to the actors and breaks encapsulation. The alternative is to gather the result from each actor in a request handler, but routing every intermediate result through the handler may increase latency and data-transfer cost. Instead, we can describe how the request flows through the actors with a Python coroutine:

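async def voice_search(question_speech):
    # Each await is a remote actor call, and a point where the runtime
    # could suspend and forward the coroutine.
    text = await english_speech_to_text.run.remote(question_speech)

    # Questions are routed to a separate question-answering actor.
    if "what" in text:
        answer = await question_and_answer.run.remote(text)
        return answer

    search_result = await search.run.remote(text)

    result_speech = await english_text_to_speech.run.remote(search_result)
    return result_speech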

The coroutine can be developed and tested locally, and executed with Ray from the request handler. Ray can suspend the coroutine at each remote call (e.g. english_speech_to_text.run.remote(question_speech)), then serialize it and forward it to the node that will produce the result (e.g. the node running the english_speech_to_text actor). This keeps flow-specific logic out of the actors and, with reasonable optimizations, reduces data movement to the same level as having each actor forward the request directly to the next one.
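To illustrate local development and testing, the sketch below runs voice_search under plain asyncio with hypothetical in-process fakes. FakeActor, _LocalMethod and the lambda bodies are invented for illustration; they only mimic the actor.run.remote(...) call shape by returning awaitables, and "speech" is modeled as bytes holding the text. Under Ray, the same coroutine body would await real actor calls instead.

```python
import asyncio


class _LocalMethod:
    """Mimics actor.method.remote(...) by returning an awaitable."""

    def __init__(self, fn):
        self._fn = fn

    def remote(self, *args):
        async def call():
            return self._fn(*args)
        return call()


class FakeActor:
    """Hypothetical local stand-in for a Ray actor with a run() method."""

    def __init__(self, fn):
        self.run = _LocalMethod(fn)


english_speech_to_text = FakeActor(lambda speech: speech.decode())
question_and_answer = FakeActor(lambda text: f"answer to: {text}")
search = FakeActor(lambda text: f"results for: {text}")
english_text_to_speech = FakeActor(lambda text: text.encode())


async def voice_search(question_speech):
    # Same flow as the coroutine above.
    text = await english_speech_to_text.run.remote(question_speech)
    if "what" in text:
        answer = await question_and_answer.run.remote(text)
        return answer
    search_result = await search.run.remote(text)
    result_speech = await english_text_to_speech.run.remote(search_result)
    return result_speech


print(asyncio.run(voice_search(b"what is ray")))
print(asyncio.run(voice_search(b"ray tutorials")))
```

Because the coroutine only depends on the .run.remote(...) call shape, swapping the fakes for real actor handles should not require changing its body.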

Status

Prototype: https://github.com/ray-project/ray/pull/21783

Known Issues

Next Steps

First we want to gather feedback from the community.

Please let us know what you think!

cc @ericl @richardliaw @iycheng @simon-mo

stale[bot] commented 2 years ago

Hi, I'm a bot from the Ray team :)

To help human contributors to focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity within 14 days, the issue will be closed!

You can always ask for help on our discussion forum or Ray's public slack channel.

stale[bot] commented 2 years ago

Hi again! The issue will be closed because there has been no more activity in the 14 days since the last message.

Please feel free to reopen or open a new issue if you'd still like it to be addressed.

Again, you can always ask for help on our discussion forum or Ray's public slack channel.

Thanks again for opening the issue!