dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Worker State should be exclusively modified through batched comms #6604

Open crusaderky opened 2 years ago

crusaderky commented 2 years ago

This is a high level epic.

The Worker State Machine (distributed/worker_state_machine.py) can be updated exclusively through the Worker.handle_stimulus handler. Most state-changing calls coming from the scheduler are handled through batched comms, which have the notable feature of being strictly sequential. This makes it much harder to introduce subtle race conditions where the worker state is not what the scheduler thinks it is.
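To illustrate the property being described, here is a hedged toy sketch (not the actual dask implementation; `ToyWorkerState` and `batched_comms_loop` are invented names): a worker drains stimuli from a single FIFO queue, so every state transition is applied one at a time, in arrival order.

```python
# Toy sketch of a strictly sequential stimulus handler.
# Assumption: all state mutation is funneled through one handler fed by
# one queue, mirroring the batched-comms guarantee described above.
import asyncio
from dataclasses import dataclass, field


@dataclass
class ToyWorkerState:
    tasks: dict = field(default_factory=dict)

    def handle_stimulus(self, stim: dict) -> None:
        # The single entry point for state mutation, one stimulus at a time.
        op = stim["op"]
        if op == "compute-task":
            self.tasks[stim["key"]] = "executing"
        elif op == "free-keys":
            for key in stim["keys"]:
                self.tasks.pop(key, None)


async def batched_comms_loop(state: ToyWorkerState, inbox: asyncio.Queue) -> None:
    # Sequential consumption: no two transitions can interleave.
    while True:
        stim = await inbox.get()
        if stim is None:  # sentinel: stop the loop
            break
        state.handle_stimulus(stim)


state = ToyWorkerState()


async def main():
    inbox: asyncio.Queue = asyncio.Queue()
    inbox.put_nowait({"op": "compute-task", "key": "x"})
    inbox.put_nowait({"op": "free-keys", "keys": ["x"]})
    inbox.put_nowait(None)
    await batched_comms_loop(state, inbox)


asyncio.run(main())
print(state.tasks)  # {} -- free-keys ran strictly after compute-task
```

Because the queue imposes a total order, the final state depends only on the order the scheduler sent the messages, never on network timing.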

There are three notable offenders that bypass the batched comms and use RPC instead: rebalance, replicate, and scatter.

If you use any of these calls, you may end up in a scenario where:

  1. a rebalance/replicate/scatter command is fired through RPC by the scheduler
  2. another command is fired by the scheduler through batched send, e.g. free-keys
  3. the two commands land on the worker in the opposite order as they were sent by the scheduler

For example, the scheduler may send free-keys because it wants the worker to forget a key, and shortly afterwards scatter data with the same key to that worker. The worker, however, may receive the scattered data first, which transitions the key to memory, and then free-keys, which drops the freshly scattered data.
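The race above comes down to the fact that the two possible arrival orders produce different final states. A minimal, hedged illustration (toy `apply` function, not dask code):

```python
# Applying the same two messages in the two possible arrival orders
# yields different worker state: message order matters.
def apply(state: dict, msg: dict) -> None:
    if msg["op"] == "scatter":
        state[msg["key"]] = msg["value"]  # key transitions to memory
    elif msg["op"] == "free-keys":
        for key in msg["keys"]:
            state.pop(key, None)


free = {"op": "free-keys", "keys": ["x"]}
scatter = {"op": "scatter", "key": "x", "value": 123}

# Order the scheduler intended: forget the old key, then scatter new data.
intended = {}
for msg in (free, scatter):
    apply(intended, msg)

# Order that can occur when scatter travels over RPC while free-keys
# travels over batched comms: scatter lands first, free-keys deletes it.
raced = {}
for msg in (scatter, free):
    apply(raced, msg)

print(intended)  # {'x': 123}
print(raced)     # {}
```

With a single sequential channel, only the intended ordering is possible; with two independent channels, both orderings can occur.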

CC @fjetter @gjoseph92

gjoseph92 commented 2 years ago

I'm generally in favor of this. Note though that async stream handlers are still processed concurrently: https://github.com/dask/distributed/blob/96c13a5bc26e67d18cee256c2a110102cef7eac6/distributed/core.py#L662-L667

So if replicate and free-keys both had async handlers (free-keys is currently sync, but just imagine), though they would always be invoked in the correct order, the handlers would still need to be written properly to be able to work concurrently. I'm not saying that's a reason not to make the change you're proposing—just something to be aware of.
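The hazard described here can be reproduced with a small asyncio sketch (toy handler names, not the real dask handlers): two handlers are *started* in arrival order, but if the first one awaits, the second can complete first.

```python
# Sketch: async handlers launched in order, but running concurrently.
# Assumption: the stream loop spawns each handler as a task rather than
# awaiting it to completion before starting the next one.
import asyncio

events = []


async def slow_replicate_handler():
    events.append("replicate: start")
    await asyncio.sleep(0.05)  # e.g. fetching data from peers
    events.append("replicate: done")


async def fast_free_keys_handler():
    events.append("free-keys: start")
    events.append("free-keys: done")


async def stream_loop():
    # Invoked in the correct order...
    t1 = asyncio.ensure_future(slow_replicate_handler())
    t2 = asyncio.ensure_future(fast_free_keys_handler())
    # ...but not serialized: both run concurrently.
    await asyncio.gather(t1, t2)


asyncio.run(stream_loop())
print(events)
```

Here free-keys finishes before replicate despite being invoked second, which is why even in-order invocation is not enough on its own: the handlers themselves must tolerate concurrent execution.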

The main difficulty I see is that batched comms are fire-and-forget: the scheduler neither gets a return value nor even a confirmation that the operation happened. The current implementation of scatter at least seems to rely on a) getting an nbytes response back from each worker, and b) blocking until all the scatter calls have completed. So this would have to be refactored.
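One conceivable shape for that refactor, sketched very loosely (the `stimulus_id` correlation and `scatter-done` message are assumptions for illustration, not an existing dask protocol): instead of returning nbytes over RPC, the worker stores the data and emits a confirmation message on its own one-way stream, which the scheduler correlates by id.

```python
# Hedged sketch: request/reply over two fire-and-forget streams.
# The worker never "returns" anything; it appends a confirmation
# message to its outgoing batched stream instead.
import sys


def worker_handle_scatter(worker_data: dict, msg: dict, outbox: list) -> None:
    # Store the scattered data locally.
    for key, value in msg["data"].items():
        worker_data[key] = value
    # Report nbytes back as a message, keyed by the originating stimulus,
    # so the scheduler can match the reply without blocking on an RPC.
    outbox.append({
        "op": "scatter-done",
        "stimulus_id": msg["stimulus_id"],
        "nbytes": {k: sys.getsizeof(v) for k, v in msg["data"].items()},
    })


worker_data: dict = {}
outbox: list = []
worker_handle_scatter(
    worker_data,
    {"op": "scatter", "stimulus_id": "scatter-1", "data": {"x": b"abc"}},
    outbox,
)
print(outbox[0]["op"])  # scatter-done
```

The scheduler-side blocking behavior would then become "wait until a scatter-done arrives for each worker", preserving the current semantics without a second, unordered communication channel.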

But overall I think this is a good idea, and ensuring state-modifying commands arrive in the order they're sent seems like an essential thing to guarantee!