dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Enhancement Request - Dask Workers lifetime option not waiting for job to finish #3141

Open ameetshah1983 opened 5 years ago

ameetshah1983 commented 5 years ago

When applying the worker lifetime option with restart, it looks like the worker still moves ahead with the restart even if it is running a job.

I applied the lifetime restart option every 60 seconds using 1 worker and ran a job which simply sleeps for twice that amount of time. The restart still appears to take place even while the worker is running the job.

For a graceful restart, I thought the worker would wait for a long-running task / job to finish and then restart itself once idle. That way, even if you have a long-running task, it is not interrupted by the auto-restart option.
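
For reference, a minimal reproduction along those lines might look roughly like this (a sketch; it assumes LocalCluster forwards the lifetime keywords to its workers, whereas the original report used the equivalent dask-worker CLI options):

import time

from dask.distributed import Client, LocalCluster

# Assumption: LocalCluster passes the lifetime keywords through to its workers.
cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=1,
    lifetime="60s",
    lifetime_restart=True,
)
client = Client(cluster)

# A task that outlives the worker lifetime: with the current behaviour the
# worker restarts mid-task instead of waiting for the task to finish.
future = client.submit(time.sleep, 120)
future.result()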

TomAugspurger commented 5 years ago

Just to clarify, you want the --lifetime=60s to mean "60s or until I don't have any tasks, whichever is longer"? How would this be exposed in the CLI?

And does adaptive better suit your needs?

mrocklin commented 5 years ago

Lifetime was intended to provide a mechanism for a periodic reset of workers, such as might be useful when dealing with libraries that might leak memory and so benefit from cycling worker processes. It is orthogonal to adaptive.

> Just to clarify, you want the --lifetime=60s to mean "60s or until I don't have any tasks, whichever is longer"? How would this be exposed in the CLI?

I think that @ameetshah1983's request is in scope, and is probably the intent of the keyword. I think that what you say above, Tom, is correct, and should be the meaning of the current spelling in the CLI.

To fix this, we probably want to modify the lifetime code to close down the worker to new tasks, wait until the thread pool clears out of current tasks (or some large timeout), and then close down.
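
For illustration, the waiting-then-closing part of that might look roughly like the following (a sketch only; close_when_idle, the executing_count attribute, and the timeout value are assumptions, and refusing new tasks is left out):

import asyncio


async def close_when_idle(worker, timeout=300):
    # Sketch of the waiting part of the proposal above: poll until the thread
    # pool has no executing tasks (attribute name assumed), bounded by a large
    # timeout, then shut the worker down. Refusing new tasks during this
    # window is omitted here.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while worker.state.executing_count and loop.time() < deadline:
        await asyncio.sleep(0.1)
    await worker.close()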

simaster123 commented 4 years ago

Not sure what the status is on this feature request, but for what it's worth, I'm using --lifetime to deal with sporadic worker stalls. So, in my case, a task sits on a worker indefinitely, and that is exactly what I'd like to restart. Perhaps, when this feature is added, it could include the flexibility to either (1) wait for tasks to finish, or (2) restart regardless of the status of tasks.

karims commented 4 years ago

> Lifetime was intended to provide a mechanism for a periodic reset of workers, such as might be useful when dealing with libraries that might leak memory and so benefit from cycling worker processes. It is orthogonal to adaptive.
>
> > Just to clarify, you want the --lifetime=60s to mean "60s or until I don't have any tasks, whichever is longer"? How would this be exposed in the CLI?
>
> I think that @ameetshah1983's request is in scope, and is probably the intent of the keyword. I think that what you say above, Tom, is correct, and should be the meaning of the current spelling in the CLI.
>
> To fix this, we probably want to modify the lifetime code to close down the worker to new tasks, wait until the thread pool clears out of current tasks (or some large timeout), and then close down.

Any update on this feature request?

TomAugspurger commented 4 years ago

I don’t see any. Are you interested in working on it?

karims commented 4 years ago

> I don’t see any. Are you interested in working on it?

Sure!

EvanKomp commented 1 year ago

What is the safest mechanism for closing the worker? I attempted to write a WorkerPlugin that closed the worker on task completion using the same mechanism as --lifetime, e.g. worker.io_loop.call_later(time, worker.close_gracefully), but that ended up causing an unacceptable number of task duplications. I am happy to work on something and open a pull request, but given the tests I have run in that issue, I'm not sure that having the worker close itself after a task doesn't create a bunch of other issues.

See the issue https://github.com/dask/dask-jobqueue/issues/597
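
For reference, the kind of plugin described above might look roughly like this (a sketch only; the class name and the transition-based trigger are illustrative, and as noted this approach caused task duplication in practice):

from distributed import WorkerPlugin


class CloseAfterTask(WorkerPlugin):
    # Illustrative plugin (not in distributed): once `lifetime` seconds have
    # passed, ask the worker to close gracefully as soon as a task finishes.

    def __init__(self, lifetime):
        self.lifetime = lifetime
        self.closing = False

    def setup(self, worker):
        self.worker = worker
        self.deadline = worker.io_loop.time() + self.lifetime

    def transition(self, key, start, finish, **kwargs):
        if self.closing or finish != "memory":
            return
        if self.worker.io_loop.time() > self.deadline:
            self.closing = True
            # close_gracefully is a coroutine; add_callback lets the worker's
            # event loop schedule and await it.
            self.worker.io_loop.add_callback(self.worker.close_gracefully)

It could be registered with something like client.register_worker_plugin(CloseAfterTask(lifetime=3600)).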

tjgalvin commented 1 year ago

Hi all - is there any update on this request? I am also curious about what the best approach would be. With some guidance I might be able to implement it. @EvanKomp - did you ever figure out a solution? I am wanting to use this in a SLURM setting.

electronsandstuff commented 1 year ago

Just wanted to add that this would be helpful for my use case too. I am running in a SLURM environment and trying to keep my workers within the walltime limit of the cluster without having jobs fail.
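
For what it's worth, the pattern recommended in the dask-jobqueue documentation for this situation is to set the worker lifetime somewhat shorter than the job walltime, with a stagger, roughly like the sketch below (the keyword for passing extra worker arguments differs between dask-jobqueue versions):

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=4,
    memory="16GB",
    walltime="01:00:00",
    # Retire workers a few minutes before the SLURM walltime, staggered so
    # they do not all leave at once (this keyword may be spelled `extra` on
    # older dask-jobqueue versions).
    worker_extra_args=["--lifetime", "55m", "--lifetime-stagger", "4m"],
)
cluster.adapt(minimum=0, maximum=10)

Even with that, though, a task that is still running when the lifetime fires gets interrupted, which is exactly the gap this issue is about.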

fjetter commented 1 year ago

The current mechanism is to "gracefully downscale" a worker. This typically evicts all data and runnable tasks but does not wait for the currently executing ones to finish.

Instead of using Worker.close_gracefully here https://github.com/dask/distributed/blob/fcd921c581162f4536fd92cf2aa81da32462939c/distributed/worker.py#L842-L844

We'd need a method that is almost identical to close_gracefully, but one that waits until all threads are idle. It could be something like the following.


async def lifetime_close_gracefully(self, ...):
    # Same as close_gracefully, but waits for the executing threads to be idle
    ...
    while self.state.executing_count:
        await asyncio.sleep(0.01)
    await self.close(...)
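
The call site linked above would then schedule the new coroutine instead of close_gracefully, presumably something along these lines (a sketch, assuming the current wiring goes through io_loop.call_later as mentioned earlier in this thread):

# Hypothetical change at the call site linked above:
# before: self.io_loop.call_later(lifetime, self.close_gracefully)
# after:  self.io_loop.call_later(lifetime, self.lifetime_close_gracefully)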

Anybody is welcome to pick this up and create a PR (with a unit test). If you are struggling to complete it, I suggest opening a draft PR with how far you got, and we can help you push it over the finish line. Any volunteers?

tjgalvin commented 1 year ago

I think I see how it comes together. Thanks very much for the example!

I can give it a whirl over this week.

Do you think it would be better to build this into the existing close gracefully method, or separate it out completely as you suggested?

AlecThomson commented 3 months ago

Hi all,

I was talking offline with @tjgalvin and he implemented a potential drain option for dask worker. I've taken his work and added a few tweaks, and I think I've got the expected behaviour working.

The changes are available in https://github.com/AlecThomson/distributed/tree/drainclose

If the maintainers are happy I can go ahead and open a PR

Here's a demo of firing up a scheduler + worker with a drain and it handling some work that will go over the lifetime:

https://github.com/dask/distributed/assets/9074527/5638d6c6-80ca-45a7-a695-bc46ef55dc50

AlecThomson commented 3 months ago

BTW - my motivation for using this feature is with dask-jobqueue in an HPC environment and using .adapt(). We have some tasks submitted to Dask that are long-running, but fit inside the walltime of a dask-jobqueue job. However, depending on how busy the HPC queue is, more than one task can be submitted to a given worker. This means work starts on a second long-running task, which will subsequently be killed by the lifetime and/or the HPC walltime. My thinking is that I would like to set a conservative lifetime with the drain option implemented by @tjgalvin. This will let a bunch of small tasks run, but will trigger a shutdown at the end of a long-running task.

fjetter commented 3 months ago

I'm not thrilled about the drain option and would prefer teaching lifetime / retire_workers to not kill workers which are still running stuff. I haven't tested this, but it may be as simple as adding this line:

diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py
index 724bfc189..9c4d53760 100644
--- a/distributed/active_memory_manager.py
+++ b/distributed/active_memory_manager.py
@@ -736,4 +736,4 @@ class RetireWorker(ActiveMemoryManagerPolicy):
         ws = self.manager.scheduler.workers.get(self.address)
         if ws is None:
             return True
-        return all(len(ts.who_has or ()) > 1 for ts in ws.has_what)
+        return all(len(ts.who_has or ()) > 1 for ts in ws.has_what) and not ws.processing

I could imagine a bunch of tests tripping over this (and I'd like to see a new one testing this behavior), but generally speaking this addition to graceful downscaling would be nice.
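
A test for that could be shaped roughly like the following (an untested sketch; the synchronization details and the retire_workers call would likely need tweaking):

import asyncio

from distributed.utils_test import gen_cluster, slowinc


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)])
async def test_retire_worker_waits_for_executing_tasks(c, s, a):
    # Start a slow task on the only worker, then ask the scheduler to retire
    # that worker while the task is still executing.
    fut = c.submit(slowinc, 1, delay=2)
    while not a.state.executing_count:
        await asyncio.sleep(0.01)

    retire = asyncio.create_task(s.retire_workers(workers=[a.address]))

    # With the change above, the worker should not be retired while it is
    # still processing, so the in-flight task completes successfully.
    assert await fut == 2
    await retire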

tjgalvin commented 2 months ago

For what it's worth, I added the --drain option initially because I saw this as changing what some might consider expected behavior. My thinking was that if the behavior has come to be expected (whether documented or not), perhaps an opt-in was required, at least initially.

I might be able to look at this problem again.

AlecThomson commented 2 months ago

Another quick thought on this - a `drain` state exists in batch management systems like Slurm, with the idea that the worker / job finishes its current work but stops accepting new work. This seems exactly like the kind of behaviour @ameetshah1983's request was asking for.