coiled / feedback

A place to provide Coiled feedback

Help Mitigating Slow `client.scatter` #187

Closed nathanballou closed 2 years ago

nathanballou commented 2 years ago

We recently got started with Coiled and are having some problems with scaling on a Coiled cluster. Our code runs well on a local cluster, but is rather slow on a Coiled cluster with many workers. I believe this is due to a `client.scatter(data, broadcast=True)` call before a `client.map` call, which means waiting for all of the workers to receive the data. The data is relatively small (a few KB at most), but the calculations and the scatter sit inside a nested for loop, so they are submitted many times. So while the actual calculations don't take very long, waiting for the data to transfer makes the overall compute time too long.

Do you have any resources on how to help mitigate this? In the past, I have run larger instances with lots of processes to minimize network transfers, but I haven't been able to find a way to do this in Coiled.

jrbourbeau commented 2 years ago

Thanks for raising an issue @nathanballou

but the calculations and scatter are within a nested for loop, so are submitted many times

Does the scatter data change between iterations of this loop? I just want to double-check whether the scatter calls can be moved outside the loop. That is, whether we can rewrite code that looks like this (pseudocode):

for loop in loops:
    data = client.scatter(...)
    fs = client.map(func, data)

to look like this:

data = client.scatter(...)
for loop in loops:
    fs = client.map(func, data)

to minimize the number of scatter calls.

The data is relatively small (a few KB at most)

Just another sanity check -- is the scattered data getting used several times on each worker throughout the computation? I'm wondering if scattering is actually needed if the data is small.
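
For small data, one option is to skip the explicit scatter and pass the object directly as a keyword argument; Dask will serialize it and ship it to workers along with the tasks. A minimal sketch (`func` and `data` here are placeholders for your function and its per-iteration input):

# Sketch: pass small data straight to client.map instead of scattering it.
# The keyword argument is serialized with the tasks and sent to whichever
# workers run them, with no explicit broadcast step.
fs = client.map(func, range(1000), data=data)
results = client.gather(fs)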

In the past, I have run larger instances with lots of processes to minimize network transfers, but I haven't been able to find a way to do this in Coiled.

`coiled.Cluster` has a `worker_cpu=` keyword argument you can use to specify how large you would like each worker to be. Alternatively, there's also a `worker_vm_types=` keyword if you have a specific instance type in mind.
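
For example, a minimal sketch (the worker counts, CPU size, and instance type below are illustrative, not recommendations):

import coiled

# Size each worker by CPU count...
cluster = coiled.Cluster(n_workers=10, worker_cpu=8)

# ...or pin a specific VM type instead (instance name is illustrative).
cluster = coiled.Cluster(n_workers=10, worker_vm_types=["m5.2xlarge"])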

nathanballou commented 2 years ago

Yes, the scatter data changes between iterations of the loop. It's possible I can refactor the code to run all of the scatter code first, and then do the calculations, but I think we would run into a similar problem in terms of latency.

The code currently looks something like this:

all_fs = []
for week in year:
    for day in week:
        data = database_read(day, week)
        data = client.scatter(data, broadcast=True)
        fs = client.map(func, range(1000), data=data)
        all_fs.append(fs)
client.gather(all_fs)

Each call of `func` takes about 0.5 seconds and the data is on the order of kilobytes.

The scattered data is used once per calculation, but each worker does the calculation several times. In testing locally, scattering reduced the time between calculations dramatically (from ~1 minute between map calls to near-instantaneous). I have tried it both with and without the scatter on Coiled, with limited difference between the two.

Is there a way to then specify the number of processes on each worker?

fjetter commented 2 years ago

I would recommend moving the DB read into a future/delayed as well and letting Dask manage the data replication, i.e.:

all_fs = []
for week in year:
    for day in week:
        data = client.submit(database_read, day, week)
        fs = client.map(func, range(1000), data=data)
        all_fs.append(fs)
client.gather(all_fs)

This may even be faster than using scatter, since I expect the intra-cluster network to be faster than the network between the client and the cluster. Note that when doing this, it is entirely possible that multiple processes will access the database at the same time. If you need to limit this to protect the DB from overloading, there is `distributed.Semaphore`.
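
A minimal sketch of how that might look (the semaphore name, lease count, and wrapper function are illustrative):

from distributed import Semaphore

# Allow at most 5 concurrent database reads across the whole cluster.
sem = Semaphore(max_leases=5, name="database")

def limited_database_read(day, week):
    with sem:  # blocks until a lease is free; released on exit
        return database_read(day, week)

data = client.submit(limited_database_read, day, week)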

fjetter commented 2 years ago

Yet another possibility would be to only parallelize over year/week and not `range(1000)`:

def do_stuff(day, week):
    data = database_read(day, week)
    for ix in range(1000):
        func(ix, data=data)

all_fs = []
for week in year:
    for day in week:
        all_fs.append(client.submit(do_stuff, day=day, week=week))
client.gather(all_fs)

This reduces parallelism but may be a good idea since you said that `func` takes only 0.5 seconds. The cluster has a certain overhead per task, and with a 0.5 s runtime the tasks are fast enough that this overhead may be noticeable (~100 ms per task, maybe a bit less).

I can also recommend looking into https://docs.dask.org/en/stable/bag.html, which applies a couple of optimizations (e.g. batching) to avoid such problems. Some users also prefer this API over the low-level futures.
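
As a rough sketch of what that could look like with the names from this thread (the partition count is illustrative):

import dask.bag as db

# Batch the 1000 items into a smaller number of partitions so each task
# processes many items, reducing per-task scheduling overhead.
b = db.from_sequence(range(1000), npartitions=20)
results = b.map(func, data=data).compute()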

hayesgb commented 2 years ago

Following up here @nathanballou. Have you been able to resolve your issue?

hayesgb commented 2 years ago

@nathanballou -- Closing this, assuming it's resolved. If you need more, please feel free to reopen.