ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
32k stars 5.45k forks source link

[RFC][Ray Core] Extend ray to support speculative execution for high-level frameworks #25182

Closed chaokunyang closed 1 year ago

chaokunyang commented 2 years ago

Description

Speculative execution is used by framework widely to detect slow tasks, and execute task duplicate speculatively. When implementing such a pattern on ray, there are some ObjectRef resolving issues. Consider the following example, which is a dag built using ray tasks.

If we found task 1 is notably slower than 2~4 homogeneous tasks, we submitted a duplicate task1. And it run faster than original task, in order to update task1 output ObjectRef to all downstream tasks, we need to iterate all downstream tasks recursively and resubmit them too:

This has two issues:

Solution 1 - Wait until all upstream tasks finish before scheduling the downstream tasks

This solution will affect pipeline scheduling. When upstream tasks are executing, ray can build task spec for all downstream tasks. But with this solution, all task specs are not built until all upstream tasks are finished.

It's also not intuitional for future-based async programing

Solution 2 - Update ray task spec dynamically

If a duplicate task is finished, we can update all input ObjectRefs in task spec for all direct downstream tasks.

The API may be like ray.update_inputs(downstream_task, [object_ref1, object_ref2, object_ref3, object_ref4, object_ref5]) or ray.update_inputs(downstream_task, {1: object_ref1}

Solution 3 - Allow task run concurrently when call remote

This is not the scene for speculative execution. For speculative execution, we don't know which tasks will be slow tasks in advance, making all tasks run multiple replicas will waste lots of resource in vain.

Solution 4 - Add duplicate API to ray task

When we find a slow task and want to duplicate it, we can just invoke ObjectRef.duplicate API to tell ray to submit the task again, the first finished output will be the final output for that task.

For me, Solution4 is most reasonable, any suggestions?

Use case

Mars graph scheduling

zhe-thoughts commented 2 years ago

Thanks @chaokunyang . Should this be a REP?

chaokunyang commented 2 years ago

Thanks @chaokunyang . Should this be a REP?

Maybe we can discuss which solution should be used, than submit a REP for that solution? Or should we move this discuss to REP? @zhe-thoughts

stale[bot] commented 1 year ago

Hi, I'm a bot from the Ray team :)

To help human contributors to focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the 14 days, the issue will be closed!

You can always ask for help on our discussion forum or Ray's public slack channel.

zhe-thoughts commented 1 year ago

Hey @chaokunyang sorry I missed your question. I think we should create a REP for this cc @scv119 @jjyao

stale[bot] commented 1 year ago

Hi, I'm a bot from the Ray team :)

To help human contributors to focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the 14 days, the issue will be closed!

You can always ask for help on our discussion forum or Ray's public slack channel.

stale[bot] commented 1 year ago

Hi again! The issue will be closed because there has been no more activity in the 14 days since the last message.

Please feel free to reopen or open a new issue if you'd still like it to be addressed.

Again, you can always ask for help on our discussion forum or Ray's public slack channel.

Thanks again for opening the issue!

dizhouwu commented 10 months ago

Has this been updated yet?