dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Standard workflow for using ucx/ucx-py to speed up data transfer between nodes/processes #3315

Open songqiqqq opened 4 years ago

songqiqqq commented 4 years ago

I have seen many scattered pieces of information saying that ucx/ucx-py can speed up data transfer not only for GPU-GPU communication but also for CPU-CPU communication between different nodes/workers in the dask/distributed framework. However, I can't find a complete document on how to do this and what needs to be configured correctly. I tried the setup below, but no speed-up could be observed.

(1) The configuration for ucx-py is:

#.bashrc   file
export UCX_MEMTYPE_CACHE=n UCX_TLS=rc,tcp,sockcm UCX_SOCKADDR_TLS_PRIORITY=sockcm
export UCXPY_IFNAME="ib0"

(2) Then I start the scheduler with:

dask-scheduler --protocol ucx --interface ib0

(3) Then I start a worker on each of the other nodes of the cluster with:

opt/anaconda37/bin/python3.7 -m distributed.cli.dask_worker ${scheduler_address} --nthreads 15 --nprocs 1 --memory-limit 400.00GB --nanny --death-timeout 60 --local-directory /scratch/qsong --name com --interface ib0

where ${scheduler_address} is the ucx://... address obtained from the previous step. The environment variables from step (1) are set identically on every node.

(4) Then, in main.py, I start the calculation with client = Client('ucx://172.16.10.12:8786'). A quick sanity check of this setup is sketched below.
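One way to confirm that this setup is actually using UCX (a sketch, not part of the original workflow; it reuses the scheduler address from step (4) and only relies on Client.scheduler_info()):

from distributed import Client

client = Client('ucx://172.16.10.12:8786')

# The scheduler address and every worker address should start with "ucx://";
# if they do not, the cluster has silently fallen back to TCP.
info = client.scheduler_info()
print(info['address'])
for worker_address in info['workers']:
    print(worker_address)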

The data is transferred from main.py to the different nodes correctly, but no speed-up can be observed.

Are there any suggestions? @mrocklin

TomAugspurger commented 4 years ago

I believe a few people are currently benchmarking things and measuring where we'd expect to see more speedups, so you may not be doing anything incorrect. I don't think that's written down anywhere yet.

cc @quasiben if you have thoughts.

quasiben commented 4 years ago

Thanks @TomAugspurger for the ping. Yes, we've been chatting about this issue a bit here: https://github.com/rapidsai/ucx-py/issues/361

I think we need to spend some time better understanding IB transfers. Until recently we've been primarily focused on GPU transfers over NVLink. We'll undertake IB benchmarking soon, though I'm not sure how soon that will be given the coming holidays.

songqiqqq commented 4 years ago

Let me quote my small experiment at rapidsai/ucx-py#361 first.

I measured the time consumed by the data transfer as follows:

time1 = time()
client.scatter(data)  # two workers total
time2 = time()
time_consuming = time2 - time1

With a data size of 100 MB, this takes about 1.5 s ~ 2 s using UCX, which is almost the same speed as before (without UCX).

More precisely, the time with UCX is slightly smaller than without UCX, which was about 2.x seconds.

Another observation: while the client is scattering data, an increase in received packets can be observed with:

watch -n 0.1 'cat /sys/class/infiniband/mlx5_*/ports/1/counters/port_rcv_packets'

1216076820
5568826841 

Another point: although the sampling interval is 0.1 s and the whole scattering process lasts about 2 s, the packet count jumps almost instantaneously from the start value to the target value rather than increasing continuously over the whole scatter period. Could this be evidence that the data transfer itself is very short at the network level and the time is being spent somewhere else?
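One way to line up the counter jump with the wall-clock time in a single script (a sketch, not the author's code; it assumes the same counter path and scheduler address quoted in this thread):

import glob
import time
import numpy as np
from distributed import Client

# Standard IB port counters; on the node running the client the transmit
# counter is the interesting one, on a worker node the receive counter.
counters = glob.glob('/sys/class/infiniband/mlx5_*/ports/1/counters/port_*_packets')

def read_counters():
    values = {}
    for path in counters:
        with open(path) as f:
            values[path] = int(f.read())
    return values

client = Client('ucx://172.16.10.12:8786')
data = np.random.rand(13000, 1000)  # about 100 MB

before = read_counters()
t1 = time.time()
future = client.scatter(data)
t2 = time.time()
after = read_counters()

print('wall time: {:.2f}s'.format(t2 - t1))
for path in counters:
    print(path, after[path] - before[path])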

@quasiben Where do you think most of the time is being spent: at the Dask level, in ucx-py, or in UCX itself? If it is in dask/distributed, could you give me some guidance on digging into this?

songqiqqq commented 4 years ago

(1) First finding: scattering a DataFrame is much more time-consuming than scattering a numpy array. test_dask.py:

import numpy as np
import time
import sys
import pdb
import pandas as pd
import datetime
from distributed import Client

c = Client("ucx://172.16.10.12:10010")

arr = np.random.rand(13000, 1000)  # about 100 MB
df  = pd.DataFrame(arr)

n = 50
t1 = time.time()
for i in range(n):
    arr_f = c.scatter(arr, broadcast=False)
t2 = time.time()
print('{}s/array'.format((t2 - t1)/(n*1.0)))

t1 = time.time()
for i in range(n):
    df_f = c.scatter(df, broadcast=False)
t2 = time.time()
print('{}s/df'.format((t2 - t1)/(n*1.0)))

output:

[qsong@scorp2 qsong]$ python test_dask.py
[1576235321.321847] [scorp2:8060 :0]         parser.c:1568 UCX  WARN  unused env variable: UCX_CUDA_IPC_CACHE (set UCX_WARN_UNUSED_ENV_VARS=n to suppress this warning)
0.6224473333358764s/array
1.0627441358566285s/df

Could anyone give an explanation for this? Wrapping a numpy array in a DataFrame should not noticeably increase the data size.
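One way to narrow this down, sketched below (not from the thread): serialize both objects locally with distributed's own protocol and compare the byte size and the serialization time. If the sizes are similar but the DataFrame takes much longer to serialize, the overhead is on the serialization side rather than on the wire.

import time
import numpy as np
import pandas as pd
from distributed.protocol import serialize

arr = np.random.rand(13000, 1000)
df = pd.DataFrame(arr)

for name, obj in [('numpy array', arr), ('DataFrame', df)]:
    t1 = time.time()
    header, frames = serialize(obj)
    t2 = time.time()
    nbytes = sum(getattr(f, 'nbytes', len(f)) for f in frames)
    print('{}: {:.1f} MB in {} frames, serialized in {:.3f}s'.format(
        name, nbytes / 1e6, len(frames), t2 - t1))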

mrocklin commented 4 years ago

> Could this be evidence that the data transfer itself is very short at the network level and the time is being spent somewhere else?

> @quasiben Where do you think most of the time is being spent: at the Dask level, in ucx-py, or in UCX itself? If it is in dask/distributed, could you give me some guidance on digging into this?

@songqiqqq we're still actively working on UCX/Dask. I wouldn't expect perfect performance yet. If you are looking to be helpful then it would be nice if you could profile some of your work and help to identify where the slowdowns are. You might do this by using Dask asynchronously and using the cProfile module.
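A sketch of what that profiling could look like (it assumes the same scheduler address used earlier in the thread; hash=False forces a fresh key so each scatter actually re-sends the data):

import asyncio
import cProfile
import numpy as np
from distributed import Client

async def main():
    # Connect asynchronously so all communication runs inside this coroutine.
    client = await Client('ucx://172.16.10.12:8786', asynchronous=True)
    data = np.random.rand(13000, 1000)  # about 100 MB

    await client.scatter(data)  # warm up connections outside the profile

    profiler = cProfile.Profile()
    profiler.enable()
    for _ in range(10):
        await client.scatter(data, hash=False)
    profiler.disable()
    # Functions high in cumulative time point at where scatter spends its time
    # (serialization, the comm layer, or elsewhere).
    profiler.print_stats(sort='cumulative')

    await client.close()

asyncio.run(main())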

songqiqqq commented 4 years ago

Thanks, I'll try to track down the slowdowns and will report back here if I make any progress.

rsignell-usgs commented 1 year ago

@mrocklin or others, where do things stand today with UCX?
I was going to mention this to the USGS HPC Team (who teach a course on high-performance Python that doesn't even mention Dask) but I wanted to get my facts straight before I did so.

rsignell-usgs commented 1 year ago

Okay, I ran into @mrocklin this morning at the ESIP Summer Meeting and he told me UCX usually doesn't have too much impact on jobs running on CPUs. So I guess I don't really need an update. :)

mrocklin commented 1 year ago

You can still use InfiniBand by setting the interface: config option.
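For example (a sketch; LocalCluster stands in for whatever deployment mechanism is in use, ib0 is the interface name from earlier in this thread, and dask-jobqueue clusters accept the same interface keyword):

from distributed import Client, LocalCluster

# Bind scheduler and worker communication to the InfiniBand interface (IPoIB).
cluster = LocalCluster(interface='ib0', n_workers=2, threads_per_worker=1)
client = Client(cluster)

# The scheduler address should now resolve to the ib0 IP rather than the
# default ethernet interface.
print(client.scheduler_info()['address'])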
