Open keuperj opened 3 years ago
Code example:

from cuml.dask.cluster.kmeans import KMeans as cuKMeans
from cuml.dask.common import to_dask_df
from cuml.dask.datasets import make_blobs
from cuml.metrics import adjusted_rand_score
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
from dask_ml.cluster import KMeans as skKMeans
import cupy as cp

# DASK setup
client = Client('SCHEDULER_IP')

# create dummy data
n_samples = 1000000
n_features = 2
n_total_partitions = len(list(client.has_what().keys()))
X_dca, Y_dca = make_blobs(n_samples,
                          n_features,
                          centers=5,
                          n_parts=n_total_partitions,
                          cluster_std=0.1,
                          verbose=True)

# cp data (works!)
X_cp = X_dca.compute()
X_np = cp.asnumpy(X_cp)
del X_cp

# SK-Learn (works!)
kmeans_sk = skKMeans(init="k-means||",
                     n_clusters=5,
                     n_jobs=-1,
                     random_state=100)
kmeans_sk.fit(X_np)

# cuML FAILS!
kmeans_cuml = cuKMeans(init="k-means||",
                       n_clusters=5,
                       random_state=100)
kmeans_cuml.fit(X_dca)
Error:
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<timed exec> in <module>
/opt/conda/envs/rapids/lib/python3.7/site-packages/cuml/common/memory_utils.py in cupy_rmm_wrapper(*args, **kwargs)
91 def cupy_rmm_wrapper(*args, **kwargs):
92 with cupy_using_allocator(rmm.rmm_cupy_allocator):
---> 93 return func(*args, **kwargs)
94
95 # Mark the function as already wrapped
/opt/conda/envs/rapids/lib/python3.7/site-packages/cuml/dask/cluster/kmeans.py in fit(self, X)
135 # This needs to happen on the scheduler
136 comms = Comms(comms_p2p=False, client=self.client)
--> 137 comms.init(workers=data.workers)
138
139 kmeans_fit = [self.client.submit(KMeans._func_fit,
/opt/conda/envs/rapids/lib/python3.7/site-packages/cuml/raft/dask/common/comms.py in init(self, workers)
207 self.streams_per_handle,
208 workers=self.worker_addresses,
--> 209 wait=True,
210 )
211
/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in run(self, function, *args, **kwargs)
2506 >>> c.run(print_state, wait=False) # doctest: +SKIP
2507 """
-> 2508 return self.sync(self._run, function, *args, **kwargs)
2509
2510 def run_coroutine(self, function, *args, **kwargs):
/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
850 else:
851 return sync(
--> 852 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
853 )
854
/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
352 if error[0]:
353 typ, exc, tb = error[0]
--> 354 raise exc.with_traceback(tb)
355 else:
356 return result[0]
/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/utils.py in f()
335 if callback_timeout is not None:
336 future = asyncio.wait_for(future, callback_timeout)
--> 337 result[0] = yield future
338 except Exception as exc:
339 error[0] = sys.exc_info()
/opt/conda/envs/rapids/lib/python3.7/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in _run(self, function, nanny, workers, wait, *args, **kwargs)
2443 elif resp["status"] == "error":
2444 typ, exc, tb = clean_exception(**resp)
-> 2445 raise exc.with_traceback(tb)
2446 if wait:
2447 return results
/opt/conda/envs/rapids/lib/python3.7/site-packages/cuml/raft/dask/common/comms.py in _func_init_all()
427 start = time.time()
428
--> 429 _func_init_nccl(sessionId, uniqueId)
430
431 if verbose:
/opt/conda/envs/rapids/lib/python3.7/site-packages/cuml/raft/dask/common/comms.py in _func_init_nccl()
482 try:
483 n = nccl()
--> 484 n.init(nWorkers, uniqueId, wid)
485 raft_comm_state["nccl"] = n
486 except Exception as e:
cuml/raft/dask/common/nccl.pyx in cuml.raft.dask.common.nccl.nccl.init()
RuntimeError: NCCL_ERROR: b'unhandled system error'
Describe the bug
When running the cuML examples from the container on Dask, I get the following error:
RuntimeError: NCCL_ERROR: b'unhandled system error'
Steps/Code to reproduce bug
Set up a Dask scheduler and several Dask worker nodes using the container. Dask itself works in this setup: the dashboard shows the workers, and the Scikit-Learn reference examples also run. The same setup also works for a single-node/multi-GPU configuration with cuML on Dask; only the distributed (multi-node) GPU case fails.
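As a first sanity check (this snippet is not part of the original report and reuses the same 'SCHEDULER_IP' placeholder), something like the following could confirm that every worker is registered and actually sees a GPU before the cuML step runs:

from dask.distributed import Client

client = Client('SCHEDULER_IP')

def gpu_check():
    # runs on each worker: report hostname and number of visible CUDA devices
    import socket
    import cupy as cp
    return socket.gethostname(), cp.cuda.runtime.getDeviceCount()

print(list(client.scheduler_info()["workers"].keys()))  # registered worker addresses
print(client.run(gpu_check))                            # per-worker GPU visibility

If a worker is missing from the list or reports zero devices, the problem would be in the container/GPU setup rather than in cuML itself.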
Expected behavior
Distributed cuML on Dask should work in this multi-node setup.
Environment details (please complete the following information):
Additional context
My guess is that NCCL cannot communicate across the Docker container boundary.
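One way to test that guess (again, not part of the original report): NCCL_DEBUG=INFO and NCCL_SOCKET_IFNAME are standard NCCL environment variables, not cuML options. Setting NCCL_DEBUG=INFO in the environment of each dask-cuda worker process before it starts makes the worker logs show the concrete cause behind "unhandled system error", and NCCL_SOCKET_IFNAME can pin NCCL to a network interface that is actually routable between containers (running the containers with host networking is another common workaround). The sketch below only inspects which NCCL settings the already-running workers inherited:

from dask.distributed import Client

client = Client('SCHEDULER_IP')

def nccl_env():
    # runs on each worker: show which NCCL_* settings the worker inherited
    import os
    return {k: v for k, v in os.environ.items() if k.startswith("NCCL_")}

print(client.run(nccl_env))

If the returned dictionaries are empty, the debug variables were never propagated into the worker containers, so the worker logs will not contain the more detailed NCCL error either.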