dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.56k stars 715 forks source link

Are timeouts in Worker.close antipatterns? #7318

Open fjetter opened 1 year ago

fjetter commented 1 year ago

I just reviewed our code around closing workers and noticed many, many timeouts of which I doubt they actually are very useful. I would even go as far as call them code smells or anti patterns and believe that if they ever trigger we're in a potentially inconsistent state.

BaseWorker timeout

https://github.com/dask/distributed/blob/5635bc01965db48b06800764e11d0069be38e670/distributed/worker.py#L1529

BaseWorker is probably not the ideal class name for this. BaseWorker is a class that handles the async instructions emitted from the WorkerState. For instance, this handles signals like Execute or GatherDep. It implements a timeout for the specific case (that is also tested) when instruction handlers are catching CancelledErrors and might block indefinitely. Two reason why I believe this is not necessary

  1. We control all instruction handlers. If we did something like that, we should fix it and not raise a timeout error. This is clearly a bug
  2. Even if we did this, shouldn't we cancel a task repeatedly until it is in fact closed?

BatchedSend close timeout

https://github.com/dask/distributed/blob/5635bc01965db48b06800764e11d0069be38e670/distributed/worker.py#L1595 which goes to https://github.com/dask/distributed/blob/5635bc01965db48b06800764e11d0069be38e670/distributed/batched.py#L172

This timeout controls the BatchedSend background task. If the background task doesn't finish in the provided time, this will raise. However, for the background task to actually not finish we'd need a comm.write to basically block for at least timeout seconds without raising an exception after the remove aborted the comm. I'm not sure if this is realistic, at least not with TCP

Threadpool timeout

https://github.com/dask/distributed/blob/5635bc01965db48b06800764e11d0069be38e670/distributed/worker.py#L1604

Last but not least, there is a threadpool timeout which is thrown into Thread.join in our threadpool, see https://github.com/dask/distributed/blob/5635bc01965db48b06800764e11d0069be38e670/distributed/threadpoolexecutor.py#L107

This timeout does not exist in the stdlib Threadpool. Joining a thread with a timeout is actually not raising but will just block for at most that many seconds. This timeout more or less makes sense because we do not want to block the shutdown of a worker if it is still running user tasks. However, if this timeout is hit, we'd basically just leak the thread and we rely that they are daemon threads (which is not the case for the stdlib pool) such that the python process terminates and takes the threads with them.

A couple of questions here

  1. Why do we wait at all? If the worker is closing there is no point in waiting for the user tasks to finish, is there?
  2. If we want to wait, why wait only for a bit? Is this one of these situations where we decide to wait a bit because in most cases we'll have a clean state?

My gut tells me right now that we can throw away all timeouts and the executor_wait kwarg and reduce complexity significantly.

cc @gjoseph92 @crusaderky @graingert

fjetter commented 1 year ago

Note: The simplifications go far beyond the method itself.

  1. Interfaces between servers (nanny, worker, scheduler) are inconsistent making it hard to write a standardized method in the superclass
  2. the Nanny is using IPC to control the kwargs passed to Worker.close. Much simpler if we're not providing any

Last but not least, I think there are legitimate reasons to timeout a close. However, I believe these timeouts should be applied from the outside because there are so many things that could actually block that we shouldn't even attempt to handle them gracefully. E.g. teardown of a user plugin could block and is beyond our control and not regulated with our timeout argument

crusaderky commented 1 year ago
  1. We control all instruction handlers. If we did something like that, we should fix it and not raise a timeout error. This is clearly a bug

The timeout in BaseWorker.close is specific to the use case where a user has an async database client running in a task and catches CancelledError to perform a clean shutdown. I'm happy to stop supporting it and just say that if you do catch CancelledError and you don't implement your own internal timeout, it's your fault if the worker gets stuck.