dask / dask-jobqueue

Deploy Dask on job schedulers like PBS, SLURM, and SGE
https://jobqueue.dask.org
BSD 3-Clause "New" or "Revised" License

how to use ucx protocol for the communication between workers and schedulers #371

Open songqiqqq opened 4 years ago

songqiqqq commented 4 years ago

It seems that dask.distributed now supports the UCX protocol for communication between workers and the scheduler, which appears to have large advantages over TCP on systems equipped with InfiniBand. How can I use that with dask-jobqueue? It should not be hard, because jobqueue is built on dask.distributed. If I add the --protocol ucx option to the scheduler and worker commands, would that be enough?

quasiben commented 4 years ago

@songqiqqq I'm very excited to see a ucx/dask-jobqueue experiment. You are correct that you will need to add --protocol ucx, but you will also need a handful of environment variables to enable specific devices such as InfiniBand and/or NVLink. We started to outline those configurations here:

https://ucx-py.readthedocs.io/en/latest/configuration.html

I believe for just IB you will need the following:

UCX_CUDA_IPC_CACHE=n UCX_MEMTYPE_CACHE=n UCX_TLS=rc,tcp,sockcm,cuda_copy UCX_SOCKADDR_TLS_PRIORITY=sockcm

dask-jobqueue may also be interested in building a nicer abstraction for configuring UCX. dask-cuda has been experimenting with this here: https://github.com/rapidsai/dask-cuda/blob/fbf3ef2cde8f42147945597d5bee81e6e388d5de/dask_cuda/dgx.py#L53-L61
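For orientation, a minimal sketch of how these pieces fit together on the command line (outside of dask-jobqueue); `SCHEDULER_HOST` below is a placeholder for the node where the scheduler runs:

```bash
# UCX environment variables for InfiniBand, as listed above
export UCX_CUDA_IPC_CACHE=n
export UCX_MEMTYPE_CACHE=n
export UCX_TLS=rc,tcp,sockcm,cuda_copy
export UCX_SOCKADDR_TLS_PRIORITY=sockcm

# The scheduler listens over UCX; workers connect to the ucx:// address it prints
dask-scheduler --protocol ucx
dask-worker ucx://SCHEDULER_HOST:8786
```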

guillaumeeb commented 4 years ago

It would be really nice if some people could dive into this; it would be very helpful for the community.

@mrocklin didn't you try things on Cheyenne?

mrocklin commented 4 years ago

I did. Things worked, but weren't yet any faster. The Dask + UCX team within RAPIDS (which @quasiben leads) is working on profiling and performance now, so hopefully we'll see some larger speedups soon.

As @quasiben states above, I did this just by adding the protocol="ucx://" keyword to the FooCluster classes.
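As a concrete sketch of what that looks like (the interface="ib0" argument and the resource values are assumptions, not the exact setup used on Cheyenne):

```python
from dask_jobqueue import SLURMCluster  # or PBSCluster, SGECluster, ...

# Sketch only: protocol="ucx://" switches scheduler/worker communication to UCX;
# interface="ib0" (an assumed interface name) binds it to the InfiniBand interface.
cluster = SLURMCluster(
    cores=4,
    memory="16GB",
    protocol="ucx://",
    interface="ib0",
)
cluster.scale(jobs=2)
```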

songqiqqq commented 4 years ago

Sorry, I am swamped with work at the end of the year. I will try to keep up with you and report here if there is any progress.

lesteve commented 4 years ago

@quasiben was looking at this at one point at the Dask workshop. @quasiben if you manage to make progress on this, let us know!

quasiben commented 4 years ago

Thanks for the ping @lesteve I pushed up some recent work here: https://github.com/dask/dask-jobqueue/pull/390. Still early days -- I probably won't be able to spend much time on it until later this month

lesteve commented 4 years ago

Thanks a lot!

This is a bit of a hack, but it may get you started more quickly for your benchmarks. It is the approach I had in mind at the workshop (which I probably did not explain as well as I could have):

submission-script.sh (Submission script)

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -n 1
#SBATCH --cpus-per-task=1
#SBATCH --mem=954M
#SBATCH -t 00:30:00

export UCX_CUDA_IPC_CACHE=n
export UCX_MEMTYPE_CACHE=n
export UCX_TLS=rc,tcp,sockcm,cuda_copy
export UCX_SOCKADDR_TLS_PRIORITY=sockcm
JOB_ID=${SLURM_JOB_ID%;*}

python your-dask-jobqueue-script.py

your-dask-jobqueue-script.py (Python script creating the Dask scheduler and workers)

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(..., env_extra=['all the ucx env var go here'])
...

And then you submit your submission script with sbatch:

sbatch submission-script.sh

This way the Dask scheduler will live on a cluster compute node with Infiniband.

If anything is not clear or does not quite work, let me know!
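To make the placeholder above concrete, a filled-in version of that script could look like the following; the UCX variables are the ones listed earlier in this thread, and the cores/memory values simply mirror the SBATCH header:

```python
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

# Sketch: the workers inherit the UCX settings through env_extra, while the
# scheduler (running inside this batch job) picks them up from the variables
# exported in submission-script.sh.
cluster = SLURMCluster(
    cores=1,
    memory="954MB",
    protocol="ucx://",
    env_extra=[
        "export UCX_CUDA_IPC_CACHE=n",
        "export UCX_MEMTYPE_CACHE=n",
        "export UCX_TLS=rc,tcp,sockcm,cuda_copy",
        "export UCX_SOCKADDR_TLS_PRIORITY=sockcm",
    ],
)
cluster.scale(jobs=2)
client = Client(cluster)
```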

quasiben commented 4 years ago

@lesteve the following is what I've been doing (note: my nodes are DGX machines with 8 GPUs each). The scheduler and workers are launched together because, again, my nodes are DGX machines and the granularity is one node.

dask-scheduler.script

#!/usr/bin/env bash

#SBATCH -J dask-scheduler
#SBATCH -n 1
#SBATCH -t 00:30:00

JOB_ID=${SLURM_JOB_ID%;*}
LOCAL_DIRECTORY=/gpfs/fs1/bzaitlen/dask-local-directory
export UCX_NET_DEVICES=mlx5_0:1 DASK_RMM__POOL_SIZE=1GB DASK_UCX__ENABLE_INFINIBAND="True" DASK_UCX__ENABLE_NVLINK="True"
/gpfs/fs1/bzaitlen/miniconda3/envs/dask-jq/bin/python -m distributed.cli.dask_scheduler --protocol ucx \
        --scheduler-file $LOCAL_DIRECTORY/dask-scheduler.json &

unset UCX_NET_DEVICES DASK_RMM__POOL_SIZE DASK_UCX__ENABLE_INFINIBAND DASK_UCX__ENABLE_NVLINK
sleep 5
/gpfs/fs1/bzaitlen/miniconda3/envs/dask-jq/bin/python -m dask_cuda.dask_cuda_worker \
        --scheduler-file $LOCAL_DIRECTORY/dask-scheduler.json  \
        --rmm-pool-size=1GB --enable-nvlink --enable-tcp-over-ucx --enable-infiniband --net-devices="auto" \
        --local-directory=$LOCAL_DIRECTORY

dask-cuda-worker.script

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -N 1

JOB_ID=${SLURM_JOB_ID%;*}
export DASK_DISTRIBUTED__WORKER__MEMORY__Terminate="False"
export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="60s"
export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP="600s"
export LOCAL_DIRECTORY=/gpfs/fs1/bzaitlen/dask-local-directory
/gpfs/fs1/bzaitlen/miniconda3/envs/dask-jq/bin/python -m dask_cuda.dask_cuda_worker \
        --scheduler-file $LOCAL_DIRECTORY/dask-scheduler.json  \
        --rmm-pool-size=1GB --enable-nvlink --enable-tcp-over-ucx --enable-infiniband --net-devices="auto" \
        --local-directory=$LOCAL_DIRECTORY
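
Presumably (this is not spelled out above) the two scripts are then submitted with sbatch: one scheduler job, plus one worker job per additional DGX node, e.g.:

```bash
sbatch dask-scheduler.script      # scheduler plus workers on the first node
sbatch dask-cuda-worker.script    # repeat once per additional DGX node
```
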
andersy005 commented 3 years ago

> I did. Things worked, but weren't yet any faster. The Dask + UCX team within RAPIDS (which @quasiben leads) is working on profiling and performance now, so hopefully we'll see some larger speedups soon.
>
> As @quasiben states above, I did this just by adding the protocol="ucx://" keyword to the FooCluster classes.

@mrocklin, could you point me to the setup you used on Cheyenne/Casper? I've been trying to launch a Dask cluster with the UCX protocol for communication, but all my attempts have failed.

Running the following

cluster = PBSCluster(protocol="ucx://", env_extra=["export UCX_TLS=tcp,sockcm", 
                                                    "export UCX_SOCKADDR_TLS_PRIORITY=sockcm", 
                                                    'export UCXPY_IFNAME="ib0"'])

Results in a timeout error.

```python
RuntimeError                              Traceback (most recent call last)
in
----> 1 cluster = PBSCluster(protocol="ucx://", env_extra=["export UCX_TLS=tcp,sockcm",
      2                                                     "export UCX_SOCKADDR_TLS_PRIORITY=sockcm",
      3                                                     'export UCXPY_IFNAME="ib0"'])
      4 client = Client(cluster)
      5 cluster

/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/dask_jobqueue/core.py in __init__(self, n_workers, job_cls, loop, security, silence_logs, name, asynchronous, dashboard_address, host, scheduler_options, interface, protocol, config_name, **job_kwargs)
    528         self._dummy_job  # trigger property to ensure that the job is valid
    529
--> 530         super().__init__(
    531             scheduler=scheduler,
    532             worker=worker,

/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/deploy/spec.py in __init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name, shutdown_on_close)
    280         if not self.asynchronous:
    281             self._loop_runner.start()
--> 282             self.sync(self._start)
    283             self.sync(self._correct_state)
    284

/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/deploy/cluster.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    186             return future
    187         else:
--> 188             return sync(self.loop, func, *args, **kwargs)
    189
    190     def _log(self, log):

/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    352     if error[0]:
    353         typ, exc, tb = error[0]
--> 354         raise exc.with_traceback(tb)
    355     else:
    356         return result[0]

/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/utils.py in f()
    335             if callback_timeout is not None:
    336                 future = asyncio.wait_for(future, callback_timeout)
--> 337             result[0] = yield future
    338         except Exception as exc:
    339             error[0] = sys.exc_info()

/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760
    761             try:
--> 762                 value = future.result()
    763             except Exception:
    764                 exc_info = sys.exc_info()

/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/deploy/spec.py in _start(self)
    319             self.status = Status.failed
    320             await self._close()
--> 321             raise RuntimeError(f"Cluster failed to start. {str(e)}") from e
    322
    323     def _correct_state(self):

RuntimeError: Cluster failed to start. Timed out trying to connect to ucx://10.12.206.47:49153 after 10 s
```

I tried launching the scheduler from the command line, and I ran into a different error:

```bash
(dask-gpu) bash-4.2$ dask-scheduler --protocol ucx
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
[1621471211.546747] [casper-login1:254743:0]    ucp_context.c:735  UCX  WARN  network device 'mlx5_0:1' is not available, please use one or more of: 'ext'(tcp), 'ib0'(tcp), 'mgt'(tcp)
[1621471211.546766] [casper-login1:254743:0]    ucp_context.c:1071 UCX  ERROR no usable transports/devices (asked tcp,sockcm on network:mlx5_0:1 )
Traceback (most recent call last):
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py", line 208, in main
    loop.run_sync(run)
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/tornado/ioloop.py", line 530, in run_sync
    return future_cell[0].result()
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py", line 204, in run
    await scheduler
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/core.py", line 285, in _
    await self.start()
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/scheduler.py", line 3678, in start
    await self.listen(
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/core.py", line 400, in listen
    listener = await listen(
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/comm/core.py", line 208, in _
    await self.start()
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/comm/ucx.py", line 404, in start
    init_once()
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/comm/ucx.py", line 76, in init_once
    ucp.init(options=ucx_config, env_takes_precedence=True)
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/ucp/core.py", line 766, in init
    _ctx = ApplicationContext(options, blocking_progress_mode=blocking_progress_mode)
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/ucp/core.py", line 234, in __init__
    self.context = ucx_api.UCXContext(config_dict)
  File "ucp/_libs/ucx_api.pyx", line 295, in ucp._libs.ucx_api.UCXContext.__init__
  File "ucp/_libs/ucx_api.pyx", line 107, in ucp._libs.ucx_api.assert_ucs_status
ucp.exceptions.UCXError: No such device

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/bin/dask-scheduler", line 11, in
    sys.exit(go())
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py", line 217, in go
    main()
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/click/core.py", line 1134, in __call__
    return self.main(*args, **kwargs)
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/click/core.py", line 1059, in main
    rv = self.invoke(ctx)
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/click/core.py", line 1401, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/click/core.py", line 767, in invoke
    return __callback(*args, **kwargs)
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py", line 212, in main
    logger.info("End scheduler at %r", scheduler.address)
  File "/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/core.py", line 359, in address
    raise ValueError("cannot get address of non-running Server")
ValueError: cannot get address of non-running Server
```

Am I making a trivial error, or do I need to do some extra setup for things to work properly?

Ccing @quasiben in case he has some suggestions, too.

quasiben commented 3 years ago

@andersy005 can you post the versions of ucx / ucx-py and what ifconfig returns on one of the compute nodes? When using IB (InfiniBand) you will also need to add rc to the UCX_TLS env var.
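For example (these exact commands are an assumption, not part of the comment above), the requested information can be gathered and the transport list adjusted like this:

```bash
# Report the installed ucx / ucx-py versions in the active conda environment
conda list | grep -i ucx

# List the network interfaces available on a compute node
ifconfig -a

# Include the rc transport when using InfiniBand
export UCX_TLS=rc,tcp,sockcm
```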

andersy005 commented 3 years ago

@quasiben,

Here are the versions I am using

# Name                    Version                   Build  Channel
ucx                       1.9.0+gcd9efd3       cuda11.0_0    rapidsai
ucx-proc                  1.0.0                       gpu    conda-forge
ucx-py                    0.19.0          py38_gcd9efd3_0    rapidsai

Here's the ifconfig output

```bash
$ ifconfig -a
eno2: flags=4099  mtu 1500
        ether XXX.XXX.XXX.XXX  txqueuelen 1000  (Ethernet)
        RX packets 0  bytes 0 (0.0 B)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 0  bytes 0 (0.0 B)
        TX errors 0  dropped 0  overruns 0  carrier 0  collisions 0

ext: flags=4163  mtu 9000
        inet XXX.XXX.XXX.XXX  netmask 255.255.252.0  broadcast XXX.XXX.XXX.XXX
        inet6 XXX.XXX.XXX.XXX  prefixlen 64  scopeid 0x20
        ether XXX.XXX.XXX.XXX  txqueuelen 1000  (Ethernet)
        RX packets 178172323638  bytes 1455116289228317 (1.2 PiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 95416666188  bytes 660827149047780 (601.0 TiB)
        TX errors 0  dropped 0  overruns 0  carrier 0  collisions 0

ib0: flags=4163  mtu 4092
        inet XXX.XXX.XXX.XXX  netmask 255.255.0.0  broadcast 10.12.255.255
        inet6 XXX.XXX.XXX.XXX  prefixlen 64  scopeid 0x20
        Infiniband hardware address can be incorrect! Please read BUGS section in ifconfig(8).
        infiniband XXX.XXX.XXX.XXX  txqueuelen 16384  (InfiniBand)
        RX packets 1171145958  bytes 4376724739884 (3.9 TiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 2545038787  bytes 10128413485725 (9.2 TiB)
        TX errors 0  dropped 0  overruns 0  carrier 0  collisions 0

ib1: flags=4099  mtu 4092
        Infiniband hardware address can be incorrect! Please read BUGS section in ifconfig(8).
        infiniband XXX.XXX.XXX.XXX  txqueuelen 16384  (InfiniBand)
        RX packets 0  bytes 0 (0.0 B)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 0  bytes 0 (0.0 B)
        TX errors 0  dropped 0  overruns 0  carrier 0  collisions 0

lo: flags=73  mtu 65536
        inet 127.0.0.1  netmask 255.0.0.0
        inet6 ::1  prefixlen 128  scopeid 0x10
        loop  txqueuelen 1000  (Local Loopback)
        RX packets 248963877  bytes 7265704569957 (6.6 TiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 248963877  bytes 7265704569957 (6.6 TiB)
        TX errors 0  dropped 0  overruns 0  carrier 0  collisions 0

mgt: flags=4163  mtu 1500
        inet XXX.XXX.XXX.XXX  netmask 255.255.0.0  broadcast XXX.XXX.XXX.XXX
        inet6 XXX.XXX.XXX.XXX  prefixlen 64  scopeid 0x20
        ether XXX.XXX.XXX.XXX  txqueuelen 1000  (Ethernet)
        RX packets 461510746  bytes 31485088394 (29.3 GiB)
        RX errors 0  dropped 871  overruns 0  frame 0
        TX packets 439804  bytes 70699277 (67.4 MiB)
        TX errors 0  dropped 0  overruns 0  carrier 0  collisions 0
```

I noticed that in my previously failed attempt I was on the login node. I switched to a compute node, and launching the scheduler from the command line appears to be working. One thing to note is that the scheduler IP address that ends up being used here is the PUBLIC IP address of the compute node:

$ dask-scheduler --protocol ucx
distributed.scheduler - INFO - -----------------------------------------------
/glade/work/abanihi/opt/miniconda/envs/dask-gpu/lib/python3.8/site-packages/distributed/node.py:160: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 42672 instead
  warnings.warn(
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
[1621529384.933221] [crhtc53:187174:0]    ucp_context.c:735  UCX  WARN  network device 'mlx5_0:1' is not available, please use one or more of: 'ext'(tcp), 'ib0'(tcp), 'mgt'(tcp)
distributed.scheduler - INFO -   Scheduler at: ucx://PUBLIC_IP:8786
distributed.scheduler - INFO -   dashboard at:                    :42672

When I try to set up a cluster using dask-jobqueue on the same compute node, I get the same error as before

In [3]: cluster = PBSCluster(protocol="ucx://", env_extra=["export UCX_TLS=rc,tcp,sockcm",
   ...:                                                     "export UCX_SOCKADDR_TLS_PRIORITY=sockcm",
   ...:                                                     'export UCXPY_IFNAME="ib0"'])
[... same traceback as above, truncated ...]

RuntimeError: Cluster failed to start. Timed out trying to connect to ucx://PRIVATE_IP:38572 after 10 s

In this case, the scheduler appears to be using the PRIVATE IP address instead of the PUBLIC IP address.

pentschev commented 3 years ago

UCXPY_IFNAME should have no effect in Dask. If you want to use a specific interface you should use the --interface flag (or the interface keyword) when launching the Dask scheduler; I'm not sure how/if that's exposed in dask-jobqueue. At the UCX level, you can limit the interfaces the process can see with UCX_NET_DEVICES, for example UCX_NET_DEVICES=ib0, but I think you would still need to pass interface to the scheduler. Also note that you do not have InfiniBand support built in, as we don't ship conda packages with that enabled, so if that's what you're trying to test you'll have to build UCX from source.
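Putting that advice into dask-jobqueue terms, a rough sketch might look like the following; interface="ib0", the resource values, and the env_extra lines are assumptions about this particular cluster rather than a verified recipe:

```python
from dask_jobqueue import PBSCluster

# Sketch: bind Dask to the ib0 interface and restrict UCX to that device.
# Whether InfiniBand transports are actually used still depends on how UCX
# was built (the conda packages discussed above do not enable InfiniBand).
cluster = PBSCluster(
    cores=4,              # placeholder resources
    memory="16GB",
    protocol="ucx://",
    interface="ib0",
    env_extra=[
        "export UCX_NET_DEVICES=ib0",
        "export UCX_TLS=rc,tcp,sockcm",
        "export UCX_SOCKADDR_TLS_PRIORITY=sockcm",
    ],
)
```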

guillaumeeb commented 3 years ago

I'm glad you are trying to test this, @andersy005. It looks like some more testing and a good documentation effort are needed to make all this work.

> you should use the --interface flag (or the interface keyword) when launching the Dask scheduler; I'm not sure how/if that's exposed in dask-jobqueue

Yes it is, so we should be OK on this part.

> Also note that you do not have InfiniBand support built in, as we don't ship conda packages with that enabled, so if that's what you're trying to test you'll have to build UCX from source.

I think this is what we want!

ocaisa commented 3 years ago

I should add here that we also tested this a few months ago and found it to give no performance benefit (at least in our use case). We also found that it kills resilience, though this may have since changed.