ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[ADAG] ADAG doesn't support NumPy array well #46972

Open kevin85421 opened 1 month ago

kevin85421 commented 1 month ago

What happened + What you expected to happen

The following reproduction script works well when we use a torch tensor or integers. However, when we change torch.zeros(2, 2) to np.ones(...), it gets stuck.

Versions / Dependencies

nightly

Reproduction script

import numpy as np
import ray
from ray.dag import InputNode, MultiOutputNode

num_workers = 3
num_microbatches = 2
num_steps = 5

# PipelineStage is a user-defined Ray actor class (its definition is not shown in this issue).
workers = [PipelineStage.remote(pp_rank) for pp_rank in range(num_workers)]

with InputNode() as input_batches:
    input = workers[0].read_input.bind(input_batches)
    batch1 = workers[0].forward.bind(input, 0)
    # batch1.with_type_hint(TorchTensorType())
    batch2 = workers[0].forward.bind(input, 1)
    # batch2.with_type_hint(TorchTensorType())
    batch1 = workers[1].forward.bind(batch1, 0)
    # batch1.with_type_hint(TorchTensorType())
    batch2 = workers[1].forward.bind(batch2, 1)
    # batch2.with_type_hint(TorchTensorType())
    batch1 = workers[2].forward.bind(batch1, 0)
    batch2 = workers[2].forward.bind(batch2, 1)
    dag = MultiOutputNode([batch1, batch2])
cdag = dag.experimental_compile()

print("Compiled dag done")
for i in range(num_steps):
    print("Processing step ", i)
    ref = cdag.execute([np.ones((2, 2)) * i for i in range(num_microbatches)])
    print(ray.get(ref))

print("Done!")

Issue Severity

None

rkooo567 commented 1 month ago

I think it is expected? You are probably not supposed to use TorchTensorType?

kevin85421 commented 1 month ago

I think there is no difference: it will still use the shared-memory channel under the hood. But I agree with you that this is a bit confusing; I will update the repro script.

kevin85421 commented 1 month ago

If we are using torch tensors, we need to add the type hint. If not, it gets stuck until the timeout.

import ray
import torch
from ray.dag import InputNode, MultiOutputNode
from ray.experimental.channel.torch_tensor_type import TorchTensorType

num_workers = 3
num_microbatches = 2
num_steps = 5

# PipelineStage is the same user-defined Ray actor as in the original repro (not shown here).
workers = [PipelineStage.remote(pp_rank) for pp_rank in range(num_workers)]

with InputNode() as input_batches:
    input = workers[0].read_input.bind(input_batches)
    batch1 = workers[0].forward.bind(input, 0)
    batch1.with_type_hint(TorchTensorType())
    batch2 = workers[0].forward.bind(input, 1)
    batch2.with_type_hint(TorchTensorType())
    batch1 = workers[1].forward.bind(batch1, 0)
    batch1.with_type_hint(TorchTensorType())
    batch2 = workers[1].forward.bind(batch2, 1)
    batch2.with_type_hint(TorchTensorType())
    batch1 = workers[2].forward.bind(batch1, 0)
    batch1.with_type_hint(TorchTensorType())
    batch2 = workers[2].forward.bind(batch2, 1)
    batch2.with_type_hint(TorchTensorType())
    dag = MultiOutputNode([batch1, batch2])
cdag = dag.experimental_compile()

print("Compiled dag done")
for i in range(num_steps):
    print("Processing step ", i)
    ref = cdag.execute([torch.zeros(2, 2) * i for i in range(num_microbatches)])
    print(ray.get(ref))

print("Done!")

stephanie-wang commented 1 month ago

It is expected right now to hang if the previous DAG's results are zero-copy deserialized and still in scope. This is #46055.

But this script looks like it may have another bug too; print(ray.get(ref)) should release the previous DAG's results, so the buffer should be available for the next results.
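
A possible workaround sketch while #46055 is open, assuming the hang is caused by the previous step's zero-copy-deserialized NumPy results keeping the channel buffer alive. It reuses cdag, num_steps, num_microbatches, np, and ray from the repro script above; the copy/del pattern is an assumption, not a confirmed fix.

for i in range(num_steps):
    print("Processing step ", i)
    ref = cdag.execute([np.ones((2, 2)) * j for j in range(num_microbatches)])
    results = ray.get(ref)
    # Copy the arrays out of the zero-copy (shared-memory-backed) buffers, then
    # drop all references so the channel buffer can be reused by the next step.
    print([arr.copy() for arr in results])
    del results, ref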

rkooo567 commented 1 month ago

I will take a look in a few days!