The backward step requires communicating large dense arrays from the workers back to the runner node. The following simple reproducer illustrates that this worker-to-runner communication takes a significant amount of time:
import sys
import numpy as np
from time import time

import dask
dask.config.set({
    'distributed.comm.compression': {
        'on': False,
        'type': 'lz4'
    }
})


class band_actor(object):
    def __init__(self, nx, ny, nbasis, bid):
        self.nx = nx
        self.ny = ny
        self.nbasis = nbasis
        self.bid = bid

    def do_something(self, x):
        # stand-in for the real per-band work: produce a large dense
        # array that has to travel back to the client
        ti = time()
        y = np.random.randn(self.nbasis, self.nx, self.ny)
        print(f'band {self.bid} = ', time() - ti)
        return y


def main(nband, nbasis, nx, ny):
    from dask.distributed import Client, LocalCluster
    cluster = LocalCluster(processes=True,
                           n_workers=nband,
                           threads_per_worker=1,
                           memory_limit=0,  # str(mem_limit/nworkers)+'GB'
                           asynchronous=False)
    client = Client(cluster, direct_to_workers=True)
    client.wait_for_workers(nband)
    _main(nband, nbasis, nx, ny)


def _main(nband, nbasis, nx, ny):
    from distributed import get_client, as_completed
    client = get_client()
    names = list(client.scheduler_info()['workers'].keys())

    # one actor per worker, pinned to that worker
    futures = []
    for wname, bandid in zip(names, range(nband)):
        f = client.submit(band_actor,
                          nx,
                          ny,
                          nbasis,
                          bandid,
                          workers=wname,
                          actor=True,
                          pure=False)
        futures.append(f)
    actors = list(map(lambda f: f.result(), futures))

    niter = 2
    for i in range(niter):
        ti = time()
        ratio = np.random.randn(nbasis, nx, ny)
        print('ratio = ', time() - ti)

        ti = time()
        futures = list(map(lambda a: a.do_something(ratio), actors))
        print('submit = ', time() - ti)

        # gather the large per-band results back onto the client
        ti = time()
        results = []
        for fut in as_completed(futures):
            results.append(fut.result())
        print('result = ', time() - ti)


if __name__ == '__main__':
    nband = int(sys.argv[1])
    nbasis = int(sys.argv[2])
    nx = int(sys.argv[3])
    ny = int(sys.argv[4])
    print(nband, nbasis, nx, ny)
    main(nband, nbasis, nx, ny)
Sample output for a representative problem size is the following:
$ python actor_transfers.py 10 4 8192 8192
10 4 8192 8192
ratio = 6.603142023086548
submit = 0.0007979869842529297
band 0 = 6.500479459762573
band 1 = 6.201739549636841
band 6 = 6.064023017883301
band 9 = 6.040329694747925
band 5 = 6.069493055343628
band 4 = 5.986522674560547
band 2 = 6.046694040298462
band 3 = 6.001271724700928
band 7 = 6.132995128631592
band 8 = 6.157159090042114
result = 19.300325870513916
ratio = 6.187124967575073
submit = 0.0002925395965576172
band 0 = 6.295117139816284
band 1 = 6.209485292434692
band 2 = 6.173954486846924
band 6 = 6.105499267578125
band 4 = 6.307347774505615
band 8 = 6.160896301269531
band 3 = 6.261183261871338
band 5 = 6.0839598178863525
band 9 = 6.098883867263794
band 7 = 6.115009784698486
result = 20.101259231567383
It seems that the transfers take approximately 12-13 s for arrays of this size: the result phase takes ~19-20 s, of which only ~6 s is the time the workers spend generating their arrays. Each band's result is a 4 x 8192 x 8192 float64 array (~2 GiB), so the client pulls roughly 20 GiB per iteration. In the realistic case, where each worker also has to do a number of FFTs and wavelet transforms (on arrays of the same size), this transfer accounts for about 40% of the total time spent per iteration. Watching htop I see only a single (red, i.e. kernel-bound) thread busy during the transfer. It might be possible to use shared memory to avoid the transfer on a single node (see the sketch below), but I'm not sure there is a way around it in the genuinely distributed case. Switching compression on or off doesn't seem to make a difference. If the client doesn't connect directly to the workers, the total time per iteration goes from about 20 s to about 40 s, which I guess makes sense since the results then pass through the scheduler. Pinging @sjperkins @JSKenyon @o-smirnov for inspiration.
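To get a feel for how much of that time is serialization as opposed to transport, something like the following could be timed on its own (a rough sketch; distributed.protocol.serialize is an internal API, so the exact behaviour may differ between versions):

import numpy as np
from time import time
from distributed.protocol import serialize, deserialize

# one band's result: 4 x 8192 x 8192 float64, roughly 2 GiB
x = np.random.randn(4, 8192, 8192)

ti = time()
header, frames = serialize(x)
print('serialize   = ', time() - ti)

ti = time()
y = deserialize(header, frames)
print('deserialize = ', time() - ti)

If that comes out close to zero (numpy frames are normally handed over without a copy), then the time is presumably going into the comm layer itself rather than into pickling.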
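For the single-node case, one idea would be for each actor to copy its result into a multiprocessing.shared_memory block and return only the block's name, shape and dtype, with the client attaching to the block instead of receiving the array over the comm layer. A minimal sketch of the idea (hypothetical helpers, not wired into the reproducer above, and it still costs two memcpys):

from multiprocessing import shared_memory
import numpy as np

def put_result(y):
    # worker side: copy the result into a named shared memory block and
    # return only cheap metadata for the client
    shm = shared_memory.SharedMemory(create=True, size=y.nbytes)
    view = np.ndarray(y.shape, dtype=y.dtype, buffer=shm.buf)
    view[...] = y
    name = shm.name
    del view          # drop the exported buffer before closing the handle
    shm.close()
    return name, y.shape, str(y.dtype)

def get_result(name, shape, dtype):
    # client side: attach to the block, copy the array out and free the block
    shm = shared_memory.SharedMemory(name=name)
    view = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    y = view.copy()
    del view
    shm.close()
    shm.unlink()
    return y

This obviously does nothing for the genuinely distributed case, where the arrays still have to cross the network one way or another.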