crusaderky opened this issue 1 year ago (status: Open)
I'm currently -1 on increasing complexity by adding an additional worker state and would like us to consider other options first. For instance, we might simply treat paused workers like every other worker, i.e. remove complexity again and assign tasks to paused workers as well. This behavior was introduced in https://github.com/dask/distributed/pull/5431 and was based mostly on theoretical arguments rather than on real-world data.
I would also like us to understand better what work stealing is doing before changing anything in the scheduler.
Executive summary
A worker may end up completely unused for most of a computation if it was briefly paused at some point in its early stages. Work stealing does not correct imbalances caused by a worker pausing and later unpausing. It is unwise to exclude a worker from scheduling heuristics the instant it pauses.
Expected (naive) behaviour
When a worker reaches 85% process memory usage, it is paused. Its memory bar turns red in the GUI. It keeps computing any tasks that are currently running, but doesn't start any new ones from the worker-side queue. The scheduler excludes it from task assignment, both for new tasks and for queued rootish tasks. As soon as process memory falls back below 85% (with no hysteresis), it goes back to running state. It starts any tasks that were in ready state in the worker-side queue, and the scheduler starts sending it new tasks again. Work stealing takes care of rebalancing the workload.
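The naive rule described above can be written as a pure function of process memory. The following is a minimal sketch that assumes a fixed 0.85 pause fraction, no hysteresis, and a scheduler that only considers running workers. Status, next_status and schedulable are hypothetical stand-ins, not distributed's own API.

```python
from enum import Enum

PAUSE_FRACTION = 0.85  # pause threshold quoted in this issue (assumed fixed)


class Status(Enum):
    # Hypothetical stand-in for the worker status; not distributed.core.Status
    running = "running"
    paused = "paused"


def next_status(process_memory: float, memory_limit: float) -> Status:
    """Naive rule: pause on reaching the threshold, resume as soon as memory drops below it.

    There is no hysteresis, so a worker hovering around the threshold
    flips between states on every check.
    """
    if process_memory >= PAUSE_FRACTION * memory_limit:
        return Status.paused
    return Status.running


def schedulable(statuses: dict) -> list:
    """Naive scheduler view: only running workers are candidates for new tasks."""
    return [addr for addr, status in statuses.items() if status is Status.running]
```

Feeding in memory samples of 80, 86, 84 and 86 (out of a limit of 100) makes the worker flicker running, paused, running, paused. Any task assignment that happens during one of the paused samples skips that worker entirely.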
What actually happens
If a worker is paused, even for just a fraction of a second, while a large number of independent but not rootish tasks land on the scheduler, it is excluded from scheduling and all of those tasks are sent at once to other workers. Work stealing does not rebalance anything after the worker unpauses.
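To see why a momentary pause has such a lasting effect, consider a toy model. A burst of independent tasks is placed in one go, each on the least-loaded non-paused worker, and nothing is moved afterwards (i.e. no work stealing). This is a deliberately simplified, hypothetical stand-in for the scheduler's real placement heuristics, which also weigh occupancy and data transfer costs.

```python
def assign_burst(n_tasks, workers, paused):
    """Place a burst of independent tasks, one at a time, on the least-loaded non-paused worker.

    Placement decisions are final: a worker that was paused when the burst
    arrived keeps its (zero) share even if it unpauses immediately afterwards.
    """
    candidates = [w for w in workers if w not in paused]
    load = {w: 0 for w in workers}
    for _ in range(n_tasks):
        # min() returns the first minimum on ties, so candidates are filled round-robin
        target = min(candidates, key=lambda w: load[w])
        load[target] += 1
    return load


print(assign_burst(600, ["a", "b", "c"], paused={"b"}))
print(assign_burst(600, ["a", "b", "c"], paused=set()))
```

With worker b paused at submission time, a and c receive 300 tasks each and b receives none. Without the pause, each worker gets 200. In this model b stays idle after it unpauses unless something like work stealing moves tasks over to it.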
Use case
test_spilling in coiled_runtime is as follows:
The workflow is divided into three stages:
In main, this use case never reaches the pause threshold. The reason is that only modest amounts of unmanaged memory are involved, so well before process memory hits the pause threshold, managed memory hits the target threshold. This in turn blocks the event loop, which effectively puts a hard limit on how many tasks can be in managed memory at any given time. In the video below we can see this effect, as well as the clear split between phase 1 and phases 2-3:
https://user-images.githubusercontent.com/6213168/229532293-2dbbf5ef-70ac-4655-af55-7ff687607773.mp4
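The argument above reduces to a bit of arithmetic. If blocking on the target threshold caps managed memory at target × memory_limit, then process memory can reach the pause threshold only when unmanaged memory exceeds (pause − target) × memory_limit. The following sketch makes several assumptions: a target fraction of 0.6 (dask's default, assumed here), the 0.85 pause fraction quoted in this issue, and an illustrative 8 GiB memory limit. The function names are hypothetical.

```python
GiB = 2**30


def max_process_memory(memory_limit, target, unmanaged):
    """Upper bound on process memory when managed memory is capped at target * memory_limit.

    Assumes, as described above, that hitting the target threshold blocks the
    event loop until spilling catches up, so managed memory cannot grow past it.
    """
    return target * memory_limit + unmanaged


def can_reach_pause(memory_limit, target, pause, unmanaged):
    """True if process memory can reach the pause threshold under that cap."""
    return max_process_memory(memory_limit, target, unmanaged) >= pause * memory_limit


limit = 8 * GiB  # illustrative memory limit, not a measured value
print(can_reach_pause(limit, 0.6, 0.85, unmanaged=1 * GiB))  # False
print(can_reach_pause(limit, 0.6, 0.85, unmanaged=3 * GiB))  # True
```

Under these assumptions the break-even point is 0.25 × 8 GiB = 2 GiB of unmanaged memory. Below that, the pause threshold is unreachable in main.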
PR #4424, however, makes it a lot easier (by design) to reach the pause threshold. With the PR, a task that transitions from executing to memory no longer blocks the event loop until older tasks are spilled out. If the next task at the top of the ready queue has no spilled-out dependencies, it starts immediately while spilling/unspilling happens in a separate thread.
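The non-blocking behaviour described above can be illustrated with a toy buffer that hands eviction to a background thread. This is a hypothetical sketch of the idea, not the PR's actual code. AsyncSpillBuffer and its methods are invented names, and a plain dict stands in for disk.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class AsyncSpillBuffer:
    """Toy key/value store that spills its oldest keys on a background thread."""

    def __init__(self, max_in_memory):
        self.max_in_memory = max_in_memory
        self.fast = {}  # in-memory results, insertion-ordered (oldest first)
        self.slow = {}  # stand-in for on-disk storage
        self._lock = threading.Lock()
        self._spiller = ThreadPoolExecutor(max_workers=1)

    def __setitem__(self, key, value):
        # Storing a finished task's result returns immediately...
        with self._lock:
            self.fast[key] = value
        # ...and eviction is scheduled without waiting for it to complete.
        self._spiller.submit(self._evict)

    def _evict(self):
        with self._lock:
            while len(self.fast) > self.max_in_memory:
                oldest = next(iter(self.fast))
                # A real implementation would do the slow disk write outside the lock
                self.slow[oldest] = self.fast.pop(oldest)

    def can_start_now(self, dependencies):
        """A task may start immediately if none of its inputs need unspilling."""
        with self._lock:
            return all(dep in self.fast for dep in dependencies)

    def close(self):
        self._spiller.shutdown(wait=True)


buf = AsyncSpillBuffer(max_in_memory=2)
for i in range(5):
    buf[f"x{i}"] = i  # never waits for spilling
buf.close()  # wait for pending evictions, for a deterministic final state
```

After close(), x3 and x4 are in memory and x0 to x2 have been spilled. A task depending only on x3 and x4 could start right away. One depending on x0 would first need to unspill it.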
In the video below we can see that, during stage 1, workers reach the pause threshold very quickly. This is because numpy.random.random produces data faster than the disk write throughput of the Coiled workers (AWS EC2 m6i.large) can absorb. The workers then start flickering in and out of the paused state as the disk catches up. This behaviour is good and by design: it is exactly how the pause system should work, and it means that the worker starts slowing down later, under a higher degree of memory pressure.

However, when phase 2 kicks in, most workers are still spilling in the background and are still paused. This lasts for just a second or two, but that is enough for the scheduler to exclude them completely when scheduling many hundreds of non-rootish tasks. When they later go back to running state, work stealing does not rebalance the queue, causing a dramatic degradation in end-to-end runtime.
https://user-images.githubusercontent.com/6213168/229539118-84365240-65fd-47ba-85cc-a75134da5706.mp4
I tried adding a brief pause on the client side between phase 1 and phase 2, waiting for all workers to unpause, and that caused the problem to disappear, achieving perfect work balance:
https://user-images.githubusercontent.com/6213168/229539869-09a49a1d-136e-40c6-bf1d-15d94f592cff.mp4
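The client-side workaround could look roughly like the following. The exact code used for the experiment isn't shown in this issue, so this is a hedged sketch. It polls the scheduler through Client.run_on_scheduler, which injects the Scheduler object when the function has a dask_scheduler parameter. It also assumes each WorkerState exposes a status enum whose name is "running" or "paused". wait_for_unpause and _worker_statuses are hypothetical helper names.

```python
import time


def _worker_statuses(dask_scheduler):
    # Executed inside the scheduler process via Client.run_on_scheduler
    return {addr: ws.status.name for addr, ws in dask_scheduler.workers.items()}


def wait_for_unpause(client, timeout=60.0, poll_interval=0.5):
    """Block until every worker known to the scheduler reports 'running'."""
    deadline = time.monotonic() + timeout
    while True:
        statuses = client.run_on_scheduler(_worker_statuses)
        if all(status == "running" for status in statuses.values()):
            return statuses
        if time.monotonic() >= deadline:
            not_running = sorted(a for a, s in statuses.items() if s != "running")
            raise TimeoutError(f"workers still not running after {timeout}s: {not_running}")
        time.sleep(poll_interval)
```

Calling wait_for_unpause(client) after phase 1 has been persisted and before phase 2 is submitted reproduces the idea of the experiment. It is a diagnostic workaround rather than a fix, because it serializes the two phases.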
Proposed actions
First of all, we need to figure out why work stealing is not kicking in. This is clearly a bug.
Second, however, there is a consensus among developers that work stealing should be avoided where possible. To this end, I propose introducing a new worker state, pausing. A worker enters the pausing state when it crosses the pause threshold. After a fairly generous amount of time, e.g. 10s, the worker transitions to paused if it has not fallen back below the pause threshold in the meantime. Falling below the threshold resets the timer. From the worker's point of view, pausing is the same as paused: tasks are left in the ready queue, and network transfers are throttled. From the scheduler's perspective, pausing is the same as running: the worker is not excluded from scheduling heuristics and remains in the running set.

XREFs
#3761
#5999
#4424
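To make the pausing proposal from Proposed actions concrete, here is a hypothetical state machine that follows the description there. Crossing the threshold enters pausing. Staying above it for the grace period (10s) escalates to paused. Dropping below it returns to running and resets the timer. PauseTracker and WorkerStatus are invented names, and the time source is passed in explicitly to keep the sketch deterministic.

```python
from enum import Enum


class WorkerStatus(Enum):
    running = "running"
    pausing = "pausing"  # proposed intermediate state
    paused = "paused"


class PauseTracker:
    """Hypothetical worker-side tracker for the proposed 'pausing' state."""

    def __init__(self, pause_fraction=0.85, grace_period=10.0):
        self.pause_fraction = pause_fraction
        self.grace_period = grace_period
        self.status = WorkerStatus.running
        self._above_since = None  # time at which memory last crossed the threshold

    def update(self, process_memory, memory_limit, now):
        if process_memory < self.pause_fraction * memory_limit:
            # Back below the threshold: resume and reset the timer
            self.status = WorkerStatus.running
            self._above_since = None
        elif self._above_since is None:
            # Just crossed the threshold: start the grace period
            self.status = WorkerStatus.pausing
            self._above_since = now
        elif now - self._above_since >= self.grace_period:
            # Still above the threshold after the grace period
            self.status = WorkerStatus.paused
        return self.status

    @property
    def starts_new_tasks(self):
        # Worker's view: pausing behaves like paused
        return self.status is WorkerStatus.running

    @property
    def scheduler_treats_as_running(self):
        # Scheduler's view: pausing behaves like running
        return self.status is not WorkerStatus.paused
```

With this split, a worker that flickers above the threshold for a second or two during a burst of task submissions keeps receiving its share of tasks. Only a worker that stays under memory pressure for the whole grace period is removed from the running set.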