dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org

Retiring worker by name in SpecCluster #4532

Open nsmith- opened 3 years ago

nsmith- commented 3 years ago

When a cluster retires a worker, it assumes the keys of `self.workers` are addresses in the RPC call:

https://github.com/dask/distributed/blob/383ea0326ae103b5d5e0b62ed9c3cb18510c5b9e/distributed/deploy/spec.py#L333

But by default the workers are constructed with names (integers, by default):

https://github.com/dask/distributed/blob/383ea0326ae103b5d5e0b62ed9c3cb18510c5b9e/distributed/deploy/spec.py#L498

It seems to me that L333 should instead call `self.scheduler_comm.retire_workers(names=list(to_close))`.
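To make the mismatch concrete, here is a minimal sketch of the two keyword arguments involved. The `cluster` object and helper names are assumptions for illustration; the `workers=`/`names=` keywords are the ones accepted by the scheduler's `retire_workers`:

    # Sketch only: self.workers in SpecCluster is keyed by spec *name*
    # (an integer by default), not by the worker's comm address.

    async def retire_by_name(cluster, to_close):
        # to_close holds spec names, so they belong in names=
        return await cluster.scheduler_comm.retire_workers(names=list(to_close))

    async def retire_by_address(cluster, to_close):
        # workers= expects comm addresses like "tcp://127.0.0.1:45781",
        # which is not what self.workers is keyed by
        return await cluster.scheduler_comm.retire_workers(workers=list(to_close))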

jrbourbeau commented 3 years ago

Thanks for raising an issue @nsmith-! I agree there's a mismatch between worker addresses and names in `SpecCluster`. For context, this was reported previously in https://github.com/dask/distributed/issues/4069, and there's a PR (xref https://github.com/dask/distributed/pull/4074) which attempted to fix things but has since stalled. Is this something you're interested in working on? I can also try to revive https://github.com/dask/distributed/pull/4074, but I can't promise when I'll be able to do so.

nsmith- commented 3 years ago

I can try, but may not find time right away. In the meantime I made a workaround in my code by adding the following to my dask-jobqueue `Job` subclass:

    async def close(self):
        logger.debug("Closing worker: %s job: %s", self.name, self.job_id)
        if self._cluster:
            # workaround for https://github.com/dask/distributed/issues/4532:
            # retire by name rather than by address
            ret = await self._cluster().scheduler_comm.retire_workers(names=[self.name])
            logger.debug("Worker retirement info: %s", ret)
        # check the scheduler a few times, then force retire ...

In my case I know the worker name is the same as the spec name.
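
For anyone in a similar situation, here is a quick sketch of one way to check that assumption (the `cluster` object is assumed to be a running `SpecCluster`-style cluster; the `scheduler_info` layout is the usual one from distributed's `Cluster`):

    # Sketch: confirm the names the scheduler reports are the spec names
    scheduler_names = {
        info["name"] for info in cluster.scheduler_info["workers"].values()
    }
    assert scheduler_names <= set(cluster.worker_spec)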