Open mrocklin opened 4 years ago
Do you have a reproducer for the event?
```python
import time

from dask.distributed import Client, LocalCluster
from distributed.worker_client import get_worker


def test_fun():
    time.sleep(5)
    return get_worker().name


if __name__ == "__main__":
    # create a simple task graph
    n = 4
    graph = {f"test_{i}": (test_fun,) for i in range(n)}
    targets = [f"test_{i}" for i in range(n)]
    workers = 10  # we shouldn't need more than n=4 workers

    # LocalCluster
    with LocalCluster(
        n_workers=1, processes=True, threads_per_worker=1, memory_limit=0
    ) as cluster:
        cluster.adapt(minimum=1, maximum=workers)
        with Client(cluster) as client:
            client.register_worker_callbacks(
                lambda dask_worker: print("setup", dask_worker.name)
            )
            print(client.get(graph, targets))
```
prints:
```
setup 0
setup 1
setup 2
setup 3
setup 6
setup 7
setup 5
setup 4
setup 8
setup 9
[0, 0, 1, 2]
```
Currently Dask supports adaptive scaling. This allows it to scale the number of workers based on load. The logic for how many workers to ask for is based on many things, like how much work we have, how much memory we need, and so on.
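As a rough illustration of that logic (a simplified sketch with made-up names, not the actual scheduler code), the desired number of workers can be thought of as the larger of a work-driven estimate and a memory-driven estimate:

```python
import math


def desired_workers_sketch(total_occupancy, target_duration, bytes_needed, bytes_per_worker):
    """Illustrative only: combine work and memory demand into a worker target."""
    # Enough workers to finish the queued work within the target duration
    cpu = math.ceil(total_occupancy / target_duration)
    # Enough workers to hold the required data in memory
    memory = math.ceil(bytes_needed / bytes_per_worker)
    return max(cpu, memory)
```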
I recently observed a cluster scale up beyond the number of available tasks. It shouldn't do this. I had three tasks, each of which took several minutes, and I saw my cluster scale up to ten workers. There should probably be a maximum applied in there somewhere that isn't being enforced. There appears to already be some code to do this (the intent is there), but perhaps there is a bug. The relevant code is here:
https://github.com/dask/distributed/blob/4115f55c655000b04a8132c9ca62fa07bd6df6f5/distributed/scheduler.py#L5337-L5390
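A fix along the lines described above would amount to capping that target by the number of tasks that can actually run in parallel, something like this hypothetical sketch (not the code at the link):

```python
def capped_target_sketch(desired, n_runnable_tasks, minimum, maximum):
    """Illustrative only: never request more workers than there are runnable tasks."""
    target = min(desired, n_runnable_tasks)    # the cap that appears to be missing
    return max(minimum, min(target, maximum))  # respect adapt(minimum=..., maximum=...)
```

With the three several-minute tasks from the report, `capped_target_sketch(desired=10, n_runnable_tasks=3, minimum=1, maximum=10)` would return 3 rather than 10.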