dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.58k stars 718 forks source link

Data lingers in memory due to imbalance of worker priorities #1747

Open mrocklin opened 6 years ago

mrocklin commented 6 years ago

We experience some excess memory use because different workers are processing tasks of different priorities.

When we create task graphs we run dask.order on them, which provides a good ordering in order to minimize memory use. When this graph goes out to the workers it gets cut up, and tasks that are very close to each other in the ordering may end up on different workers. Those workers may then get distracted by different things, which means that while some tasks early in the ordering are complete, their co-dependents may not be complete, and are instead trapped on another worker not running, despite their high priority.

We might resolve this in a few ways:

  1. Preferentially steal tasks by their priority. This is possibly expensive (sorting is hard) but might be worth it for tasks without dependencies, or in cases where the number of tasks is not high
  2. Revert back to scheduling tasks only as needed. Currently we schedule all runnable tasks immediately. This helps ensure saturation of hardware. We could rethink this move.
  3. Don't do anything, and rely on mechanisms to slow down workers when they get too much data in memory, allowing their peers to catch up.
mrocklin commented 6 years ago

This possibly a partial cause to cause https://github.com/pangeo-data/pangeo/issues/99

rbubley commented 6 years ago

If you go for (1), I don’t think you need a full (expensive) sort: you only need the top few, which can be retrieve with a single scan. I.e. O(n) not O(n log n)

caseyjlaw commented 6 years ago

FWIW, I am seeing this lingering memory issue in my use case. I use the submit method and chain together a series of futures in graphs than open and close like this:

           |-> process0 ->|
read0----->|-> process1 ->| -> merge0
           |-> process2 ->|

This is repeated for tens of reads/merges and the process step produces a hundred times as many function calls. Nothing too demanding. I'd like the scheduler to push through the process step in order to free up the read memory. In practice, when I submit many of these graphs, all the read functions get scheduled first and the memory use blows up.

mrocklin commented 6 years ago

I suspect that you have a different issue, especially if you are using client.submit. I recommend raising another issue.

On Thu, Feb 8, 2018 at 10:01 AM, Casey Law notifications@github.com wrote:

FWIW, I am seeing this lingering memory issue in my use case. I use the submit method and chain together a series of futures in graphs than open and close like this:

       |-> process0 ->|

read0----->|-> process1 ->| -> merge0 |-> process2 ->|

This is repeated for tens of reads/merges and the process step produces a hundred times as many function calls. Nothing too demanding. I'd like the scheduler to push through the process step in order to free up the read memory. In practice, when I submit many of these graphs, all the read functions get scheduled first and the memory use blows up.

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/1747#issuecomment-364138325, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszB5Ll0MLO3lDTCRMpWdp51TCx3Taks5tSwxYgaJpZM4R83_u .

sjperkins commented 6 years ago

When this graph goes out to the workers it gets cut up, and tasks that are very close to each other in the ordering may end up on different workers. Those workers may then get distracted by different things, which means that while some tasks early in the ordering are complete, their co-dependents may not be complete, and are instead trapped on another worker not running, despite their high priority.

I'd like to re-raise the idea of grouping tasks into partitions that are each assigned to a worker (assignment occurs when first task in the partition starts to execute, as suggested in https://github.com/dask/distributed/pull/1559).

Then, would it not be possible to linearly subdivide the ordering priority space into bins and assigns tasks to each bin? Something like:

task_bins = np.linspace(order_low, order_high, nworkers)
task_order = [t.order for t in tasks]
task_worker = np.digitize(task_order, task_bins)

for task, worker in zip(tasks, task_worker):
   submit(task, worker=worker)

This is probably highly naive when considering actual scheduler resource constraints, but the basic idea might be useful/adaptable when trying to minimise I/O costs.