ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io

[core][aDAG] Hang when using ray before using adag #47864

Open rkooo567 opened 1 month ago

rkooo567 commented 1 month ago

What happened + What you expected to happen

from time import perf_counter
from time import sleep
from contextlib import contextmanager
from typing import Callable

STATIC_SHAPE = False
NCCL = True

@contextmanager
def catchtime() -> Callable[[], float]:
    # Yields a callable that returns the elapsed time of the `with` block
    # once the block has exited.
    t1 = t2 = perf_counter()
    yield lambda: t2 - t1
    t2 = perf_counter()

import ray
import ray.dag
from ray.experimental.channel.torch_tensor_type import TorchTensorType

import torch

@ray.remote(num_gpus=1)
class GPUSender:
    def send(self, shape):
        return torch.rand(shape, device="cuda", dtype=torch.float32)

@ray.remote(num_gpus=1)
class GPUReceiver:
    def recv(self, tensor: torch.Tensor):
        assert tensor.device.type == "cuda"
        return tensor.shape

shape = (1000,10000)

def test_basic():
    sender = GPUSender.remote()
    receiver = GPUReceiver.remote()

    # warmup
    for _ in range(5):
        obj = sender.send.remote(shape)
        result = receiver.recv.remote(obj)
        assert ray.get(result) == shape

    with catchtime() as time:
        for _ in range(10):
            obj = sender.send.remote(shape)
            result = receiver.recv.remote(obj)
            assert ray.get(result) == shape
    print(f"Basic: {time()}")
    del sender
    del receiver

def test_dag():
    sender = GPUSender.remote()
    receiver = GPUReceiver.remote()
    with ray.dag.InputNode() as inp:
        dag = sender.send.bind(inp)
        if STATIC_SHAPE:
            assert NCCL
            dag = dag.with_type_hint(TorchTensorType(transport="nccl", _shape=shape, _dtype=torch.float32))

        else:
            if NCCL:
                dag = dag.with_type_hint(TorchTensorType(transport="nccl"))
            else:
                dag = dag.with_type_hint(TorchTensorType())
        dag = receiver.recv.bind(dag)

    # Creates a NCCL group across the participating actors. The group is destroyed during dag.teardown().
    adag = dag.experimental_compile()
    # warmup
    for _ in range(5):
        assert ray.get(adag.execute(shape)) == shape
    # Execute the DAG. Ray aDAG will orchestrate any NCCL ops.
    with catchtime() as time:
        for _ in range(10):
            assert ray.get(adag.execute(shape)) == shape
    print(f"DAG: {time()}")

if __name__ == "__main__":
    ray.init()
    test_basic()
    test_dag()
    ray.shutdown()

This hangs on an A100. If I swap the order of test_basic() and test_dag(), it works.
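
For reference, the ordering that does not hang is simply the reverse of the entrypoint above; replacing the __main__ block of the script with the following (nothing else changed) avoids the hang:

if __name__ == "__main__":
    ray.init()
    # Running the compiled DAG test before the plain actor test does not hang.
    test_dag()
    test_basic()
    ray.shutdown()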

Versions / Dependencies

master

Reproduction script

n/a

Issue Severity

None

kevin85421 commented 1 month ago

Simplified reproduction:

import ray
import ray.dag
from ray.experimental.channel.torch_tensor_type import TorchTensorType

import torch

@ray.remote(num_gpus=1)
class GPUSender:
    def send(self, shape):
        return torch.rand(shape, device="cuda", dtype=torch.float32)

@ray.remote(num_gpus=1)
class GPUReceiver:
    def recv(self, tensor: torch.Tensor):
        assert tensor.device.type == "cuda"
        return tensor.shape

shape = (1000,10000)

def test_basic():
    print("Basic start")
    sender = GPUSender.remote()
    receiver = GPUReceiver.remote()

    obj = sender.send.remote(shape)
    result = receiver.recv.remote(obj)
    assert ray.get(result) == shape
    print("Basic end")

def test_dag():
    print("DAG start")
    sender = GPUSender.remote()
    receiver = GPUReceiver.remote()
    with ray.dag.InputNode() as inp:
        dag = sender.send.bind(inp)
        dag = dag.with_type_hint(TorchTensorType(transport="nccl", _shape=shape, _dtype=torch.float32))
        dag = receiver.recv.bind(dag)

    # Creates a NCCL group across the participating actors. The group is destroyed during dag.teardown().
    adag = dag.experimental_compile()
    assert ray.get(adag.execute(shape)) == shape
    print("DAG end")

if __name__ == "__main__":
    ray.init()
    test_basic()
    test_dag()
    ray.shutdown()

All failures happened after "DAG end" was printed.

Update: after syncing my branch with master, I now need tens of runs to reproduce the segmentation fault.
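
Since the crash is intermittent, a small driver that reruns the script and checks for a SIGSEGV exit makes it easier to catch. A minimal sketch, assuming the simplified reproduction above is saved as repro.py (hypothetical filename):

import signal
import subprocess
import sys

for i in range(50):
    proc = subprocess.run([sys.executable, "repro.py"])
    print(f"run {i}: exit code {proc.returncode}")
    # On POSIX, a process killed by SIGSEGV reports a negative return code.
    if proc.returncode == -signal.SIGSEGV:
        print("reproduced the segmentation fault")
        break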

kevin85421 commented 1 month ago

I profiled it with valgrind. The SIGSEGV comes from the following stack:

==3950== Process terminating with default action of signal 11 (SIGSEGV): dumping core
==3950==  Access not within mapped region at address 0x10
==3950==    at 0x2DD5DD: new_threadstate (in /opt/conda/envs/ray-dev/bin/python3.9)
==3950==    by 0x1AD0A9: PyGILState_Ensure.cold (in /opt/conda/envs/ray-dev/bin/python3.9)
==3950==    by 0x6726CE9: __pyx_f_3ray_7_raylet_check_signals() (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x6682042: std::_Function_handler<ray::Status (), ray::Status (*)()>::_M_invoke(std::_Any_data const&) (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x69A049E: ray::core::CoreWorkerMemoryStore::GetImpl(std::vector<ray::ObjectID, std::allocator<ray::ObjectID> > const&, int, long, ray::core::WorkerContext const&, bool, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > >*, bool) (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x69A0F7F: ray::core::CoreWorkerMemoryStore::Get(std::vector<ray::ObjectID, std::allocator<ray::ObjectID> > const&, int, long, ray::core::WorkerContext const&, bool, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > >*) (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x69A1168: ray::core::CoreWorkerMemoryStore::Get(absl::lts_20230125::flat_hash_set<ray::ObjectID, absl::lts_20230125::hash_internal::Hash<ray::ObjectID>, std::equal_to<ray::ObjectID>, std::allocator<ray::ObjectID> > const&, long, ray::core::WorkerContext const&, absl::lts_20230125::flat_hash_map<ray::ObjectID, std::shared_ptr<ray::RayObject>, absl::lts_20230125::hash_internal::Hash<ray::ObjectID>, std::equal_to<ray::ObjectID>, std::allocator<std::pair<ray::ObjectID const, std::shared_ptr<ray::RayObject> > > >*, bool*) (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x68AD641: ray::core::CoreWorker::GetObjects(std::vector<ray::ObjectID, std::allocator<ray::ObjectID> > const&, long, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > >&) (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x68B5C3B: ray::core::CoreWorker::Get(std::vector<ray::ObjectID, std::allocator<ray::ObjectID> > const&, long, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > >&) (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x67ABF6E: __pyx_pw_3ray_7_raylet_10CoreWorker_41get_objects(_object*, _object*, _object*) (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x249344: method_vectorcall_VARARGS_KEYWORDS (in /opt/conda/envs/ray-dev/bin/python3.9)
==3950==    by 0x237605: _PyEval_EvalFrameDefault (in /opt/conda/envs/ray-dev/bin/python3.9)
kevin85421 commented 1 month ago

I checked the core dump and found that the segmentation fault comes from ~RayLog(). I will chat with @jjyao tomorrow.

[screenshot: core dump backtrace]
kevin85421 commented 1 month ago

Found another occurrence of the same issue that valgrind caught:

[screenshot: stack trace]

@rynewang suggested using Py_IsFinalizing() to check whether the interpreter is in the process of being finalized.
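
For illustration only: at the Python level, sys.is_finalizing() exposes the same state that Py_IsFinalizing() checks in C, so the suggested guard amounts to bailing out of any work that touches interpreter state once finalization has started. This is a sketch of the pattern, not Ray's actual check_signals code, which lives in the Cython _raylet layer:

import sys
import threading
import time

def poll_signals_forever():
    # Hypothetical stand-in for a background loop that periodically calls
    # back into the interpreter (as check_signals does via PyGILState_Ensure).
    while True:
        if sys.is_finalizing():
            # The interpreter is shutting down; stop instead of crashing.
            return
        time.sleep(0.01)

threading.Thread(target=poll_signals_forever, daemon=True).start()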