dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Deadlock after few workers take all tasks #5635

Open chrisroat opened 2 years ago

chrisroat commented 2 years ago

What happened:

I have a dask-gateway cluster on k8s, and a 63k task graph. The processing has essentially stalled. Five workers have 375 tasks each, while the remaining 75 have zero. The scheduler logs have a bunch of timeout errors. The workers with all the tasks have the "unresponsive... long-running ... GIL" errors -- though it looks like the running tasks are stack.

[Screenshot of the Dask dashboard, 2022-01-03 12:32:28 PM]
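The imbalance described above can be quantified by counting tasks per worker. This is a minimal sketch, assuming the input is a mapping from worker address to task keys, which is the shape `Client.processing()` returns. The helper name `summarize_processing` and the sample addresses are made up for illustration.

```python
from collections import Counter
from typing import Iterable, Mapping


def summarize_processing(processing: Mapping[str, Iterable]) -> dict:
    """Count tasks per worker and report how lopsided the assignment is."""
    counts = Counter({addr: len(list(keys)) for addr, keys in processing.items()})
    busy = {addr: n for addr, n in counts.items() if n > 0}
    return {
        "workers": len(counts),
        "busy_workers": len(busy),
        "idle_workers": len(counts) - len(busy),
        "total_tasks": sum(counts.values()),
        "top": counts.most_common(5),  # the five most loaded workers
    }


# Hypothetical data mirroring the report: 5 workers with 375 tasks each, 75 idle.
sample = {
    f"tls://10.0.0.{i}:1234": [f"task-{i}-{j}" for j in range(375)] for i in range(5)
}
sample.update({f"tls://10.0.1.{i}:1234": [] for i in range(75)})

summary = summarize_processing(sample)
print(summary["busy_workers"], summary["idle_workers"], summary["total_tasks"])
```

With a live client, `summarize_processing(client.processing())` should give the same kind of summary for the real cluster.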

I attempted to use dump_cluster_state, but hit a timeout error.

Anything else we need to know?:

After deleting the pods with the 5 wedged workers, the graph completed.

Environment:

Cluster Dump State:

```
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
/opt/conda/lib/python3.8/site-packages/distributed/comm/core.py in connect()
    318             # write, handshake = await asyncio.gather(comm.write(local_info), comm.read())
--> 319             handshake = await asyncio.wait_for(comm.read(), time_left())
    320             await asyncio.wait_for(comm.write(local_info), time_left())

/opt/conda/lib/python3.8/asyncio/tasks.py in wait_for()
    500             await _cancel_and_wait(fut, loop=loop)
--> 501             raise exceptions.TimeoutError()
    502         finally:

TimeoutError:

The above exception was the direct cause of the following exception:

OSError                                   Traceback (most recent call last)
/tmp/ipykernel_894/3630066435.py in <module>
----> 1 client.dump_cluster_state(f"dump_{name}_{date}")

/opt/conda/lib/python3.8/site-packages/distributed/client.py in dump_cluster_state(self, filename, exclude, format)
   3592
   3593         """
-> 3594         return self.sync(
   3595             self._dump_cluster_state,
   3596             filename=filename,

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    308             return future
    309         else:
--> 310             return sync(
    311                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    312             )

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    362     if error[0]:
    363         typ, exc, tb = error[0]
--> 364         raise exc.with_traceback(tb)
    365     else:
    366         return result[0]

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in f()
    347             if callback_timeout is not None:
    348                 future = asyncio.wait_for(future, callback_timeout)
--> 349             result[0] = yield future
    350         except Exception:
    351             error[0] = sys.exc_info()

/opt/conda/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/opt/conda/lib/python3.8/site-packages/distributed/client.py in _dump_cluster_state(self, filename, exclude, format)
   3495         )
   3496         versions = self._get_versions()
-> 3497         scheduler_info, worker_info, versions_info = await asyncio.gather(
   3498             scheduler_info, worker_info, versions
   3499         )

/opt/conda/lib/python3.8/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    884             name, comm.name = comm.name, "ConnectionPool." + key
    885             try:
--> 886                 result = await send_recv(comm=comm, op=key, **kwargs)
    887             finally:
    888                 self.pool.reuse(self.addr, comm)

/opt/conda/lib/python3.8/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    677         if comm.deserialize:
    678             typ, exc, tb = clean_exception(**response)
--> 679             raise exc.with_traceback(tb)
    680         else:
    681             raise Exception(response["exception_text"])

/opt/conda/lib/python3.8/site-packages/distributed/core.py in handle_comm()
    519                             result = asyncio.ensure_future(result)
    520                             self._ongoing_coroutines.add(result)
--> 521                             result = await result
    522                     except (CommClosedError, CancelledError):
    523                         if self.status in (Status.running, Status.paused):

/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py in broadcast()
   6018             return resp
   6019
-> 6020         results = await All(
   6021             [send_message(address) for address in addresses if address is not None]
   6022         )

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in All()
    206     while not tasks.done():
    207         try:
--> 208             result = await tasks.next()
    209         except Exception:
    210

/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py in send_message()
   6010
   6011         async def send_message(addr):
-> 6012             comm = await self.rpc.connect(addr)
   6013             comm.name = "Scheduler Broadcast"
   6014             try:

/opt/conda/lib/python3.8/site-packages/distributed/core.py in connect()
   1069         except Exception as exc:
   1070             self.semaphore.release()
-> 1071             raise exc
   1072         finally:
   1073             self._connecting.discard(fut)

/opt/conda/lib/python3.8/site-packages/distributed/core.py in connect()
   1053             )
   1054             self._connecting.add(fut)
-> 1055             comm = await fut
   1056             comm.name = "ConnectionPool"
   1057             comm._pool = weakref.ref(self)

/opt/conda/lib/python3.8/site-packages/distributed/comm/core.py in connect()
    322             with suppress(Exception):
    323                 await comm.close()
--> 324             raise OSError(
    325                 f"Timed out during handshake while connecting to {addr} after {timeout} s"
    326             ) from exc

OSError: Timed out during handshake while connecting to tls://10.16.184.3:46165 after 30 s
```

fjetter commented 2 years ago

> After deleting the pods with the 5 wedged workers, the graph completed.

I assume these are the five workers which have so much data spilled on the dashboard? (See bytes stored by worker)

> "unresponsive... long-running ... GIL" errors -- though it looks like the running tasks are stack.

Anything interesting happening in this function? E.g. is there native code running which may not release the GIL?

> After deleting the pods with the 5 wedged workers, the graph completed.

That strongly indicates another state machine problem.

chrisroat commented 2 years ago

I think the stack task is just a da.stack call, though it's possible that optimization fused some other things. The CPU has dropped to zero.

I'm not sure how to better help diagnose the state machine issues.

I find most problems with autoscaling clusters. I could set the cluster size to very high numbers, but the workloads vary a lot and this would be wasteful. I already waste a lot because my autoscale lower bound needs to be high enough to prevent KilledWorker issues while waiting for new workers to come online... so the tail end of the graph where a few tasks are left has very low utilization.

fjetter commented 2 years ago

> I attempted to use dump_cluster_state, but hit a timeout error.

These timeouts are not unexpected in such an autoscaling situation. The latest release (2022.01.0) includes a fix that keeps the cluster dump function from failing when connections to individual workers time out.

https://github.com/dask/distributed/pull/5590
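On releases without that fix, a caller can at least keep a notebook session going when the dump fails. This is a minimal sketch, assuming `client` is a `distributed.Client`; the wrapper name `try_dump_cluster_state` is hypothetical, and catching `OSError` is based only on the traceback in the opening post.

```python
def try_dump_cluster_state(client, filename: str) -> bool:
    """Attempt a cluster dump; return False instead of raising on comm timeouts."""
    try:
        client.dump_cluster_state(filename)
    except OSError as exc:
        # The traceback above surfaces the handshake timeout as an OSError.
        print(f"cluster dump failed: {exc}")
        return False
    return True
```

A call such as `try_dump_cluster_state(client, "dump_stuck_graph")` then reports the failure without interrupting the rest of the debugging session.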

chrisroat commented 2 years ago

I just encountered a new mode, where the dashboard says 3 tasks are processing but the graph in the lower left shows 0 (and the Info page shows no workers with anything in "processing"). I typically have to delete a pod to unstick things, but this time I don't know which pod!

[Screenshot of the Dask dashboard, 2022-01-20 5:07:06 AM]

I searched through the logs for "overlap-getitem", since that is the most upstream task prefix that is wedged. In the scheduler logs, I see 12 lines in succession (always the same key, but several different workers):

Unexpected worker completed task. Expected: <WorkerState 'tls://10.18.96.2:43711', name: dask-worker-5ed6bca7ff4e4dfdb88a823c2b5e540b-jcxzh, status: running, memory: 809, processing: 205>, Got: <WorkerState 'tls://10.16.244.2:36389', name: dask-worker-5ed6bca7ff4e4dfdb88a823c2b5e540b-trg9s, status: running, memory: 139, processing: 28>, Key: ('overlap-getitem-666998704fa0ead8d1aecce3291ebb76', 1, 2, 18)

I tried deleting the workers mentioned, some of which are still around, but that did not bring back processing of the missing keys.

About 25 seconds before those log entries, I see:

distributed.core - INFO - Event loop was unresponsive in Scheduler for 31.66s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability

Some 50 minutes later:

  File "/opt/conda/lib/python3.8/site-packages/tornado/ioloop.py", line 905, in _run
    return self.callback()
  File "/opt/conda/lib/python3.8/site-packages/distributed/stealing.py", line 477, in balance
    maybe_move_task(
  File "/opt/conda/lib/python3.8/site-packages/distributed/stealing.py", line 378, in maybe_move_task
    self.move_task_request(ts, sat, idl)
  File "/opt/conda/lib/python3.8/site-packages/distributed/stealing.py", line 258, in move_task_request
    self.scheduler.stream_comms[victim.address].send(
KeyError: 'tls://10.18.96.2:43711'

That key is seen in the logs as a worker being retired just a few seconds prior to the "Unexpected worker completed task" messages. The same key is re-registered in the middle of those "Unexpected worker completed task" messages.
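Correlating worker addresses across log entries like the ones quoted above is easier once the fields are extracted. This is a minimal sketch: the regular expression is modelled only on the single "Unexpected worker completed task" line shown in this comment, so the log format in other releases may differ, and the name `parse_unexpected` is made up for illustration.

```python
import re
from typing import Optional

# Pattern modelled on the log line quoted in this thread (an assumption, not a spec).
UNEXPECTED = re.compile(
    r"Unexpected worker completed task\. "
    r"Expected: <WorkerState '(?P<expected>[^']+)'.*?>, "
    r"Got: <WorkerState '(?P<got>[^']+)'.*?>, "
    r"Key: (?P<key>.+)$"
)


def parse_unexpected(line: str) -> Optional[dict]:
    """Return the expected worker, actual worker, and key, or None if no match."""
    match = UNEXPECTED.search(line)
    return match.groupdict() if match else None


sample = (
    "Unexpected worker completed task. "
    "Expected: <WorkerState 'tls://10.18.96.2:43711', "
    "name: dask-worker-5ed6bca7ff4e4dfdb88a823c2b5e540b-jcxzh, "
    "status: running, memory: 809, processing: 205>, "
    "Got: <WorkerState 'tls://10.16.244.2:36389', "
    "name: dask-worker-5ed6bca7ff4e4dfdb88a823c2b5e540b-trg9s, "
    "status: running, memory: 139, processing: 28>, "
    "Key: ('overlap-getitem-666998704fa0ead8d1aecce3291ebb76', 1, 2, 18)"
)
info = parse_unexpected(sample)
print(info["expected"], info["got"], info["key"])
```

Running this over every scheduler log line and grouping by the `expected` address would show whether the retired and re-registered worker is always the one the scheduler was waiting on.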

The cluster dump would not upload to github, so I put it here:

https://drive.google.com/file/d/1okQ6M8M8smP0oBhvlrdbQLJJbWp7_32U/view?usp=sharing

bnaul commented 2 years ago

We have recently started seeing this behavior as well, currently on dask+distributed 2022.1.0 but I think it's been happening for a couple of releases. Same pattern as @chrisroat:

@crusaderky @fjetter any chance #5665 could be related given that this worker was left for dead but then kept going...?

crusaderky commented 2 years ago

@bnaul #5665 mitigates, but does not fix, the issue. Everything in the opening post, particularly the memory usage graph of the stuck workers, seems to indicate that this may be a duplicate of https://github.com/dask/distributed/issues/3761:

I can see from your dashboard that 4 workers are paused with extremely high amounts of spilled memory. They never get out of paused state and they hold a bunch of queued tasks, which are not returned to the scheduler. The reason why they don't get out of paused state may be (educated guess) that you ran out of disk space and they can't swap out anymore. As of the latest release, the error handling for this use case is fragile - see https://github.com/dask/distributed/issues/5364 for improvement.

I currently have two changes in my pipeline:

  1. as soon as a worker pauses, it should return its queued tasks to the scheduler
  2. if a worker stays paused for too long, it should restart automatically
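The intent of the two changes can be illustrated with a toy model. This is a sketch only and not how distributed implements its worker state machine: `ToyWorker`, `pause`, `tick`, and the `max_pause` threshold are all hypothetical names chosen for the example.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyWorker:
    """Toy stand-in for a worker; not the distributed.Worker state machine."""

    queued: List[str] = field(default_factory=list)
    paused_for: float = 0.0  # seconds spent paused so far
    restarts: int = 0

    def pause(self) -> List[str]:
        """Change 1: hand queued tasks back so the scheduler can reassign them."""
        returned, self.queued = self.queued, []
        return returned

    def tick(self, seconds: float, max_pause: float) -> None:
        """Change 2: restart once the pause has lasted longer than max_pause."""
        self.paused_for += seconds
        if self.paused_for > max_pause:
            self.restarts += 1
            self.paused_for = 0.0


worker = ToyWorker(queued=["task-a", "task-b"])
returned = worker.pause()           # tasks go back to the (imaginary) scheduler
worker.tick(10.0, max_pause=30.0)   # still within the allowed pause
worker.tick(25.0, max_pause=30.0)   # 35 s paused in total, so the worker restarts
```

In this model a paused worker never sits on queued work, which is the behaviour whose absence matches the stuck workers in the opening post.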

While you wait for those, I would suggest you try to figure out why you have 180 GiB worth of managed data per worker on 4 workers while the rest of the cluster is relatively free.

Do those workers contain replicas of data that already exists elsewhere? In that case, the new Active Memory Manager could help you: https://coiled.io/blog/introducing-the-dask-active-memory-manager/

You can also try running rebalance() periodically. Word of warning - the method is not safe to run on a busy scheduler. Making it safe and automatic is also in my pipeline.

chrisroat commented 2 years ago

I see this consolidation of tasks onto a few workers when I use auto-scaling and spot instances. I think the addition/deletion of workers is throwing a curveball to the scheduler. I tried to repro in #4471 (no deadlock -- just bad scheduling), where you see a small number of workers get all the tasks as the cluster scales up.

The graph above is a map_overlap on a 3d-chunked array with 2*17*20=680 chunks, reading from and writing to the zarr format. This leads to 82k tasks(!). I notice in an issue you link that one can adjust fuse parameters. Is that worth trying -- could it reduce the number of tasks and perhaps lower the rate of wonky scheduling & deadlocks?

FWIW, I did attempt the AMM at one point. If I set the env var DASK_DISTRIBUTED__SCHEDULER__ACTIVE_MEMORY-MANAGER__START to "true", it turns on -- do I need additional settings?

crusaderky commented 2 years ago

> I see this consolidation of tasks onto a few workers when I use auto-scaling and spot instances. I think the addition/deletion of workers is throwing a curveball to the scheduler.

Do you mean that the 4 workers that I see saturated in the dashboard are the initial ones and the rest of the cluster only goes online when they are already very full?

> The graph above is a map_overlap on a 3d-chunked array with 2*17*20=680 chunks, reading from and writing to the zarr format. This leads to 82k tasks(!). I notice in an issue you link that one can adjust fuse parameters. Is that worth trying -- could it reduce the number of tasks and perhaps lower the rate of wonky scheduling & deadlocks?

Fuse parameters can reduce the number of tasks, which in turn translates into a reduction in CPU load on the scheduler. If your scheduler CPU is frequently near 100%, this should in turn translate into a reduction in random timeouts and disconnects. I don't think this is your case - unless you're frequently seeing them in the logs.
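For reference, fuse parameters live under the `optimization.fuse` section of the Dask configuration. This is a minimal sketch: the two values are illustrative guesses rather than recommendations, and `FUSE_OVERRIDES` and `fused_config` are hypothetical names. Whether they reduce the task count for a given graph has to be measured.

```python
# Illustrative values only; sensible settings depend on the graph.
FUSE_OVERRIDES = {
    "optimization.fuse.ave-width": 4,
    "optimization.fuse.max-height": 8,
}


def fused_config(overrides=None):
    """Return a dask.config.set context manager applying the fuse overrides."""
    import dask  # lazy import: only needed when the overrides are applied

    return dask.config.set(dict(FUSE_OVERRIDES if overrides is None else overrides))
```

Usage would look like `with fused_config(): result.compute()`, comparing the task count on the dashboard with and without the overrides.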

> FWIW, I did attempt the AMM at one point. If I set the env var DASK_DISTRIBUTED__SCHEDULER__ACTIVE_MEMORY-MANAGER__START to "true", it turns on -- do I need additional settings?

Should be enough. Please run client.amm.running() to double check.

Could you run client.rebalance() every once in a while? I wouldn't recommend it as a production fix (it's not robust in the main branch while computations are running), but if it does help you it will give me a case for raising the priority of the rework I'm doing on it.
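Both suggestions can be scripted from the client side. This is a minimal sketch, assuming `client` is a `distributed.Client`; the helper names `amm_is_running` and `rebalance_periodically`, the interval, and the round count are hypothetical. As noted above, `rebalance()` is not safe on a busy scheduler, so this is a diagnostic aid rather than a production fix.

```python
import time


def amm_is_running(client) -> bool:
    """Ask the scheduler whether the Active Memory Manager is running."""
    return bool(client.amm.running())


def rebalance_periodically(client, interval: float, rounds: int, sleep=time.sleep) -> int:
    """Call client.rebalance() `rounds` times, waiting `interval` seconds between calls."""
    done = 0
    for i in range(rounds):
        client.rebalance()
        done += 1
        if i < rounds - 1:
            sleep(interval)  # no wait after the final round
    return done
```

For example, `rebalance_periodically(client, interval=300, rounds=12)` would rebalance every five minutes for roughly an hour; the `sleep` parameter exists only so the loop can be exercised without waiting.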

chrisroat commented 2 years ago

> Do you mean that the 4 workers that I see saturated in the dashboard are the initial ones and the rest of the cluster only goes online when they are already very full?

I don't think so. In the issue I referenced (#4471), I scale from 1 to 4 to 20 workers. It looks like 6 workers get the bulk of the load, and a few others get some load.

> Fuse parameters can reduce the number of tasks, which in turn translates into a reduction in CPU load on the scheduler. If your scheduler CPU is frequently near 100%, this should in turn translate into a reduction in random timeouts and disconnects. I don't think this is your case - unless you're frequently seeing them in the logs.

I do see a lot of comm errors in the logs.

> > FWIW, I did attempt the AMM at one point. If I set the env var DASK_DISTRIBUTED__SCHEDULER__ACTIVE_MEMORY-MANAGER__START to "true", it turns on -- do I need additional settings?

> Should be enough. Please run client.amm.running() to double check.

> Could you run client.rebalance() every once in a while? I wouldn't recommend it as a production fix (it's not robust in the main branch while computations are running), but if it does help you it will give me a case for raising the priority of the rework I'm doing on it.

I could try that on some future run. I do see tons of GIL errors, which I guess must be the read/write to disk that is happening. It does seem like distributing the work better would mitigate things.

fjetter commented 2 years ago

FYI I recently merged a PR which might help in this situation. At least the failure scenario I fixed there is likely to happen in scaling situations, particularly if workers are removed (https://github.com/dask/distributed/pull/5786). Note that we already got another deadlock reported (off GitHub) after this fix, so we're not over the hill yet.

mrocklin commented 2 years ago

The "few workers taking all the tasks" thing is explainable and fixable by https://github.com/dask/distributed/pull/6115 .

This doesn't explain things fully halting though, unless possibly you've run out of disk space (is this possible?)