dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Tasks are not stolen if worker runs out of memory #3761

Open crusaderky opened 4 years ago

crusaderky commented 4 years ago

Executive summary

The original intent behind the design of worker pause was to deal with a spill disk that is much slower than the tasks producing managed memory. However, if a worker reaches the paused state either because of unmanaged memory alone or because it runs out of spill disk space, it will never get out of it. Any tasks that are queued on the worker will remain there waiting forever, causing whole computations to hang.

This issue asks for worker pause to trigger all tasks queued on the worker (as opposed to those currently running) to be immediately sent back to the scheduler so that they can be computed somewhere else.
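For illustration, the deadlock described above can be provoked on a toy cluster by leaking unmanaged memory from tasks until a worker pauses; a minimal sketch, not the reporter's workload (the cluster size, memory figures, and the _leak attribute are made up for the example):

from dask.distributed import Client, LocalCluster, get_worker

def leaky_task(i):
    # Stash ~100 MB of unmanaged memory on the worker so it is never released;
    # after a handful of calls the worker crosses the pause threshold while
    # holding little or no managed data to spill.
    w = get_worker()
    if not hasattr(w, "_leak"):
        w._leak = []
    w._leak.append(bytearray(100 * 2**20))
    return i

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit="1GB")
    client = Client(cluster)
    # Queue far more tasks than can finish before the pause kicks in; the
    # remainder sit on the paused workers and, per this issue, are never stolen.
    futures = client.map(leaky_task, range(100))
    print(client.gather(futures))  # can hang with the behaviour described here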


Original ticket

distributed git tip, Python 3.8, Linux x64, 16 cores / 32 threads CPU

I'm using a dask distributed cluster running on localhost to perform a leaf-to-root aggregation of a large hierarchical tree with ~420k nodes, each node being a dask.delayed function that partially blocks the GIL for 1-100ms. The client retrieves the very small output of the root node. My dask workers are running with 1 thread per worker.

My computation is getting randomly stuck. Some things I noticed:

I have a high degree of confidence that there are no potential race conditions in my application code.

I have spent a considerable amount of effort trying to reproduce the issue in a POC and failed. I have no idea how to debug it and I'm open to suggestions...

crusaderky commented 4 years ago

It's stuck again right now. The progress bar says 11 pending tasks out of 27186 (this is downstream of fuse). Running Client.has_what shows me 3-4 keys per worker, all with status: completed, for a total of 123 keys or 61 unique keys. Client.who_has analogously returns 27186 keys, of which only 61 are held by 1+ workers. Client.gather on all of these 61 keys successfully retrieves all values.

who_has also shows me that my graph is stuck in a specific branch:

for key, workers in sorted(client.who_has().items()):
    if workers:
        print(f"{key} ({len(workers)})")
DelayedFactory.state-(1, 0)-4759d78f-0d59-4964-b4f2-343cb72eaeda (32)
inject_portfolio_variables-5eabf03e0000000000000003 (32)
mtm.compute_portfolio-PMI/PMI2009-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2010-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2011-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2012-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/CANCELADOS RESIDUALES-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/GRUPO PMI OTRAS EMPRESAS-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI MRN-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI NASA-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI SUS-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/PETROQUIMICOS-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/DESTILADOS-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/FLETAMENTOS-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/GASOLINAS Y COMPONENTES-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/LIQUIDOS DE GAS NATURAL-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/PAPEL DUMMY-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#071c14247ad9387995b341a7c43f2ce0-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#25db11426a369665d549c9012d96860c-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#293e44096c8be45d0c921d4a86578569-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#38a812c8478de6117cbbadd4014df235-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#49f003596295ea7500f711ac551fe1e6-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#574a2d5d2f61123f0e7a1871ff66b5ee-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#5b1511321e73ef65d45527963b47956c-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#6c1d32cc1a4e0b7dc3c9dd28a1681b8d-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#8a1bb1749d0552d076c4363fe91ffc52-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#8b0f950b6767e1ae274299626baaf1b9-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#922837695226d1d4080f60da2ec11bcd-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#aa0c00e3179c9aab6a1fb441419a3353-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#bb8905bc5ef0b71c062d3953696b1dbc-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#cc2b6fc9484f83d9548051aa4de57ab0-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#d69da652c4e2186a9ea147cd321364f1-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#d85ecbc1aedad4bf806de5e8426aefcd-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#df1a8ce2151d10108264114c2290c2fc-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESID#ec3bd7b7e5a5b43c88e497c4e4fb0839-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 01 2013-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 02 2013-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 04 2013-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 05 2013-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 06 2013-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 07 2013-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 08 2013-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 09 2013-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 10 2013-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 11 2013-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 12 2013-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL MTM 2013-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/GASTOS ASOCIADOS-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/Papel 2013 Residuales-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/RECLAMOS-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/SEGUROS-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/SERVICIOS GENERICOS-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD PAN-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2014-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2015-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2016-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2017-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2018-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2019-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PMI2020-5eabf03e0000000000000003 (1)
mtm.compute_portfolio-PMI/PROYECTOS-5eabf03e0000000000000003 (1)

My keys are indicative of the hierarchy; e.g. mtm.compute_portfolio-PMI-5eabf03e0000000000000003 is the parent of mtm.compute_portfolio-PMI/PMI2013-5eabf03e0000000000000003. You can see how the fault is under PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL, which does not appear in the list above.

Further:

fuel_prefix = "mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL"
for key, workers in sorted(client.who_has().items()):
    if key.startswith(fuel_prefix):
        print(f"{len(workers)} {key}")
0 mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL-5eabf03e0000000000000003
1 mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 01 2013-5eabf03e0000000000000003
1 mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 02 2013-5eabf03e0000000000000003
0 mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 03 2013-5eabf03e0000000000000003
1 mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 04 2013-5eabf03e0000000000000003
1 mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 05 2013-5eabf03e0000000000000003
1 mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 06 2013-5eabf03e0000000000000003
1 mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 07 2013-5eabf03e0000000000000003
1 mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 08 2013-5eabf03e0000000000000003
1 mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 09 2013-5eabf03e0000000000000003
1 mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 10 2013-5eabf03e0000000000000003
1 mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 11 2013-5eabf03e0000000000000003
1 mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 12 2013-5eabf03e0000000000000003
1 mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL MTM 2013-5eabf03e0000000000000003

which highlights that the exact issue is in mtm.compute_portfolio-PMI/PMI2013/PMI TRD/PMI TRD MEX/REFINADOS/RESIDUALES/FUEL OIL/FUEL OIL 03 2013-5eabf03e0000000000000003. Trying to gather that key hangs. Future.retry() does nothing. I tried rerunning that specific branch of the hierarchy 100 times in parallel and it went through without any glitches.

mrocklin commented 4 years ago

who_has and has_what only return completed tasks that are in memory; they don't list processing tasks. In your situation I would probably try to ask the workers what they're working on, maybe with something like

client.run(lambda dask_worker: dask_worker.executing)

If that doesn't return then you know that the worker itself has seized up, which may tell you something. If it returns then maybe the function itself is just not making any progress. Answering this question might help to better isolate the issue.

crusaderky commented 4 years ago

Hi @mrocklin , many thanks for the help.

It looks like the scheduler thinks that a task is running while the worker has no idea about it (different run from before, so the key has changed):

>>> client.run(lambda: distributed.get_worker().executing)
{'tcp://127.0.0.1:32903': set(),
 'tcp://127.0.0.1:33477': set(),
 'tcp://127.0.0.1:33491': set(),
 'tcp://127.0.0.1:34413': set(),
 'tcp://127.0.0.1:35207': set(),
 'tcp://127.0.0.1:35939': set(),
 'tcp://127.0.0.1:35997': set(),
 'tcp://127.0.0.1:36325': set(),
 'tcp://127.0.0.1:36915': set(),
 'tcp://127.0.0.1:37413': set(),
 'tcp://127.0.0.1:37463': set(),
 'tcp://127.0.0.1:38135': set(),
 'tcp://127.0.0.1:39651': set(),
 'tcp://127.0.0.1:39805': set(),
 'tcp://127.0.0.1:40445': set(),
 'tcp://127.0.0.1:40769': set(),
 'tcp://127.0.0.1:40943': set(),
 'tcp://127.0.0.1:41027': set(),
 'tcp://127.0.0.1:41093': set(),
 'tcp://127.0.0.1:41403': set(),
 'tcp://127.0.0.1:41653': set(),
 'tcp://127.0.0.1:42103': set(),
 'tcp://127.0.0.1:42205': set(),
 'tcp://127.0.0.1:42845': set(),
 'tcp://127.0.0.1:44187': set(),
 'tcp://127.0.0.1:45311': set(),
 'tcp://127.0.0.1:45385': set(),
 'tcp://127.0.0.1:45481': set(),
 'tcp://127.0.0.1:45689': set(),
 'tcp://127.0.0.1:45847': set(),
 'tcp://127.0.0.1:45895': set(),
 'tcp://127.0.0.1:46463': set()}

>>> client.run_on_scheduler(
...     lambda dask_scheduler: {
...         uri: {str(task) for task in worker.processing}
...         for uri, worker in dask_scheduler.workers.items()
...     }
... )
{'tcp://127.0.0.1:32903': set(),
 'tcp://127.0.0.1:33477': set(),
 'tcp://127.0.0.1:33491': {"<Task 'mtm.compute_trade_item-mtm.compute_portfolio-PMI/PMI2016/PMI TRD/PMI TRADING MEXICO/GASOLINA#0c795df5c1ada165276cdfd9975141bd-5eac25b80000000000000003' processing>"},
 'tcp://127.0.0.1:34413': set(),
 'tcp://127.0.0.1:35207': set(),
 'tcp://127.0.0.1:35939': set(),
 'tcp://127.0.0.1:35997': set(),
 'tcp://127.0.0.1:36325': set(),
 'tcp://127.0.0.1:36915': set(),
 'tcp://127.0.0.1:37413': set(),
 'tcp://127.0.0.1:37463': set(),
 'tcp://127.0.0.1:38135': set(),
 'tcp://127.0.0.1:39651': set(),
 'tcp://127.0.0.1:39805': set(),
 'tcp://127.0.0.1:40445': set(),
 'tcp://127.0.0.1:40769': set(),
 'tcp://127.0.0.1:40943': set(),
 'tcp://127.0.0.1:41027': set(),
 'tcp://127.0.0.1:41093': set(),
 'tcp://127.0.0.1:41403': set(),
 'tcp://127.0.0.1:41653': set(),
 'tcp://127.0.0.1:42103': set(),
 'tcp://127.0.0.1:42205': set(),
 'tcp://127.0.0.1:42845': set(),
 'tcp://127.0.0.1:44187': set(),
 'tcp://127.0.0.1:45311': set(),
 'tcp://127.0.0.1:45385': set(),
 'tcp://127.0.0.1:45481': set(),
 'tcp://127.0.0.1:45689': set(),
 'tcp://127.0.0.1:45847': set(),
 'tcp://127.0.0.1:45895': set(),
 'tcp://127.0.0.1:46463': set()}

All my functions emit a log line at the beginning and one at the end (also in case of crash). I've identified the function that matches the key above and there's no trace of it in the logs - it never started.

mrocklin commented 4 years ago

That's very informative. The next thing I would try would be to look at dask_scheduler.story(key_name), which should give you the sequence of transitions that the task went through. Maybe something strange shows up there.

crusaderky commented 4 years ago

@mrocklin 3 transitions returned:

>>> stuck_key = "mtm.compute_trade_item-mtm.compute_portfolio-PMI/PMI2016/PMI TRD/PMI TRADING MEXICO/GASOLINA#0c795df5c1ada165276cdfd9975141bd-5eac25b80000000000000003"
>>> client.run_on_scheduler(
    lambda dask_scheduler: dask_scheduler.story(stuck_key)
)
[(stuck_key, 'released', 'waiting', {}, 1588340374.1463509),
 ('DelayedFactory.state-(1, 0)-8b4c12b9-c82d-4bc8-ba84-08484227e587',
  'processing',
  'memory',
  {stuck_key: 'processing',
   (other 16k keys): 'processing',
  },
  1588340375.0087826),
 (stuck_key, 'waiting', 'processing', {}, 1588340375.4450822)
]

DelayedFactory.state is a global state node that is identically fed in input to all nodes.

mrocklin commented 4 years ago

OK, so nothing odd happened on the scheduler (like the key transitioning from processing back to released and then to memory or something).

I would do the same thing on the worker to see if anything is odd there. If that also doesn't show much then I would start looking at logs, and maybe even increase the logging verbosity to debug.

Welcome to debugging scheduling issues. In the early years of Dask these skillsets were really important (things would deadlock a lot). Nowadays very few people have to look at this stuff :)

crusaderky commented 4 years ago

@mrocklin same command on the workers.

>>> client.run(lambda dask_worker: dask_worker.story(stuck_key))
{
    "tcp://127.0.0.1:32903": [
        (stuck_key, "new"),
        (stuck_key, "waiting", "ready"),
        (stuck_key, "release-key"),
    ],
    "tcp://127.0.0.1:33477": [],
    "tcp://127.0.0.1:33491": [
        (stuck_key, "new"),
        (stuck_key, "waiting", "ready"),
        (stuck_key, "release-key"),
        (stuck_key, "new"),
        (stuck_key, "waiting", "ready"),
    ],
    "tcp://127.0.0.1:34413": [
        (stuck_key, "new"),
        (stuck_key, "waiting", "ready"),
        (stuck_key, "release-key"),
    ],
    "tcp://127.0.0.1:35207": [],
    "tcp://127.0.0.1:35939": [],
    "tcp://127.0.0.1:35997": [],
    "tcp://127.0.0.1:36325": [],
    "tcp://127.0.0.1:36915": [],
    "tcp://127.0.0.1:37413": [],
    "tcp://127.0.0.1:37463": [
        (stuck_key, "new"),
        (stuck_key, "waiting", "ready"),
        (stuck_key, "release-key"),
    ],
    "tcp://127.0.0.1:38135": [],
    "tcp://127.0.0.1:39651": [
        (stuck_key, "new"),
        (stuck_key, "waiting", "ready"),
        (stuck_key, "release-key"),
    ],
    "tcp://127.0.0.1:39805": [],
    "tcp://127.0.0.1:40445": [],
    "tcp://127.0.0.1:40769": [],
    "tcp://127.0.0.1:40943": [
        (stuck_key, "new"),
        (stuck_key, "waiting", "ready"),
        (stuck_key, "release-key"),
    ],
    "tcp://127.0.0.1:41027": [
        (stuck_key, "new"),
        (stuck_key, "waiting", "ready"),
        (stuck_key, "release-key"),
    ],
    "tcp://127.0.0.1:41093": [],
    "tcp://127.0.0.1:41403": [],
    "tcp://127.0.0.1:41653": [],
    "tcp://127.0.0.1:42103": [],
    "tcp://127.0.0.1:42205": [
        (stuck_key, "new"),
        (stuck_key, "waiting", "ready"),
        (stuck_key, "release-key"),
        (stuck_key, "new"),
        (stuck_key, "waiting", "ready"),
        (stuck_key, "release-key"),
        (stuck_key, "new"),
        (stuck_key, "waiting", "ready"),
        (stuck_key, "release-key"),
    ],
    "tcp://127.0.0.1:42845": [
        (stuck_key, "new"),
        (stuck_key, "waiting", "ready"),
        (stuck_key, "release-key"),
    ],
    "tcp://127.0.0.1:44187": [],
    "tcp://127.0.0.1:45311": [],
    "tcp://127.0.0.1:45385": [],
    "tcp://127.0.0.1:45481": [],
    "tcp://127.0.0.1:45689": [],
    "tcp://127.0.0.1:45847": [],
    "tcp://127.0.0.1:45895": [],
    "tcp://127.0.0.1:46463": [],
}
mrocklin commented 4 years ago

Interesting. It looks like this task has been moved around to a bunch of different workers. I wonder why. It looks like one of them has the task in a ready-to-compute state. I wonder what else is going on on that worker. Is it busy with something else perhaps? What does dask_worker.executing look like (or was this empty from before)? If so, I wonder if giving the worker a kick with dask_worker.ensure_computing() would make things run to completion.
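If it helps, both checks can be pointed at just the suspect worker; a small sketch, assuming the worker address from the earlier output and the existing Client named client:

suspect = "tcp://127.0.0.1:33491"  # the worker still holding the task in "ready"

# What is that worker actually executing right now?
client.run(lambda dask_worker: sorted(dask_worker.executing), workers=[suspect])

# Nudge it to pick up ready tasks (a no-op if something is blocking it).
client.run(lambda dask_worker: dask_worker.ensure_computing(), workers=[suspect])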

crusaderky commented 4 years ago

@mrocklin, the worker is completely idle; dask_worker.executing returns an empty set. dask_worker.ensure_computing() has no visible effect. It does not cause any further transitions in dask_worker.story().

this task has been moved around to a bunch of different workers. I wonder why.

Maybe because my tasks (after fusing) take a very variable amount of time between 100ms and 10s, so the scheduler struggles to balance them up front?

mrocklin commented 4 years ago

Maybe because my tasks (after fusing) take a very variable amount of time between 100ms and 10s, so the scheduler struggles to balance them up front?

That might explain it. To be clear, this should be fine and shouldn't result in a deadlock of any sort. It just was something strange that we saw.

dask_worker.ensure_computing() has no visible effect. It does not cause any further transitions in dask_worker.story().

That's odd. The next thing I would do would be to look through the ensure_computing function and see which paths we're going down and why that task isn't transitioning to the executing state.

crusaderky commented 4 years ago

https://github.com/dask/distributed/blob/e7ba316d496a23d073188466571ce1580336aba4/distributed/worker.py#L2430-L2432

>>> client.run(lambda dask_worker: dask_worker.paused)
{'tcp://127.0.0.1:32903': False,
 'tcp://127.0.0.1:33477': False,
 'tcp://127.0.0.1:33491': True,
 'tcp://127.0.0.1:34413': False,
 'tcp://127.0.0.1:35207': False,
 'tcp://127.0.0.1:35939': False,
 'tcp://127.0.0.1:35997': False,
 'tcp://127.0.0.1:36325': False,
 'tcp://127.0.0.1:36915': False,
 'tcp://127.0.0.1:37413': False,
 'tcp://127.0.0.1:37463': False,
 'tcp://127.0.0.1:38135': False,
 'tcp://127.0.0.1:39651': False,
 'tcp://127.0.0.1:39805': False,
 'tcp://127.0.0.1:40445': False,
 'tcp://127.0.0.1:40769': False,
 'tcp://127.0.0.1:40943': False,
 'tcp://127.0.0.1:41027': False,
 'tcp://127.0.0.1:41093': False,
 'tcp://127.0.0.1:41403': False,
 'tcp://127.0.0.1:41653': False,
 'tcp://127.0.0.1:42103': False,
 'tcp://127.0.0.1:42205': False,
 'tcp://127.0.0.1:42845': False,
 'tcp://127.0.0.1:44187': False,
 'tcp://127.0.0.1:45311': False,
 'tcp://127.0.0.1:45385': False,
 'tcp://127.0.0.1:45481': False,
 'tcp://127.0.0.1:45689': False,
 'tcp://127.0.0.1:45847': False,
 'tcp://127.0.0.1:45895': False,
 'tcp://127.0.0.1:46463': False}
mrocklin commented 4 years ago

Ah, that'll do it. So the machine is close to out of memory, so it's not doing any other work. However oddly enough none of the other machines are stealing the task. I suspect that this is because stealing tries not to steal from workers that have fewer tasks than threads (presumably they should be running) but it probably isn't taking paused workers into account.

crusaderky commented 4 years ago

Yep, in the logs I found:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 1.22 GB -- Worker memory limit: 1.50 GB

Running this twice unblocked the batch:

def unpause(dask_worker):
    dask_worker.paused = False
    dask_worker.ensure_computing()

client.run(unpause)

I was already aware of the memory leak but I don't know yet which library is responsible. However I expected that workers would just be restarted when they exceed the threshold?

mrocklin commented 4 years ago

There are a variety of thresholds. https://distributed.dask.org/en/latest/worker.html?highlight=pause#memory-management

There are, I think, two solid solutions here:

  1. We make work-stealing more aware of worker memory saturation (moderate difficulty)
  2. We make the scheduler better about actively managing memory and moving things around in the background (hard) https://github.com/dask/distributed/issues/1002

Short term, you could just turn off the pause behavior for workers.
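For reference, this is controlled by the distributed.worker.memory.* settings; a minimal sketch of disabling only the pause threshold for a local cluster (the cluster parameters are illustrative, and the config must be set before the workers are created):

import dask
from dask.distributed import Client, LocalCluster

# Disable the pause threshold; target/spill/terminate keep their defaults.
with dask.config.set({"distributed.worker.memory.pause": False}):
    cluster = LocalCluster(n_workers=4, threads_per_worker=1, memory_limit="1.5GB")
    client = Client(cluster)

The same effect can be had by setting distributed.worker.memory.pause: false in distributed.yaml.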

crusaderky commented 4 years ago

Besides task stealing, there's the problem that, in case of memory leak, that worker will stay paused forever instead of restarting.

I think the correct behaviour should be

if above 'pause' threshold:
    if any tasks are running:
        pause until no tasks are running anymore
    else:
        terminate
mrocklin commented 4 years ago

I think that terminating paused workers is likely to cause headaches in other workloads. For example, often the right solution to running out of memory is to pause and wait until other workers catch up, or more workers arrive, and then start running things again. It's not always a good solution though (as your use case clearly shows).

We could start kicking the tasks back to the scheduler in some situations. There is some logic to do this with the Reschedule exception that might be interesting to play with.
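For context, Reschedule is raised from inside a running task to hand it back to the scheduler; a minimal sketch of the mechanism, assuming the existing Client named client (the memory check and the 1.5 GB budget are purely illustrative, and it only helps once a task has actually started executing):

import psutil
from dask.distributed import Reschedule

def task_that_may_defer(x):
    # Illustrative guard: if this worker process is already using most of its
    # memory budget, hand the task back to the scheduler so another worker
    # can pick it up instead.
    rss = psutil.Process().memory_info().rss
    if rss > 0.9 * 1.5e9:  # ~90% of the 1.5 GB worker limit seen in this thread
        raise Reschedule()
    return x * 2

future = client.submit(task_that_may_defer, 21)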

dhirschfeld commented 4 years ago

It seems to me that killing the worker is the desirable behaviour in this case. If the worker isn't running any tasks, what problems can it cause? I thought dask.distributed was resilient to failed workers?

I'm curious as I've had issues with memory leaks previously and was planning on trying to recycle workers on a regular basis to avoid any such issues on a long-lived cluster.

mrocklin commented 4 years ago

It seems to me that killing the worker is the desirable behaviour in this case.

Not always, no.

If the worker isn't running any tasks, what problems can it cause?

It may be holding onto data that other workers need in order to make progress on the current workflow. Killing the worker means that we need to repeat that work elsewhere. In many workloads the correct behavior is to wait for more workers to arrive.

I thought dask.distributed was resilient to failed workers?

Yes, but if you kill a worker every time it runs out of memory then many workloads will not complete.

This is one of those cases where finding the right behavior requires integrating over hundreds of different kinds of workflows unfortunately.

crusaderky commented 4 years ago

It may be holding onto data that other workers need in order to make progress on the current workflow

Isn't there any system for graceful worker termination? E.g. first all the data the worker holds is moved somewhere else, and then it is terminated?

mrocklin commented 4 years ago

There is a system for graceful worker termination (grep for graceful) but terminating the worker isn't robustly the right choice unfortunately.  As I said above, often the right thing to do is to wait with the data that you have for more workers to arrive.  This isn't always the right choice of course (as you're experiencing) but I don't think that we should kill workers that are paused because they're holding on to too much data. Instead, I think that one of the two solutions I laid out above are probably better.

crusaderky commented 4 years ago

There is a system for graceful worker termination (grep for graceful)

I found Client.retire_workers and Scheduler.retire_workers, but I could not find anything for graceful worker restart?


I just realised that I can't just set pause: False in distributed.yaml. Since the memory leak causes all workers to grow their RAM at the same pace, disabling the pause would cause a dozen or so workers to die ungracefully at the same time, which in turn would provoke a KilledWorker exception to be returned to the client.


wait with the data that you have for more workers to arrive

How would they arrive? Is the dask adaptive system aware of paused nodes? Does it stop counting them among the available nodes, so it spins up more nodes?

Even after the extra nodes have arrived, the paused node will just sit there forever until the adaptive system decides it is time to reduce the number of workers. In a long-running, unmanned public cloud deployment, this may turn out to be very expensive. In a private cloud, it risks running out of available hosts.

What I would think a sane algorithm could be:

When a node gets paused,

  1. Immediately, non-paused nodes should start stealing tasks from the paused node

  2. wait for tasks to finish

  3. Invoke gc.collect() and wait X seconds (configured through distributed.yaml) to see if the problem goes away by itself (e.g. a user resource that is slow to deallocate itself)

  4. If memory usage hasn't gone down, perform a graceful restart (like retire_workers, but also spawn a new process)

  5. Because of how retire_workers is designed, step 4 will actually do nothing if there are no workers available to acquire the data - namely because their memory grew at the same rate and they've all become paused. In this case

    • if adaptive and below max_workers, add an extra node
    • otherwise, ungracefully kill the worker whose loss would cause the least harm, chosen by Scheduler.workers_to_close(n=1).
crusaderky commented 4 years ago

I quickly cooked up a function for graceful restart, although it involved heavy copy-pasting from Scheduler.restart, so code quality could be improved. Very poorly tested.

import asyncio
from time import time
from distributed.scheduler import logger, log_errors, All, rpc

async def restart_workers(self, workers, timeout=3):
    await self.retire_workers(
        workers=workers, remove=False, close_workers=False
    )

    # Copied from Scheduler.restart
    with log_errors():
        n_workers = len(self.workers)
        nannies = {addr: self.workers[addr].nanny for addr in workers if addr in self.workers}

        for addr in list(workers):
            try:
                # Ask the worker to close if it doesn't have a nanny,
                # otherwise the nanny will kill it anyway
                self.remove_worker(address=addr, close=addr not in nannies)
            except Exception as e:
                logger.info(
                    "Exception while restarting.  This is normal", exc_info=True
                )

        self.clear_task_state()

        logger.debug("Send kill signal to nannies: %s", nannies)

        nannies = [
            rpc(nanny_address, connection_args=self.connection_args)
            for nanny_address in nannies.values()
            if nanny_address is not None
        ]

        resps = All(
            [
                nanny.restart(
                    close=True, timeout=timeout * 0.8, executor_wait=False
                )
                for nanny in nannies
            ]
        )
        try:
            resps = await asyncio.wait_for(resps, timeout)
        except TimeoutError:
            logger.error(
                "Nannies didn't report back restarted within "
                "timeout.  Continuuing with restart process"
            )
        else:
            if not all(resp == "OK" for resp in resps):
                logger.error(
                    "Not all workers responded positively: %s", resps, exc_info=True
                )
        finally:
            await asyncio.gather(*[nanny.close_rpc() for nanny in nannies])

        start = time()
        while time() < start + 10 and len(self.workers) < n_workers:
            await asyncio.sleep(0.01)

async def r(dask_scheduler):
    await restart_workers(dask_scheduler, ['tcp://127.0.0.1:44069'])

client.run_on_scheduler(r)

A daemon that gracefully restarts the workers once they become paused and no tasks are running on them (adding a grace period is a trivial exercise):

import cloudpickle

msg = dict(
    op="run",
    function=cloudpickle.dumps(
        lambda dask_worker: dask_worker.paused and not dask_worker.executing
    ),
    wait=True,
    args=cloudpickle.dumps(()),
    kwargs=cloudpickle.dumps({}),
)

async def restart_paused_workers(dask_scheduler):
    while True:
        responses = await dask_scheduler.broadcast(msg=msg, workers=None, nanny=False)
        workers = [
            addr
            for addr, resp in responses.items()
            if resp["status"] == "OK" and resp["result"].deserialize()
        ]
        if workers:
            logger.info("Restarting paused workers %s", ", ".join(workers))
            await restart_workers(dask_scheduler, workers)

        await asyncio.sleep(5)

client.run_on_scheduler(restart_paused_workers, wait=False)

Not sure if there is any way to fetch the value of Worker.paused without invoking Scheduler.broadcast?

crusaderky commented 4 years ago

@mrocklin ping

  1. could you confirm there's no function for graceful worker restart right now?
  2. code duplication aside, could you comment on the POC code above?
  3. is there any way to retrieve the value of Worker.paused from the Scheduler, other than broadcasting the 'run' command?
mrocklin commented 4 years ago

ping

My apologies. I'm very busy right now. It would be great if other maintainers could jump in here, but I wouldn't expect much. Relatedly, if you wanted to help out with maintenance by handling some issues from other people, that might encourage others to spend time here. We're low on volunteer time.

Immediately, non-paused nodes should start stealing tasks from the paused node

Totally agreed. This is what I intended to recommend in the first point in https://github.com/dask/distributed/issues/3761#issuecomment-622465465

could you confirm there's no function for graceful worker restart right now?

Grepping through the worker.py file I find the close_gracefully method. I think that is probably what I was referring to.
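For the record, that method can be invoked remotely on a specific worker; a sketch assuming the existing Client named client, the paused worker address from earlier in the thread, and that close_gracefully is a coroutine (it asks the scheduler to replicate the worker's data elsewhere and then closes the worker; it does not restart it):

async def shut_down(dask_worker):
    # Hand this worker's in-memory data off to peers, then close it.
    await dask_worker.close_gracefully()

client.run(shut_down, workers=["tcp://127.0.0.1:33491"], wait=False)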

code duplication aside, could you comment on the POC code above?

Eventually, sure. Unfortunately I'm pretty backed up these days. Maybe this weekend?

is there any way to retrieve the value of Worker.paused from the Scheduler, other than broadcasting the 'run' command?

No, we would need to move this information around. My guess is that this would pass with the heartbeat information. The scheduler already has a sense of memory use on the workers though (also sent with the heartbeat) so we might be able to calculate this rather than send and track more state.
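In the meantime it can be approximated from what the scheduler already tracks; a rough sketch, assuming the scheduler's WorkerState exposes memory_limit and a "memory" heartbeat metric, the existing Client named client, and the default 0.8 pause fraction (not read from config here):

def probably_paused(dask_scheduler, pause_fraction=0.8):
    # Compare each worker's last reported RSS against the pause threshold.
    # Only an estimate: it lags by one heartbeat and ignores any per-worker
    # configuration overrides.
    out = {}
    for addr, ws in dask_scheduler.workers.items():
        rss = ws.metrics.get("memory", 0)
        out[addr] = bool(ws.memory_limit) and rss > pause_fraction * ws.memory_limit
    return out

client.run_on_scheduler(probably_paused)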

dhirschfeld commented 4 years ago

I'm following this issue closely because I have also experienced distributed clusters hanging, to the point that I now set memory_limit=0. This is obviously a gross hack but it works for me, as it lets the calculation continue and a periodic restart keeps things stable.

Anyway, the point is that if there can be better heuristics to manage memory and make distributed clusters more resilient to misbehaving code, all users will benefit from that.

Whilst it does take time and effort to review user contributions and educate contributors about the intricacies of the project, the payoff is that hopefully they use their new skills & knowledge to continue contributing in future - i.e. reviewing user code helps increase the number of future maintainers.

It is a bit of a chicken & egg scenario, but if the maintainers don't have the time to review user contributions, it's hard to see how the maintainer base could ever be scaled.

xref: https://github.com/dask/community/issues/49

mrocklin commented 4 years ago

Whilst it does take time and effort to review user contributions and educate contributors about the intricacies of the project, the payoff is that hopefully they use their new skills & knowledge to continue contributing in future - i.e. reviewing user code helps increase the number of future maintainers.

Certainly. But people giving back is very much the exception. You two are exceptional in that you have stuck around over time, and are comfortable actually solving the problems that you raise. I'd say you're something like 1-in-50. For every fifty people that I help guide through something, only one of them ends up being as good a time investment as either of you. However, to find someone who fixes not only the problems that affect them, but also reviews the work of others is very rare, I'd say something like 1-in-200. These people almost always get hired eventually.

You two are great, and I really appreciate the work that you put in, but even given that work I personally am unable to review things forever, and unfortunately it's hard to convince other people to review in this repository because it's a bit more complicated. I like your idea that "this time investment will pay off" but over the last five years the evidence is entirely to the contrary. Almost no one reviews other people's work in this repository, except a couple of folks whose employers I've convinced to make it part of their job. But hey, proving me wrong here is easy, you only need to go and review other people's work.

The challenge here is that "contributing code" is also a "tax on reviewers". So people in your position justifiably feel "hey, I'm volunteering time here to fix a bug for everyone, why aren't people appreciative and contributing back by reviewing my solution" which is a reasonable response. Unfortunately it also sets an expectation on reviewers that they work for you indefinitely. The only way I've really found to make this work is to pay people.

Anyway, I spent a few minutes taking a longer glance through the example above. Unfortunately, even after this look I don't currently understand what it does. Given that it says it's mostly copied from the existing restart method, my guess is that this is mostly my fault.

mrocklin commented 4 years ago

Hopefully one of the other maintainers comes around to help?

mrocklin commented 4 years ago

Immediately, non-paused nodes should start stealing tasks from the paused node

My guess is that this is still the solution.

mrocklin commented 4 years ago

I'm also a bit frazzled right now because I'm trying to make a company that will get money from other companies to help solve this problem long term. If you know anyone that can help with this effort let me know. Until then I'm likely to be pretty burnt out.

crusaderky commented 4 years ago

I'd say you're something like 1-in-50. For every fifty people that I help guide through something, only one of them ends up being as good a time investment as either of you. [...] However, to find someone who fixes not only the problems that affect them, but also reviews the work of others is very rare, I'd say something like 1-in-200.

To be honest, sounds like a typical open source project to me. And most people using dask are literate in coding - which is more than a lot of projects can say...

This said, I am fully appreciative of your lack of capacity to follow issues and PRs.

Relatedly, if you wanted to help out with maintenance by handling some issues from other people, that might encourage others to spend time here. We're low on volunteer time.

I am more than happy to contribute, but alas I don't think I can guarantee any kind of consistency in my time investment. My contributions typically come in spikes on whatever library is giving me pain in my daily work. I used to have to invest large amounts of time on xarray, then it became pint, now it's distributed - take a look at the changelogs of those two projects and you'll see the pattern.

mrocklin commented 4 years ago

This said, I am fully appreciative of your lack of capacity to follow issues and PRs.

I really appreciate your understanding here. It means a lot.

dhirschfeld commented 4 years ago

I agree, the "conversion rate" is probably very low - especially for a complex project like dask/distributed. I'm not sure what the solution is. Open source sustainability is a hard problem and I don't think anyone has solved it yet. I'm hopeful that things are improving though - and that companies like Coiled are part of the solution :+1:

crusaderky commented 4 years ago

Considering how dask is used by (1) universities and (2) large corporations, IMHO you should offer the latter a way to easily throw money at you. I think the current "donate" button is too open-ended for the corp mindset; a typical manager will want to spend money on consulting hours to fix very specific issues - and expect that, once he does so, the capacity on the consultant's side somehow appears within a short time. Not sure how you people are organized in this regard?

mrocklin commented 4 years ago

My personal solution to this is coiled.io, which started up recently. Historically Anaconda did a bit of this. Quansight is maybe also in this game.

mrocklin commented 3 years ago

A PR that tries to gracefully kill a worker when it is paused without the opportunity to move forward is here: https://github.com/dask/distributed/pull/4221

crusaderky commented 2 years ago

Related issues

Impact

This ticket wants to change the cluster behaviour regarding tasks in waiting or constrained state (i.e. not executing) that are sitting on a worker in paused or closing_gracefully status.

Current state of the art

The scheduler does not assign any new tasks to a worker in paused or closing_gracefully status. The Active Memory Manager (AMM) does not send any {op: acquire-replicas} events to such workers.

Waiting tasks on a closing worker sit there until all in-memory tasks have been replicated somewhere else. If the worker holds many GiBs worth of spilled data, this could potentially take several minutes. Once that has happened and the worker actually shuts down, the scheduler reschedules them somewhere else.

Waiting tasks on a paused worker sit there until the worker emerges from pause. In case of severe memory leak, this may never happen, thus causing a cluster-wide deadlock. #5999 mitigates this issue by shutting down the worker after ~30s.

High level design

When a worker enters paused or closing_gracefully status, all waiting and constrained tasks are (almost) immediately sent back to the scheduler so that they can be computed somewhere else. executing tasks remain on the worker.

Low level design

There are two ways to implement this.

The first one is through stealing. A paused or closing_gracefully worker should be treated as a worker which is expected to be busy for >30s.

I personally like another option more, as IMHO it's definitely cleaner:

When the worker enters paused or closing_gracefully state, it sends a {op: reschedule} message to the scheduler for all tasks in waiting and constrained state, and transitions all tasks in waiting, constrained, and fetch state to cancelled. This last bit aborts both fetching the dependencies of the pending tasks and any pending AMM {op: acquire-replicas} events. (A rough sketch of this hook follows after the list below.)

This causes the following chain of events:

  1. the scheduler sends a {op: compute-task} message to other workers for all tasks that were in running state and have now been rescheduled
  2. the other workers fetch the dependencies of the rescheduled tasks - likely from the paused/closing worker
  3. as soon as the dependencies exist on the other worker, the Active Memory Manager ReduceReplicas policy sends a remove-replicas request to the paused/closing worker.
  4. the AMM will also likely send {op: acquire-replicas} messages to another worker for all tasks that were previously being fetched on request of the AMM (e.g. they were in fetch state on the worker but not a dependency of another task).
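A very rough sketch of what the worker-side hook for the second option could look like (the function name is hypothetical, and a real implementation would go through the worker's transition machinery rather than poking task states directly):

def reschedule_pending_tasks(worker):
    # Hypothetical hook, run when the worker enters paused/closing_gracefully:
    # hand every waiting/constrained task back to the scheduler and abort
    # pending fetches, mirroring the high-level design above.
    for ts in list(worker.tasks.values()):
        if ts.state in ("waiting", "constrained"):
            worker.batched_stream.send({"op": "reschedule", "key": ts.key})
            ts.state = "cancelled"
        elif ts.state == "fetch":
            ts.state = "cancelled"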