ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
32.3k stars 5.5k forks source link

[Core] Have ActorPool.submit return a future #38061

Open Darkdragon84 opened 12 months ago

Darkdragon84 commented 12 months ago

Description

Hi! I know there is ray.util.ActorPool and the possibility to submit asynchronous jobs via ray.util.ActorPool.submit. However it doesn’t return a future and results can only be retrieved using get_next or get_next_unordered which are blocking, return the result value (no objectref), and offer no way of connecting the retrieved result to a submission call.

OTOH python’s concurrent.futures.Executor.submit returns a concurrent Future, which can be awaited in calling code. This is very useful if several concurrent processes call the same PoolExecutor, bc. each process can await its respective future separately.

With ray.util.ActorPool I don’t see a way to submit tasks to the ActorPool from several concurrent remote tasks, s.t. each calling task gets a future for the job it submitted and await it separately.

Please implement ray.util.ActorPool in such a way that ray.util.ActorPool.submit returns an ObjectRef which can be awaited in calling code with e.g. ray.get().

Use case

ray.util.ActorPool could be used as a drop in replacement for concurrent.futures.Executor. It would be very useful if several concurrent processes (e.g. ray tasks) were able to asynchronously submit to the same ActorPool, s.t. each process can await its respective future separately.

Darkdragon84 commented 12 months ago

https://discuss.ray.io/t/have-actorpool-submit-return-a-future/11624

jjyao commented 11 months ago

Hi @Darkdragon84,

Thanks for submitting this feature request. Are you willing to contribute? We can help.

PRESIDENT810 commented 11 months ago

Hi, I want to help with this issue. Can I try this one?

Darkdragon84 commented 11 months ago

Hi! Yes, I am happy to contribute! What's the best way to proceed? cheers

PRESIDENT810 commented 11 months ago

Hi! Yes, I am happy to contribute! What's the best way to proceed? cheers

I'm thinking extending a Future class (as a wrapper of ray.ObjectRef) returned when calling submit(). Calling Future.result() will behave just like get_unordered_next(), but waits for this specific future instead of any future, and pop that future if it's finished just like in get_unordered_next(). In this way users have a result() API to wait for future returned in submit(), and we can also keep the original behavior unchanged with the same APIs in ActorPool?

But while I was trying to implement it I found get_unordered_next() method has some bugs (#38607 and #38635). I'm thinking maybe I should fix those two first before implementing this Future API?

larrylian commented 11 months ago

@Darkdragon84 @PRESIDENT810 I Recently, I have been proposing a new API for batch submission of actor tasks. I believe that this can perfectly meet your requirements and significantly improve performance.

[Core] ray.util.ActorPool supports batch submission of remote actor tasks https://github.com/ray-project/ray/issues/39196

anyscalesam commented 4 months ago

@jjyao let's prioritize this later unless you tell me an internal team needs this; focus on p0/p1 internal team reported issues seems bigger bang for buck