PrefectHQ / prefect-ray

Prefect integrations with Ray
https://prefecthq.github.io/prefect-ray/
Apache License 2.0
63 stars 5 forks source link

Resolve Ray Futures Decentralized & Concurrently #69

Closed toro-berlin closed 1 year ago

toro-berlin commented 1 year ago

Closes https://github.com/PrefectHQ/prefect-ray/issues/37

Problem Statement We recently ran into a performance bottleneck with the current approach of resolving the Ray futures. We ran into these issues with a flow that uses task dependencies. Please take a look at the flow structure below.

Consulting the logs shows that the limiting factor was the networking: We saw a lot of httpx and OpenSSL errors (timeout during handshake, IOerror, ...).

Hypothesis Resolving the Ray futures in a centralized manner puts a lot of pressure on the networking stack and Prefect API. Implementing the changes proposed in https://github.com/PrefectHQ/prefect-ray/issues/37 will eliminate this bottleneck.

First Tests I conducted test runs with my minimum reproducible example (flow executed on my local machine, Ray Cluster provided by Anyscale) and real-world flow + workload (flow executed on AWS EKS flow pod runner, Ray Cluster provided by Anyscale). They show that we do not run into networking errors anymore with the changes proposed in this PR.

Example

Flow Structure

for _ in range(do_it_for_a_lot_of_input):
    a_future = a.submit(wait_for=[])
    b_future = b.submit(wait_for=[a_future])

    c_future = c.submit(wait_for=[b_future])
    d_future = d.submit(wait_for=[b_future])
    e_future = e.submit(wait_for=[b_future])

Screenshots

Checklist

zanieb commented 1 year ago

🎉 this looks like the right idea!

toro-berlin commented 1 year ago

Hey @madkinsz, hey @ahuang11,

Thank you for providing the RayTaskRunner for executing Prefect tasks on a Ray cluster. I wrote down my recent challenges with the actual implementation in the PR description. It looks like implementing the changes proposed in https://github.com/PrefectHQ/prefect-ray/issues/37 will do the trick.

Have a great start in the week Tobias

pcmoritz commented 1 year ago

@toro-berlin Thanks a lot for submitting this PR.

I don't know this code well enough yet to be able to really give useful feedback, but here are some thoughts: I wonder if it would be possible to not have to call ray.get on the future at all and let the Ray scheduler figure out the resolution -- you can pass ObjectRefs into ray.remote calls, and the Ray scheduler will figure out when the objects are available and call the task. That would be the most scalable way to handle this.

I can spend a little more time reading the code tomorrow and seeing if what I'm saying can actually be done / how this all works :)

In the meantime @madkinsz and @ahuang11 please feel free to merge the PR and if you have any documentation / pointers for me to understand this code better, that would be very helpful :)

desertaxle commented 1 year ago

Thanks for this PR @toro-berlin! I'd like to merge these changes so that we can unblock you and others that are experiencing this issue. Could you please add a short summary of your changes to the Fixed section of the change log? Once that's added and this PR is marked as ready for review, I'll approve an merge!

toro-berlin commented 1 year ago

Great to hear, @desertaxle. Added an entry to the changelog and marked this PR as read-for-review.