dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.57k stars 717 forks source link

Double counting of network transfer cost #7003

Open fjetter opened 2 years ago

fjetter commented 2 years ago

We're double counting estimated network cost in multiple places

Forst, we're calculating the estimated network cost of dependencies a worker needs to fetch in _set_duration_estimate and are setting the result to WorkerState.processing, i.e. processing = compute + comm This is also used to set the workers occupancy

When making a scheduling decision, we're typically using Scheduler.worker_objective which calculates a start_time that is defined as

https://github.com/dask/distributed/blob/b133009cee88fd48c8a345cffde0a8e9163426a6/distributed/scheduler.py#L3000-L3001

i.e.


start_time = ws.occupancy / ws.nthreads + comm_bytes / self.bandwidth
        = ws.occupancy / ws.nthreads + comm_cost

occupancy ~ sum( ... TaskPrefix.duration_average + comm_cost )
  1. comm cost should be constant and not scale with nthreads
  2. we should only account for comm_cost once

A similar double counting is introduced on work stealing side when calculating the cost_multiplier

compute_time = ws.processing[ts]  # occupancy
transfer_time = nbytes / self.scheduler.bandwidth + LATENCY
cost_multiplier = transfer_time / compute_time

# If we ignore latency for now, this yields something like

cost_multiplier ~ NBytes / (Bandwidth * duration_average + NBytes)
    =  (NBytes / Bandwidth) / (duration_average + NBytes / Bandwidth)

i.e. for network heavy tasks, this converges towards 1 which is quite the opposite of what this ratio is supposed to encode

fjetter commented 2 years ago

There is another double/multiple counting problem in _set_duration_estimate that concerns tasks with shared dependencies.

_set_duration_estimate is evaluated once per task w/out any regard of shared dependencies. Therefore, specifically for graphs where N tasks share one common node, this nodes transfer cost is vastly overestimated since it is counted N times.

fjetter commented 2 years ago

This double counting can be catastrophic for cases where transfer cost is potentially larger or of similar size than compute. Apart from an erroneous worker_objective, this can lead to misclassification of idle workers which then causes very aggressive work stealing where all tasks are stolen by the worker with the dependency. An extreme example is https://github.com/dask/distributed/issues/6573

fjetter commented 2 years ago

This double counting appears to go back to https://github.com/dask/distributed/pull/773