One other thing: there are a bunch of these KeyErrors in the scheduler logs:
2022-05-03 02:08:02,526 - distributed.stealing - ERROR - 'tcp://10.126.167.29:36945'
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/distributed/stealing.py", line 242, in move_task_request
self.scheduler.stream_comms[victim.address].send(
KeyError: 'tcp://10.126.167.29:36945'
2022-05-03 02:08:02,801 - distributed.stealing - ERROR - 'tcp://10.126.167.29:36945'
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/distributed/stealing.py", line 466, in balance
maybe_move_task(
File "/usr/local/lib/python3.9/site-packages/distributed/stealing.py", line 369, in maybe_move_task
self.move_task_request(ts, victim, thief)
File "/usr/local/lib/python3.9/site-packages/distributed/stealing.py", line 242, in move_task_request
self.scheduler.stream_comms[victim.address].send(
KeyError: 'tcp://10.126.167.29:36945'
not for the specific worker I chose above, but for some of the other 🧟s
@gjoseph92 do you have time this week to take a look at this?
Somehow the scheduler has assigned a task to run on a closed WorkerState
Thanks @bnaul, I'll take a look this week.
Hey @bnaul—sorry to say, but I won't be able to get to this this week. https://github.com/dask/distributed/pull/6329 is proving to be harder to merge than expected, and Florian is out sick, so we're running behind. This will be next up for next week though.
@bnaul to confirm, what version of distributed is this?
I think that Brett has been on main for a little while (or main whenever this was)
That's what I thought too, and why I wanted to know. There have been a lot of changes recently to the areas this touches, so getting a specific commit would be helpful.
Dumping a ton of notes here from investigation. There are two major bugs here. I'll eventually open tickets for each.
This has a bit of overlap with #5480, and sort of #6341. I think that #6329 might alleviate one side, but there's a more general race condition that's probably been around for a long time.
1. The RetireWorker policy finishes. The scheduler calls await self.close_worker.
2. close_worker calls remove_worker. (It also redundantly enqueues a close message to the worker—remove_worker is about to do the same—though I don't think this makes a difference here.)
3. remove_worker does a bunch of stuff, including deleting the BatchedSend to that worker, which has the {op: close} message queued on it. It does not wait until the BatchedSend queue is flushed.
4. remove_worker removes the WorkerState entry for that worker.

At this point, the scheduler has removed all knowledge of the worker. However, the close message hasn't even been sent to it yet—it's still queued inside a BatchedSend, which may not send the message for up to 5ms more. And even after the message has been sent, Worker.close still has to get scheduled on the worker event loop and run, which could take a while (https://github.com/dask/distributed/issues/6325).

There are multiple sources of truth for when a worker is gone. In some places, it's whether addr in Scheduler.workers or addr in Scheduler.stream_comms; in others, it's whether a comm to that worker is closed. The entry being removed from the dict and the comm being closed are disjoint events.
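The KeyError tracebacks at the top of this issue are one symptom of that mismatch: stealing still held a WorkerState for a worker whose stream_comms entry was already gone. As a purely hypothetical illustration of the disjoint checks (not actual distributed code):

# Hypothetical illustration (not distributed's code) of the two disjoint
# notions of "this worker is gone" described above.
def gone_according_to_scheduler_state(scheduler, addr):
    # remove_worker deletes these entries immediately...
    return addr not in scheduler.workers and addr not in scheduler.stream_comms

def gone_according_to_comm(comm):
    # ...but the underlying comm may stay open a while longer, with the
    # {op: close} message still sitting unsent in the BatchedSend buffer.
    return comm.closed()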
Thus, between when Scheduler.remove_worker ends and when all comms to the worker actually close, we are in a degenerate state and exposed to race conditions: the scheduler has forgotten about the worker before closing its communications to it, or even confirming that the worker has received the message to close. Explicitly flushing and closing the BatchedSend would alleviate this, though not resolve it: while waiting for our send side to close, we could still receive messages from the now-removed worker.

Simply moving the flush-and-close to the top of Scheduler.remove_worker—before anything else happens—would, I think, fix the problem. Then we wouldn't be removing state related to the worker until we were guaranteed the worker connection was closed.
Here's one race that actually plays out in that window:

1. remove_worker has run, but before the close message actually gets sent over the BatchedSend / Worker.close starts running, the worker sends another heartbeat to the scheduler.
2. Even though the worker is closing_gracefully, it still tries to re-register.
3. Re-registering calls Worker.batched_stream.start with a new comm object, even though Worker.batched_stream is already running with the previous comm object. This is the double-start bug I pointed out in https://github.com/dask/distributed/pull/6272/files#r866082750. This will swap out the comm that the BatchedSend is using, and launch a second background_send coroutine. (Surprisingly, I think having multiple background_sends racing to process one BatchedSend is still safe, just silly; a toy sketch of this double-start pattern follows this list.)
4. The BatchedSend has now lost its reference to the original comm. However, one Worker.handle_scheduler is still running with that original comm, and now one is running with the new comm too. Since there's still a reference to the old comm, it isn't closed.
5. Scheduler.handle_worker is still running, even though, from the scheduler's perspective, the worker it's supposed to be handling doesn't exist anymore. Yay https://github.com/dask/distributed/issues/6201 — if we had a handle on this coroutine, we could have cancelled it in remove_worker.
6. Worker.close actually closes (one of) its batched comms to the scheduler. This triggers remove_worker on the scheduler, removing the entry for the "new" worker.
7. There's still a Scheduler.handle_worker running for the old comm too. I presume it eventually stops when the worker actually shuts down and severs the connection? When it does, it probably won't run remove_worker another time, because the address is already gone from Scheduler.stream_comms.
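A toy sketch of that double-start pattern, to make the hazard concrete. This is deliberately not distributed's BatchedSend; the class and names here are invented for illustration:

import asyncio

# Toy model of the double-start hazard -- NOT distributed's actual BatchedSend.
class FakeComm:
    def __init__(self, name):
        self.name = name
        self.closed = False

    async def write(self, msgs):
        print(f"{self.name} sent {msgs}")

class ToyBatchedSend:
    def __init__(self):
        self.comm = None
        self.buffer = []
        self.senders = []          # every start() call adds another sender task

    def start(self, comm):
        # Swaps the comm and launches *another* background sender, without
        # stopping the first one or closing the old comm.
        self.comm = comm
        self.senders.append(asyncio.ensure_future(self._background_send()))

    def send(self, msg):
        self.buffer.append(msg)

    async def _background_send(self):
        while True:
            await asyncio.sleep(0.005)
            if self.buffer:
                payload, self.buffer = self.buffer[:], []
                await self.comm.write(payload)   # whichever comm is current

async def main():
    bs = ToyBatchedSend()
    old, new = FakeComm("old-comm"), FakeComm("new-comm")
    bs.start(old)
    bs.start(new)                  # second start: old comm is orphaned, never closed
    bs.send({"op": "close"})
    await asyncio.sleep(0.05)
    print("sender tasks:", len(bs.senders), "| old comm closed:", old.closed)

asyncio.run(main())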
Finding yet another bug in worker reconnection might feel like another vote for https://github.com/dask/distributed/issues/6350.
The real bug here is how Scheduler.remove_worker puts things in a broken state every time it runs (comms still exist to a worker, WorkerState doesn't). And whether we do #6350 or #6329, we need to fix this either way.

It shouldn't be that hard to fix. (It may be harder to ensure it doesn't get broken again in the future, though—the design is still brittle.) Scheduler.remove_worker just needs to await self.stream_comms[address].close() before doing any other state manipulation. By closing the underlying comm, this will also terminate the handle_worker, so incoming messages won't be processed.
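A minimal sketch of that ordering, with names approximating (but not quoting) the real scheduler code; treat everything here as illustrative assumptions:

# Illustrative sketch only -- not the actual Scheduler.remove_worker.
async def remove_worker(self, address, *, close=True):
    # Flush and close the batched stream to the worker *first*, so the
    # {op: close} message is actually delivered and handle_worker stops
    # processing messages from this worker.
    stream = self.stream_comms.pop(address, None)
    if stream is not None:
        if close:
            stream.send({"op": "close"})
        await stream.close()  # assumed to flush the buffer before closing the comm

    # Only now drop the rest of the scheduler-side state: no comm to the
    # worker exists anymore, so nothing can race with the removal below.
    ws = self.workers.pop(address, None)
    if ws is not None:
        ...  # reschedule ws.processing, update occupancy, emit events, etc.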
The second bug: work stealing holds a reference to the WorkerState object for the old, disconnected worker.

1. Stealing decides to move a task to our worker (the thief). It sends a steal-request to worker Y (where the task is currently queued), asking it to cancel the task.
2. Stealing records the victim and thief WorkerStates (not addresses) in WorkStealing.in_flight.
3. The thief worker disconnects. Its WorkerState instance—the one currently referenced by WorkStealing.in_flight—is removed from Scheduler.workers.
4. The worker reconnects. A new WorkerState instance for it is added to Scheduler.workers, at the same address. The scheduler thinks nothing is processing on it.
5. Worker Y confirms the steal. move_task_confirm handles this, and pops info about the stealing operation from WorkStealing.in_flight—including the thief WorkerState object. This is the old WorkerState instance, which is no longer in Scheduler.workers.
6. The checks pass, because the thief's address is in scheduler.workers, even though the thief object isn't. The task is set to processing on the old WorkerState.
7. When the worker later shuts down for real, Scheduler.remove_worker goes to reschedule any tasks it's processing. But it's looking at the new WorkerState instance, and the task was assigned to the old one, so the task is never rescheduled.

From @bnaul's cluster dump:
# scheduler event log
# worker joins
- - 2022-05-02 20:06:13.639737
- action: add-worker
worker: tcp://10.126.71.26:32909
# 1.5min later, `retire_workers` removes it
- - 2022-05-02 20:07:43.847567
- action: remove-worker
processing-tasks:
... # there are some, doesn't matter what they are
worker: tcp://10.126.71.26:32909
# 10s later, the heartbeat-reconnect race condition makes it reconnect
- - 2022-05-02 20:07:52.530403
- action: add-worker
worker: tcp://10.126.71.26:32909
# 2s later
# at some point in the past (before the disconnect-reconnect),
# we had planned to steal a task from some other worker and run it on this one.
# the other worker just replied, "ok you can steal my task".
# stealing now sets the task to be processing on our worker in question.
# however, `move_task_confirm` has a reference to the old, pre-disconnect `WorkerState` instance.
# it passes checks, because the _address_ is still in Scheduler.workers.
stealing:
- - 2022-05-02 20:07:54.831012
- - confirm
- ('read-parquet-14e1bf051c27f27510e4873d5022c6a8', 10716) # <-- stuck task
- ready
- tcp://10.125.19.19:37649
- tcp://10.126.71.26:32909 # <-- our worker in question is the thief
- steal-11761
# 0.4s later
# worker shuts itself down, closing comms and triggering another `remove_worker`.
# but it appears to not be running anything, even though we know it just stole a task!
- - 2022-05-02 20:07:55.195962
- action: remove-worker
processing-tasks: {} # <-- empty, because this is showing the new WorkerState object. task above got assigned to old WorkerState object
worker: tcp://10.126.71.26:32909
# Scheduler.tasks
('read-parquet-14e1bf051c27f27510e4873d5022c6a8', 10716): # <-- stuck task
state: processing
processing_on: '<WorkerState ''tcp://10.126.71.26:32909'', name: 250, status: closed,
memory: 0, processing: 10>'
# It's processing on a WorkerState that's closed. That WorkerState has managed to collect 10 processing tasks.
# Same address as our logs above---but it's a different WorkerState instance! (And doesn't exist in `Scheduler.workers`.)
This should be a relatively simple fix in work-stealing. Either:

- Store addresses in in_flight, not WorkerState instances.
- Or validate that self.scheduler.workers.get(thief.address) is thief (see the sketch below).

Even with that, I'm wary of having a worker disconnect and reconnect, and still trusting our decision to move a task to that worker.
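For the second option, the guard could look roughly like this (a sketch; _confirm_steal and the in_flight layout here are placeholders, not the real move_task_confirm):

# Sketch of validating WorkerState identity, not just the address.
def _confirm_steal(self, key):
    ts, victim, thief = self.in_flight.pop(key)
    current = self.scheduler.workers.get(thief.address)
    if current is not thief:
        # The thief disconnected and re-registered with a brand-new
        # WorkerState, so the earlier stealing decision is stale. Don't
        # assign the task to the old object; let it be scheduled normally.
        return
    ...  # proceed with moving the task to `thief`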
But more broadly, it's interesting because it's essentially a classic use-after-free bug. You can't dereference null pointers in Python, but you can very easily work with objects that have logically been freed.
It points to how reference leaks like https://github.com/dask/distributed/issues/6250 aren't just a memory issue—they can also indicate correctness issues. If something is keeping a reference alive to an object that should be freed, there's a chance it might actually use that reference! Python, unfortunately, makes it very easy to be fast and loose with object lifetimes like this.
Thanks @gjoseph92, super interesting! And yes I believe I had just bumped to 2022.5.0 but if not it was main from probably the day of release.
Hey @bnaul, could you try things out with the latest release (2022.5.1)? The buggy codepath of https://github.com/dask/distributed/issues/6356 / https://github.com/dask/distributed/issues/6392 is still around, but with worker reconnect removed (https://github.com/dask/distributed/pull/6361), I think it's much, much less likely to be triggered.
I'd be surprised if you still see this issue. Though I'd like to keep it open until https://github.com/dask/distributed/issues/6356 is actually fixed.
I'm still seeing an issue like this with the May, April, and March releases of Dask and Distributed. About 300 tasks are processed across my 32-core machine and I see on htop that cores are firing at 100%. Then core use drops to near 0 and tasks are stuck in "processing".
I'm testing by reinstalling both dask and distributed from pip, and I wasn't encountering this error back in April. Is there something that would cause this error to occur across dask versions if it was run with the version with the bug?
EDIT: bringing num_workers down to less than my number of cores, rather than letting the default be set, solves my issue; there are no more pauses.
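For reference, on a local cluster the worker count can be pinned explicitly like this (an example only; the exact num_workers knob rbavery used isn't shown above, and 16/1 are arbitrary values):

from dask.distributed import Client

# Cap the number of worker processes below the machine's 32 cores instead of
# relying on the default worker/thread layout.
client = Client(n_workers=16, threads_per_worker=1)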
@rbavery generically, what you're describing sounds like a deadlock, but there's not enough information here to be sure that you're seeing the deadlock referenced in this particular issue. (There are a number of different bugs which can cause deadlocks; the overall symptom of "tasks stuck in processing" is the same.)
Could you open a new issue for this, and include any logs from the scheduler and workers you can get?
Also, could you make sure you're using the latest version (2022.5.1 or 2022.5.2)? We fixed one of the more common sources of deadlocks in the release 2 days ago. (Though if you're running a local cluster, I would not expect this one to affect you.)
Anything interesting in the logs? If you're running a local cluster you might want the flag silence_logs=False:

from dask.distributed import Client

client = Client(silence_logs=False)
Thanks for the tips @gjoseph92 and @mrocklin . I tested 2022.5.2 but still ran into this. I'll open a new issue.
@gjoseph92 IIUC we currently assume this should be closed by https://github.com/dask/distributed/issues/6356?
By whatever fixes #6356, yes
Similar at a high level to #6198 but a slightly different manifestation: the dashboard shows 9 remaining tasks (one is a parent task that spawned the other 8 by calling dd.read_parquet), but the Info page shows only the one parent task processing.

In the case of #6198 the worker showed up in the scheduler Info page (but would 404 when you tried to click through to its info); here the scheduler knows the workers are gone, but there are still tasks assigned to them anyway:
Zooming in on the first closed worker 10.126.160.29:33011, relevant scheduler logs:

And worker logs:
Also finally got a successful cluster dump 🎉
cc @gjoseph92 @fjetter @crusaderky @mrocklin