dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Cost per task grows with number of tasks #2611

Open elliottslaughter opened 5 years ago

elliottslaughter commented 5 years ago

I noticed recently that the running time of my application grows faster than O(N) in the number of steps. The application issues its tasks with dask.delayed. To quantify this, I made a stripped-down version that has the same dependence pattern but with empty tasks, so all the time should be spent in constructing the task graph.

I'm running a distributed setup with 1 scheduler and 1 worker on a machine with 32 cores per node. The worker has --nthreads 32 --nprocs 1 but I don't think this matters since all the time is spent in graph construction. Running htop on the node shows the client is using 100% of one CPU core and the rest of the system is idle (and well below its memory limit).

My results are below. Time per task seems to grow as O(N) in the number of tasks, which means the total running time grows as O(N^2).

I realize that as a workaround I can probably call compute() more frequently; based on the data below, roughly every 10 steps looks optimal. But I still want to check whether this is a known performance behavior of Dask, and whether it's worth looking at the application more closely to see if I might be doing something wrong.

Thanks in advance and let me know if this would be better submitted under another Dask subproject.

| Steps | Tasks | Total Time (s) | Time per Task (ms) |
|------:|------:|---------------:|-------------------:|
| 1 | 32 | 0.5316095 | 16.61279688 |
| 10 | 320 | 0.6279482 | 1.962338125 |
| 50 | 1600 | 7.029356 | 4.3933475 |
| 100 | 3200 | 25.81444 | 8.0670125 |
| 200 | 6400 | 107.7474 | 16.83553125 |
| 300 | 9600 | 236.0527 | 24.58882292 |

$ conda list
dask                      1.1.5                      py_0  
dask-core                 1.1.5                      py_0  
distributed               1.26.1                   py37_0  
python                    3.7.1                h0371630_7  
tornado                   6.0.2            py37h7b6447c_0  
mrocklin commented 5 years ago

Thanks @elliottslaughter , that's surprising. We've taken some care so that computation time per task is constant relative to the number of tasks (or at least I had thought so). Are you able to share your benchmarking script?

elliottslaughter commented 5 years ago

Here you go:

https://gist.github.com/elliottslaughter/0d334181b1d52af5572205015cd9ade9

I've stripped this down so we can be 100% sure there's absolutely nothing from the original application aside from the dependence pattern.

$ python micro_bench.py -h
usage: micro_bench.py [-h] --steps STEPS --width WIDTH [--scheduler SCHEDULER]

optional arguments:
  -h, --help            show this help message and exit
  --steps STEPS
  --width WIDTH
  --scheduler SCHEDULER

The issue actually reproduces with or without a distributed setup. Here's an example running on my local desktop (not distributed):

$ python micro_bench.py --steps 100 --width 20
Total Time: 1.343633 s
Time per Task: 0.671816 ms
$ python micro_bench.py --steps 200 --width 20
Total Time: 5.102424 s
Time per Task: 1.275606 ms
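
For readers who don't want to open the gist, the benchmark has roughly the following shape. This is only a sketch, not the gist itself: the `noop` helper and the exact dependence pattern between steps are illustrative.

```python
import argparse
import time

import dask
from dask.distributed import Client


@dask.delayed
def noop(*deps):
    # Empty task: any measured time comes from graph construction and scheduling.
    return None


def create_graph(steps, width):
    # `width` tasks per step, each depending on the previous step's tasks,
    # joined into a single Delayed object at the end.
    prev = [noop() for _ in range(width)]
    for _ in range(steps - 1):
        prev = [noop(*prev) for _ in range(width)]
    return noop(*prev)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--steps", type=int, required=True)
    parser.add_argument("--width", type=int, required=True)
    parser.add_argument("--scheduler")
    args = parser.parse_args()

    if args.scheduler:
        client = Client(args.scheduler)  # otherwise fall back to the local scheduler

    start = time.time()
    create_graph(args.steps, args.width).compute()
    elapsed = time.time() - start
    ntasks = args.steps * args.width
    print(f"Total Time: {elapsed:f} s")
    print(f"Time per Task: {1000 * elapsed / ntasks:f} ms")
```
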
mrocklin commented 5 years ago

It looks like most of the time is in graph construction using the dask.delayed API.

(My apologies for not pasting a fully minimal example: create_graph here is the same as your function, except that it returns the unevaluated Delayed object for the final join.)

In [28]: %time v = create_graph(100, 20)
CPU times: user 854 ms, sys: 12.2 ms, total: 866 ms
Wall time: 860 ms

In [29]: %time v.compute(scheduler='single-threaded')
CPU times: user 70.8 ms, sys: 871 µs, total: 71.6 ms
Wall time: 71 ms

In [30]: %time v = create_graph(200, 20)
CPU times: user 3.51 s, sys: 83.5 ms, total: 3.59 s
Wall time: 3.59 s

In [31]: %time v.compute(scheduler='single-threaded')
CPU times: user 136 ms, sys: 1.65 ms, total: 137 ms
Wall time: 137 ms

So probably something is up with graph construction and dask.delayed.

A tiny bit of profiling points to about half the cost happening in an isinstance check in the HighLevelGraph constructor. That is fixed in https://github.com/dask/dask/pull/4699, but things are still expensive and construction time with dask.delayed is still nonlinear.

HighLevelGraphs were added recently to provide better structural sharing in the other collections (array, bag, dataframe), but it looks like they have had a negative effect on dask.delayed.
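
For anyone who wants to reproduce this kind of measurement, here is a minimal profiling sketch. The `micro_bench`/`create_graph` import is hypothetical; substitute whatever builder produces the Delayed object.

```python
import cProfile
import pstats

from micro_bench import create_graph  # hypothetical import of the benchmark's builder

# Profile graph construction only; execution is deliberately excluded.
profiler = cProfile.Profile()
profiler.enable()
create_graph(200, 20)
profiler.disable()

pstats.Stats(profiler).sort_stats("cumulative").print_stats(15)
```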

mrocklin commented 5 years ago

@elliottslaughter if you're looking to just check the performance of the scheduler itself, then you might consider constructing graphs directly or using the real-time futures interface, which wouldn't suffer from this problem.

Alternatively, it could be that one of the maintainers has time to look more deeply into the dask.delayed issue. This would be a good introduction to HighLevelGraphs (and possibly their removal from dask.delayed, if we can't find a way to make them cheap in this case). cc @jcrist and @jrbourbeau, who both seem like they might find this interesting.
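
A hedged sketch of the same kind of dependence pattern expressed with the futures interface; the `noop` task and the all-to-all dependence between steps are illustrative, not a prescribed solution.

```python
from dask.distributed import Client

client = Client()  # or Client("scheduler-address:8786") for an existing cluster

def noop(*deps):
    return None

def run(steps, width):
    # Futures start executing as soon as they are submitted, so the delayed
    # graph-construction step discussed above is avoided entirely.
    # pure=False keeps identical calls from collapsing into a single key.
    prev = [client.submit(noop, pure=False) for _ in range(width)]
    for _ in range(steps - 1):
        prev = [client.submit(noop, *prev, pure=False) for _ in range(width)]
    return client.submit(noop, *prev, pure=False)

run(100, 32).result()
```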

elliottslaughter commented 5 years ago

Thanks for the suggestion. Is it possible to implement the equivalent of @dask.delayed(nout=2) using either of the two methods you mentioned?

elliottslaughter commented 5 years ago

It just occurred to me that I can do this manually with a task that selects the appropriate index of the returned tuple:

def splitter(value, idx):
    return value[idx]
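
For illustration, a sketch of how such a splitter can stand in for `@dask.delayed(nout=2)` when using the futures interface; `compute_pair` is a made-up stand-in for a task that returns a 2-tuple.

```python
from dask.distributed import Client

client = Client()  # assumed local cluster, for illustration only

def compute_pair(x):
    # Stand-in for a task whose result is a 2-tuple.
    return x + 1, x - 1

def splitter(value, idx):
    return value[idx]

pair = client.submit(compute_pair, 10)
first = client.submit(splitter, pair, 0)   # future for element 0 of the tuple
second = client.submit(splitter, pair, 1)  # future for element 1 of the tuple
print(client.gather([first, second]))      # [11, 9]
```
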
elliottslaughter commented 5 years ago

I can confirm that with direct graph construction the time per task is constant. This is sufficient for my immediate needs. I'll let the team decide whether to keep this issue open or not.
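
For reference, "direct graph construction" here means building the low-level task dict by hand and passing it to the scheduler's get function. A hedged sketch of the same kind of pattern (the key names and the `noop` body are illustrative):

```python
from dask.distributed import Client

client = Client()  # or Client("scheduler-address:8786")

def noop(*deps):
    return None

def build_dsk(steps, width):
    # Low-level dask graph: a dict mapping keys to (function, *dependency_keys).
    dsk = {}
    for step in range(steps):
        for i in range(width):
            deps = [("task", step - 1, j) for j in range(width)] if step else []
            dsk[("task", step, i)] = (noop,) + tuple(deps)
    dsk["join"] = (noop,) + tuple(("task", steps - 1, j) for j in range(width))
    return dsk

dsk = build_dsk(100, 32)
client.get(dsk, "join")  # execute the hand-built graph and return the "join" result
```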

impravin22 commented 3 years ago

@elliottslaughter @mrocklin Is there any way to estimate the total time it would take to finish all the tasks in a distributed Dask cluster (like a progress bar)? If yes, please let me know your suggestions. Thank you in advance.

mrocklin commented 3 years ago

This appears to be a separate topic. In the future I recommend raising a new issue or asking a question on Stack Overflow.

However, I believe that the value you're looking for is Scheduler.total_occupancy, possibly together with Scheduler.total_nthreads. You can retrieve these values with the Client.run_on_scheduler method.
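
A hedged sketch of retrieving those values; the scheduler address is a placeholder, and `total_occupancy` is the scheduler's estimate (in seconds of compute) of all pending and executing work.

```python
from dask.distributed import Client

client = Client("scheduler-address:8786")  # placeholder address

def occupancy(dask_scheduler=None):
    # Runs inside the scheduler process; run_on_scheduler injects dask_scheduler.
    return dask_scheduler.total_occupancy, dask_scheduler.total_nthreads

total_occupancy, total_nthreads = client.run_on_scheduler(occupancy)
# Rough wall-clock estimate: expected compute-seconds divided by total worker threads.
print(total_occupancy / max(total_nthreads, 1), "seconds remaining (rough estimate)")
```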


impravin22 commented 3 years ago

@mrocklin Thanks for your prompt response. I tried the Client.run_on_scheduler method, but unfortunately it doesn't help me. I am trying to estimate the time taken by each grid point in GridSearchCV on a distributed Dask cluster. Is there any way to do this? Thank you very much.