
[DAG] Incompatible `.execute()` API for DAG and Compiled DAG #46441

Open woshiyyya opened 4 months ago

woshiyyya commented 4 months ago

What happened + What you expected to happen

The .execute() API supports a list as input if we don't compile the DAG, but raises the following error if we compile it:

Error Message:

Traceback (most recent call last):
  File "/home/ray/default/check_adag_api.py", line 20, in <module>
    output_dag = output_dag.experimental_compile()
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/dag/dag_node.py", line 170, in experimental_compile
    return build_compiled_dag_from_ray_dag(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/dag/compiled_dag_node.py", line 1436, in build_compiled_dag_from_ray_dag
    compiled_dag._get_or_compile()
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/dag/compiled_dag_node.py", line 869, in _get_or_compile
    task.output_channel = do_allocate_channel(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/dag/compiled_dag_node.py", line 75, in do_allocate_channel
    output_channel = typ.create_channel(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/experimental/channel/shared_memory_channel.py", line 160, in create_channel
    return CompositeChannel(writer, readers)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/experimental/channel/shared_memory_channel.py", line 580, in __init__
    remote_channel = Channel(self._writer, remote_readers)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/experimental/channel/shared_memory_channel.py", line 265, in __init__
    raise ValueError(
ValueError: All reader actors must be on the same node. Actor Actor(Worker, 44ddb4610f3edeebf51d88f512000000) is on node bf15b8e2d57e78ce12128e7a3c414e0dacf94dab0f5fb87892be8f5c while actor Actor(Worker, e254c21017a6b49754b1c49912000000) is on node 579237461d0bae4d2558398fbe524ca2437e50b7f5c5a23d95a7c221.

If I force all DAG actors on the same node, it raises another error:

Traceback (most recent call last):
  File "/home/ray/default/check_adag_api.py", line 22, in <module>
    print(ray.get(output_dag.execute([0, 1])))
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/dag/compiled_dag_node.py", line 1320, in execute
    self._check_inputs(args, kwargs)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/dag/compiled_dag_node.py", line 1343, in _check_inputs
    raise ValueError(
ValueError: dag.execute() or dag.execute_async() must be called with 2 positional args, got 1
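
For anyone reproducing this second error: one possible way to force all the actors onto a single node is to pin them to the driver's node with NodeAffinitySchedulingStrategy (a sketch only, assuming the Worker class from the reproduction script below; any placement that co-locates the actors should trigger the same behavior):

from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

# Pin every Worker actor to the node the driver is running on.
node_id = ray.get_runtime_context().get_node_id()
workers = [
    Worker.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=node_id, soft=False)
    ).remote()
    for _ in range(4)
]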

Versions / Dependencies

nightly

Reproduction script

import ray
from ray.dag.input_node import InputNode
from ray.dag.output_node import MultiOutputNode

@ray.remote
class Worker:
    def __init__(self) -> None:
        pass

    def func(self, num):
        return num

workers = [Worker.remote() for _ in range(4)]

with InputNode() as dag:
    node_0 = workers[0].func.bind(dag[0])
    node_1 = workers[1].func.bind(dag[1])
    output_dag = MultiOutputNode([node_0, node_1])

# Uncomment the next line to reproduce
# output_dag = output_dag.experimental_compile()
print(ray.get(output_dag.execute([0, 1])))

Issue Severity

High: It blocks me from completing my task.

jackhumphries commented 4 months ago

Note that the first issue about all readers needing to be on the same node will be fixed soon (tracked in #46269).

jackhumphries commented 4 months ago

The second issue can be resolved by passing the two arguments to execute() directly rather than via a list:

print(ray.get(output_dag.execute(0, 1)))

Bye-legumes commented 4 months ago

The second issue can be resolved by passing the two arguments to execute() directly rather than via a list:

print(ray.get(output_dag.execute(0, 1)))

Thanks for your reply! But this is difficult when there are lots of args (e.g., 100 args), right? So I think it would be better if we could pass a list, or even a nested list (for different DAGs maybe), right? I think my PR can solve this. Do you have other ideas?

jackhumphries commented 4 months ago

Would it be possible to pass a tuple of arguments? You can expand a tuple into an argument list like this:

t = (0, 1, 2, ...)
dag.execute(*t)
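
Applied to the reproduction script above, that would look roughly like this (a sketch; the compile call and the argument unpacking are the only changes):

inputs = (0, 1)

with InputNode() as dag:
    node_0 = workers[0].func.bind(dag[0])
    node_1 = workers[1].func.bind(dag[1])
    output_dag = MultiOutputNode([node_0, node_1])

output_dag = output_dag.experimental_compile()
# Unpacking the tuple passes one positional arg per InputNode index,
# which matches the 2 positional args the compiled DAG expects.
print(ray.get(output_dag.execute(*inputs)))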