dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Restart paused workers after a certain timeout #5999

Open crusaderky opened 2 years ago

crusaderky commented 2 years ago

Use case

Workers pause once they hit the distributed.worker.memory.pause threshold (default 80%). This gives a worker time to spill managed data to disk and free up memory before it resumes computation; in other words, it covers the case where tasks are producing managed data faster than it can be spilled to the backing disk.
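To make the threshold concrete, here is a minimal sketch of the pause decision. It assumes the check simply compares process memory against a fraction of memory_limit; `should_pause` and its parameters are hypothetical names for illustration, not the actual code in distributed.

```python
GB = 2**30


def should_pause(process_memory: int, memory_limit: int, pause_fraction: float = 0.8) -> bool:
    """Return True once process memory reaches the pause threshold.

    Hypothetical helper: pause_fraction mirrors the default of
    distributed.worker.memory.pause (80%) mentioned above.
    """
    if memory_limit <= 0:
        # No memory limit configured: never pause in this sketch
        return False
    return process_memory / memory_limit >= pause_fraction


if __name__ == "__main__":
    print(should_pause(7 * GB, 10 * GB))  # below the threshold
    print(should_pause(8 * GB, 10 * GB))  # at the threshold
```

Note that the comparison is on total process memory, so it cannot tell managed data (which can be spilled) apart from unmanaged memory (which cannot).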

However, if memory is full due to unmanaged memory, the worker will never be unpaused and will remain dead weight forever. In real life, this typically happens when a library leaks memory.

Design

If a worker remains paused beyond a certain user-defined timeout, restart it through graceful worker retirement. This implies moving all of its managed data out of the spill file and to other workers.

[EDIT] Ideally, this timeout should start ticking only once all managed data has been spilled to disk, or once data has failed to spill (due to a full disk or max_spill being reached).
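A rough sketch of how such a timer could behave, under stated assumptions: the class name `PausedRestartTimer`, the `spill_exhausted` flag (True when everything is spilled or spilling has failed), and the caller-supplied clock are all hypothetical and not part of distributed.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PausedRestartTimer:
    """Tracks how long a worker has been paused with nothing left to spill."""

    timeout: float  # user-defined timeout, in seconds
    _started_at: Optional[float] = None

    def update(self, now: float, paused: bool, spill_exhausted: bool) -> bool:
        """Return True when the worker should be gracefully retired and restarted."""
        if not (paused and spill_exhausted):
            # Still spilling, or no longer paused: the timer does not tick
            self._started_at = None
            return False
        if self._started_at is None:
            self._started_at = now
        return now - self._started_at >= self.timeout


if __name__ == "__main__":
    timer = PausedRestartTimer(timeout=30)
    print(timer.update(now=0, paused=True, spill_exhausted=False))
    print(timer.update(now=10, paused=True, spill_exhausted=True))
    print(timer.update(now=40, paused=True, spill_exhausted=True))
```

Resetting the timer whenever the worker unpauses or resumes spilling keeps a slow but healthy spill from triggering a restart.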

AC

Caveats

Rejected ideas

Persistent spill buffer which is retrieved after restart. This would prevent the above caveat.

Related

crusaderky commented 2 years ago

Implementing this will have unpleasant consequences when the failure to get out of paused state is not due to memory leaks, but due to running out of disk space.

E.g. the cluster has 3 workers, A, B, and C, each with memory_limit=10GB and max_spill (or disk space)=10GB.

  1. Worker A has 8GB managed in memory + 10 GB spilled -> it pauses
  2. It tries to spill but fails because the spill disk is full -> it remains paused

Today, the cluster is deadlocked. After this ticket:

  1. After A has failed to emerge from the paused state for 30 seconds, it is retired. This initiates a transfer of up to 18 GB of data to the rest of the cluster. "Up to" because data that is already replicated elsewhere won't be touched. For simplicity's sake, let's say that workers hold exclusively unique data.
  2. Workers B and C each have 6 GB managed in memory + 4 GB spilled, so they are healthy and running.
  3. Each of them is expected to receive 9 GB worth of data from A ((10 + 8) / 2), so that A can be restarted. 6 + 4 + 9 = 19 GB, but each worker can only hold 8 GB in memory (the 80% pause threshold) + 10 GB of max_spill = 18 GB before it pauses.
  4. This causes them to reach 8 GB managed in memory + 10 GB spilled, which makes them pause and puts them in the same situation as A.
  5. There are still 2 GB of data on A. As soon as there are no more running (not paused, not retiring) workers on the cluster, the retirement of A fails; A is sent back to running state because it now has a very light load of 2 GB.
  6. However, after 30s, the retirement of B and C begins, towards A, which is now going to receive the 16 GB it previously sent to B and C, plus the 20 GB that they had on their own.
  7. This quickly sends A permanently into paused state, which makes B and C fail to retire as there are no other running workers.
  8. B and C again have 10 GB total each, which lets them go back into running state.
  9. Go back to step 1. An infinite ping-pong loop ensues.
  9. Go back to step 1. An infinite ping-pong loop ensues.
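The arithmetic of this loop can be checked with a small simulation. This is only a sketch under simplifying assumptions that are not in distributed itself: every worker can hold 18 GB (8 GB in memory + 10 GB spilled) before pausing, all data is unique, each sender splits its data evenly across receivers, and a receiver's free room is split evenly among senders. The `retire` function is hypothetical.

```python
PAUSE_CAPACITY_GB = 8 + 10  # 80% of memory_limit=10GB, plus max_spill=10GB


def retire(senders, load):
    """Return a new load dict after `senders` push data to all other workers."""
    load = dict(load)
    receivers = [w for w in load if w not in senders]
    # How much each sender tries to hand to each receiver
    wanted = {s: load[s] / len(receivers) for s in senders}
    for r in receivers:
        # A receiver accepts data only until it would pause itself
        room_per_sender = (PAUSE_CAPACITY_GB - load[r]) / len(senders)
        for s in senders:
            moved = min(wanted[s], room_per_sender)
            load[s] -= moved
            load[r] += moved
    return load


initial = {"A": 18, "B": 10, "C": 10}
after_a_retires = retire(["A"], initial)             # steps 1-5
after_bc_retire = retire(["B", "C"], after_a_retires)  # steps 6-8

if __name__ == "__main__":
    print(after_a_retires)
    print(after_bc_retire)
```

After A's failed retirement the loads are 2, 18, and 18 GB; after B and C's failed retirement they are back to 18, 10, and 10 GB, which is exactly the starting state, so the cycle repeats indefinitely.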

A timeout to retirement is being discussed in https://github.com/dask/distributed/issues/6252. It would not help in this use case, as workers would be in and out of retiring state with some periodicity.

crusaderky commented 2 years ago

This ticket heavily interacts with:

fjetter commented 2 years ago

I know this issue has been open for a while now, apologies for waiting so long to reply. I'm a bit hesitant to pull the trigger on this feature. IIUC this is relevant if there is a significant amount of unmanaged memory on an otherwise idle worker (if a computation is running we cannot assume that the memory is actually leaked, it may be just used by the user function).

I'm aware of two sources of significant unmanaged memory

  1. A genuine memory leak in a third-party library
  2. Memory fragmentation

If either of these sources were to flood the available RSS, the worker would need to be restarted to free up memory, ideally gracefully. I'm working under the assumption that 2.) is fixed by https://github.com/dask/distributed/issues/6780. That leaves 1.), which I'm not entirely convinced justifies implementing this feature.

For all other cases, where RSS is full because of managed memory + disk rather than unmanaged memory, I don't think restarting is the best approach. I believe this is what you are outlining in https://github.com/dask/distributed/issues/5999#issuecomment-1177457188 but I have to admit that I didn't fully understand the context of this scenario.

I acknowledge that if all workers are full of memory and paused, regardless of what the root cause of the memory usage is, the cluster would deadlock. However, this is a situation we could deal with by, for instance, issuing a warning on the client or even failing the graph, etc. (the scheduler knows if all workers are paused).
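A minimal sketch of what such a scheduler-side check might look like. The `WorkerStatus` enum and the `all_workers_paused` and `check_cluster` helpers are hypothetical stand-ins for illustration; they are not the scheduler's real data structures or API.

```python
import warnings
from enum import Enum


class WorkerStatus(Enum):
    """Hypothetical stand-in for the worker states discussed in this thread."""

    RUNNING = "running"
    PAUSED = "paused"
    RETIRING = "retiring"


def all_workers_paused(statuses: dict) -> bool:
    """True when the cluster has workers and every one of them is paused."""
    return bool(statuses) and all(s is WorkerStatus.PAUSED for s in statuses.values())


def check_cluster(statuses: dict) -> bool:
    """Warn (instead of restarting workers) when no worker can make progress."""
    stuck = all_workers_paused(statuses)
    if stuck:
        warnings.warn("All workers are paused; the computation cannot make progress")
    return stuck


if __name__ == "__main__":
    print(check_cluster({"A": WorkerStatus.PAUSED, "B": WorkerStatus.RUNNING}))
```

Whether to warn, fail the graph, or do something else is a policy choice; the sketch only shows that the detection itself needs nothing beyond the per-worker status.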