dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Worker restriction using alias doesn't resolve after worker restart #8600

Open. jinmannwong opened this issue 6 months ago

Describe the issue:

Worker restrictions specified by worker name can fail to resolve if the named worker is restarted and allow_other_workers=False. I think this is because TaskState.worker_restrictions stores the output of Scheduler.coerce_address, i.e. the address the name resolved to at submission time, and that address is most likely different once the worker restarts. As a result, the restriction is never satisfied even though the restarted worker keeps the same name. Is this the intended behaviour?
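The suspected mechanism can be illustrated with a small toy model in plain Python. Everything in it (ToyScheduler, name_to_address, runnable_on, the addresses and ports) is hypothetical and only mimics the idea that a name is resolved to an address once, when the task is submitted. It is not distributed's implementation.

```python
# Hypothetical toy model, NOT distributed's scheduler code: a worker name is
# resolved to an address once, at submission time, and only the address is kept.

class ToyScheduler:
    def __init__(self):
        # worker name -> current address (made-up addresses)
        self.name_to_address = {}
        # task key -> set of addresses the task may run on
        self.worker_restrictions = {}

    def add_worker(self, name, address):
        self.name_to_address[name] = address

    def remove_worker(self, name):
        self.name_to_address.pop(name, None)

    def coerce_address(self, name_or_address):
        # Map a known name to its current address; pass anything else through.
        return self.name_to_address.get(name_or_address, name_or_address)

    def submit(self, key, workers):
        # The restriction is stored as resolved addresses, not as names.
        self.worker_restrictions[key] = {self.coerce_address(w) for w in workers}

    def runnable_on(self, key):
        # Names of workers whose *current* address satisfies the restriction.
        allowed = self.worker_restrictions[key]
        return [n for n, a in self.name_to_address.items() if a in allowed]


s = ToyScheduler()
s.add_worker("worker-0", "tcp://127.0.0.1:40001")
s.submit("test_func-1", ["worker-0"])
print(s.runnable_on("test_func-1"))  # ['worker-0']

# Simulated restart: same name, new port.
s.remove_worker("worker-0")
s.add_worker("worker-0", "tcp://127.0.0.1:40002")
print(s.runnable_on("test_func-1"))  # [] -> the task can never be scheduled
```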

Minimal Complete Verifiable Example:

import time

def test_func(duration: int):
    time.sleep(duration)

def main():
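    from dask.distributed import Scheduler, Nanny, Client, SpecCluster, as_completed

    scheduler = {"cls": Scheduler, "options": {"dashboard_address": ":8787"}}
    # The dict key becomes the worker's name, so this worker is named "worker-0".
    workers = {
        "worker-0": {"cls": Nanny, "options": {"nthreads": 1}},
    }
    cluster = SpecCluster(scheduler=scheduler, workers=workers)
    client = Client(cluster)

    # Pin the task to the worker named "worker-0" and forbid fallback to other workers.
    future = client.submit(test_func, 60, workers="worker-0", allow_other_workers=False)
    time.sleep(10)  # give the scheduler time to register the task

    # Inspect the restrictions the scheduler stored for each task.
    for key, task in cluster.scheduler.tasks.items():
        print("BEFORE RESTART", key, task.worker_restrictions, task.host_restrictions)

    # The Nanny restarts the worker process, which re-registers as "worker-0".
    client.restart_workers(["worker-0"])

    # Hangs here: the stored restriction still refers to the pre-restart worker.
    for _ in as_completed([future]):
        pass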

if __name__ == "__main__":
    main()

When I run the example above, the scheduler and worker start normally. The first execution of test_func does not finish before the worker is restarted, and after the restart the program hangs because the worker restriction is never satisfied. The BEFORE RESTART print shows that the task's worker_restrictions holds the address of the original worker process.
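For contrast, here is a minimal sketch of one alternative design, again a hypothetical toy and not distributed's code (LazyToyScheduler and its methods are invented for illustration): the restriction keeps the names exactly as the user passed them and resolves them against the current worker table each time a worker is chosen, so a restarted worker that re-registers as worker-0 still qualifies.

```python
# Hypothetical toy model, NOT distributed's scheduler code: restrictions are
# kept unresolved and matched against the live worker table on every lookup.

class LazyToyScheduler:
    def __init__(self):
        # worker name -> current address (made-up addresses)
        self.name_to_address = {}
        # task key -> set of names/addresses exactly as the user supplied them
        self.worker_restrictions = {}

    def add_worker(self, name, address):
        self.name_to_address[name] = address

    def remove_worker(self, name):
        self.name_to_address.pop(name, None)

    def submit(self, key, workers):
        # Store the user's input as-is; do not resolve names yet.
        self.worker_restrictions[key] = set(workers)

    def runnable_on(self, key):
        # A worker matches if either its name or its current address was requested.
        allowed = self.worker_restrictions[key]
        return [n for n, a in self.name_to_address.items() if n in allowed or a in allowed]


s = LazyToyScheduler()
s.add_worker("worker-0", "tcp://127.0.0.1:40001")
s.submit("test_func-1", ["worker-0"])
print(s.runnable_on("test_func-1"))  # ['worker-0']

# Simulated restart: same name, new port.
s.remove_worker("worker-0")
s.add_worker("worker-0", "tcp://127.0.0.1:40002")
print(s.runnable_on("test_func-1"))  # ['worker-0'] -> still schedulable
```

The trade-off is that a name-based restriction then follows the name to whatever process next registers under it, which may or may not be what a user pinning a task to one specific worker expects. That is essentially the "intended behaviour" question raised in this issue.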

Environment: