rapidsai / dask-cuda

Utilities for Dask and CUDA interactions
https://docs.rapids.ai/api/dask-cuda/stable/
Apache License 2.0
285 stars 91 forks source link

Can't start LocalCUDACluster w/ ucx #1148

Closed randerzander closed 1 year ago

randerzander commented 1 year ago

With a conda environment using the latest nightlies:

mamba create -y --name pynds -c rapidsai-nightly -c conda-forge -c nvidia python=3.10 cudatoolkit=11.8 cudf=23.04 dask-cudf dask-cuda 'ucx-proc=*=gpu' ucx-py 'rust>=1.59.0' 'setuptools-rust>=1.5.2' dask/label/dev::dask-sql requests pydrive2 gspread oauth2client plotly python-graphviz graphviz bpython jupyterlab

conda list | grep rapids
cubinlinker               0.2.0           py310hf09951c_1    rapidsai-nightly
cudf                      23.04.00a       cuda_11_py310_230327_g173fde9d9a_240    rapidsai-nightly
dask-cuda                 23.06.00a       py310_230329_g9839618_4    rapidsai-nightly
dask-cudf                 23.04.00a       cuda_11_py310_230327_g173fde9d9a_240    rapidsai-nightly
libcudf                   23.04.00a       cuda11_230327_g173fde9d9a_240    rapidsai-nightly
librmm                    23.04.00a       cuda11_230321_ge8fbd06e_34    rapidsai-nightly
rmm                       23.04.00a       cuda11_py310_230321_ge8fbd06e_34    rapidsai-nightly
ucx-proc                  1.0.0                       gpu    rapidsai-nightly
ucx-py                    0.31.00a        py310_230320_gb5f4a10_13    rapidsai-nightly

conda list | grep dask
dask                      2023.3.2           pyhd8ed1ab_0    conda-forge
dask-core                 2023.3.2           pyhd8ed1ab_0    conda-forge
dask-cuda                 23.06.00a       py310_230329_g9839618_4    rapidsai-nightly
dask-cudf                 23.04.00a       cuda_11_py310_230327_g173fde9d9a_240    rapidsai-nightly
dask-sql                  2023.2.1a230328 py310_g883cc3c_43    dask/label/dev
distributed               2023.3.2           pyhd8ed1ab_0    conda-forge

conda list | grep ucx
ucx                       1.14.0               h538f049_0    conda-forge
ucx-proc                  1.0.0                       gpu    rapidsai-nightly
ucx-py                    0.31.00a        py310_230320_gb5f4a10_13    rapidsai-nightly

Repro:

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

if __name__ == "__main__":
    cluster = LocalCUDACluster(protocol="ucx")
    client = Client(cluster)
    print(client)

Trace:

<Client: 'ucx://127.0.0.1:56043' processes=8 threads=8, memory=503.75 GiB>                  2023-03-29 09:04:59,340 - distributed.core - ERROR -                                        
Traceback (most recent call last):                                                          
  File "/raid/rgelhausen/conda/envs/pynds/lib/python3.10/site-packages/distributed/comm/ucx.
py", line 349, in read
    await self.ep.recv(msg)
  File "/raid/rgelhausen/conda/envs/pynds/lib/python3.10/site-packages/ucp/core.py", line 72
5, in recv
    ret = await comm.tag_recv(self._ep, buffer, nbytes, tag, name=log)
ucp._libs.exceptions.UCXCanceled: <[Recv #006] ep: 0x7f7882f8e840, tag: 0x45a67c76a1982a39, 
nbytes: 16, type: <class 'numpy.ndarray'>>: 

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/raid/rgelhausen/conda/envs/pynds/lib/python3.10/site-packages/distributed/utils.py"
, line 752, in wrapper
    return await func(*args, **kwargs)
  File "/raid/rgelhausen/conda/envs/pynds/lib/python3.10/site-packages/distributed/comm/ucx.
py", line 366, in read
    self.abort()
  File "/raid/rgelhausen/conda/envs/pynds/lib/python3.10/site-packages/distributed/comm/ucx.
py", line 434, in abort 
    self._ep.abort()
  File "/raid/rgelhausen/conda/envs/pynds/lib/python3.10/site-packages/ucp/core.py", line 56
4, in abort
    logger.debug("Endpoint.abort(): %s" % hex(self.uid))
  File "/raid/rgelhausen/conda/envs/pynds/lib/python3.10/site-packages/ucp/core.py", line 55
0, in uid
    return self._ep.handle
  File "ucp/_libs/ucx_endpoint.pyx", line 359, in ucp._libs.ucx_api.UCXEndpoint.handle.__get
__
AssertionError

above trace repeats
randerzander commented 1 year ago

I'm seeing this on a box w/ 4 T4s, as well as a box with 8x A100s. Doesn't seem machine or card specific.

pentschev commented 1 year ago

Could you check the ucx version that gets installed as well and if besides the Python backtrace there's any segmentation fault/assertion errors/other C backtraces in any of the processes?

randerzander commented 1 year ago

Added UCX to the issue desc

pentschev commented 1 year ago

Could you then check if there are any other traces as asked above, after that could you try installing ucx=1.13.1 and rerunning?

pentschev commented 1 year ago

Actually, before downgrading to ucx=1.13.1, could you try rerunning with UCX_RNDV_SCHEME=get_zcopy?

randerzander commented 1 year ago

Here's the full trace with export UCX_RNDV_SCHEME=get_zcopy (w/ ucx 1.14.0) before running the repro script:

https://gist.github.com/randerzander/16e59b627fc8abf1efa349f819eba735

quasiben commented 1 year ago

I'm seeing the same thing with UCX 1.14 with and without UCX_RNDV_SCHEME=get_zcopy

quasiben commented 1 year ago

This is an issue at shutdown but shouldn't prevent any usage. We think the issue is a change in connection handling in distributed -- bisecting to find out

pentschev commented 1 year ago

https://github.com/dask/distributed/pull/7593 is the offending PR.

EDIT: I had pasted the wrong link before.

randerzander commented 1 year ago

This is an issue at shutdown but shouldn't prevent any usage.

Unfortunately it does impact usage. Simple things like len(ddf) work, but more complex jobs (Dask-SQL queries) do not. I'll try to boil it down if that's useful, but it seems like the problem has already been identified?

quasiben commented 1 year ago

With non-trivial failures can you locally revert https://github.com/dask/distributed/pull/7644 and see if that passes for you ?

wence- commented 1 year ago

@randerzander Can you try adding:

import weakref
weakref.finalize(lambda: None, lambda: None)

To the top of your script? Must occur before distributed has been imported

randerzander commented 1 year ago

Yes, that resolved both the clean cluster shutdown, and workload failures!

Thanks for the workaround!

wence- commented 1 year ago

@randerzander can you also check if the following works around the problem for you?

def workaround_7726():
    from distributed._concurrent_futures_thread import _python_exit
    from distributed.client import _close_global_client
    from distributed.deploy.spec import close_clusters
    import distributed
    import atexit
    import weakref

    def shutdown():
        # This mimics a function in distributed.__init__ which we can't
        # get a handle on (because it is del'd), and must be registered
        # after the other functions
        distributed._python_shutting_down = True

    # These functions must be unregistered and then re-registered
    for fn in [_python_exit, _close_global_client, close_clusters]:
        atexit.unregister(fn)
    # So that this finalizer is in an atexit hook after them
    # Note that atexit handlers are called last-in, first out.
    # See https://docs.python.org/3/library/atexit.html
    weakref.finalize(lambda: None, lambda: None)
    # And re-register them.
    for fn in [_python_exit, close_clusters, _close_global_client, shutdown]:
        atexit.register(fn)

Run this function before you boot a cluster.

quasiben commented 1 year ago

If this solution works I would be inclined to use this rather than put out patch release of distributed

pentschev commented 1 year ago

The fact that this has to run before spinning up the cluster suggests this must be implemented by the user. Am I right @wence- ? Did you have success implementing it as part of Dask-CUDA directly (without requiring any user interaction)?

wence- commented 1 year ago

I haven't yet, but we could run it, for example, in dask_cuda.__init__

wence- commented 1 year ago

This works for the minimal reproducer (not sure about cluster restarts):

from dask_cuda import LocalCUDACluster
from distributed import Client

if __name__ == "__main__":
    cluster = LocalCUDACluster(protocol="ucx")
    client = Client(cluster)
    del cluster
    print("shutting down...")
    print(client)
diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py
index 656f614..4781048 100644
--- a/dask_cuda/local_cuda_cluster.py
+++ b/dask_cuda/local_cuda_cluster.py
@@ -23,6 +23,35 @@ from .utils import (
 )

+def workaround_distributed_7726():
+    import atexit
+    import weakref
+
+    import distributed
+    from distributed._concurrent_futures_thread import _python_exit
+    from distributed.client import _close_global_client
+    from distributed.deploy.spec import close_clusters
+
+    def shutdown():
+        # This mimics a function in distributed.__init__ which we can't
+        # get a handle on (because it is del'd), and must be registered
+        # after the other functions
+        distributed._python_shutting_down = True
+
+    # These functions must be unregistered and then re-registered
+    for fn in [_python_exit, _close_global_client, close_clusters]:
+        atexit.unregister(fn)
+    # So that this finalizer is in an atexit hook after them
+    weakref.finalize(lambda: None, lambda: None)
+    # And re-register them.
+    for fn in [_python_exit, close_clusters, _close_global_client, shutdown]:
+        atexit.register(fn)
+
+
+workaround_distributed_7726()
+del workaround_distributed_7726
+
+
 class LoggedWorker(Worker):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
pentschev commented 1 year ago

I can confirm that. The more fragile part seems to be the client must import dask_cuda, it seems that there's where the problem lies. The consequence of that is the dask cuda worker CLI will not suffice, but the client code must import dask_cuda even if it's not used by the client code.

EDIT: That's what I tested:

repro.py ```python from dask.distributed import Client, LocalCluster import dask_cuda if __name__ == "__main__": client = Client("ucx://10.33.227.163:8786") print(client) ```
shell ``` $ dask scheduler --protocol ucx ... $ dask cuda worker ucx://10.33.227.163:8786 ... $ python repro.py ```

If import dask_cuda is commented out in repro.py, the problem still occurs.

wence- commented 1 year ago

Yes, I think the client connections are the problematic ones :(

pentschev commented 1 year ago

Client is problematic in the cases we've seen. I suspect @randerzander 's case may be happening elsewhere too, but not sure either.

quasiben commented 1 year ago

This should now be resolved by the distributed 2023.3.2.1 release . Thank you @randerzander for raising!