fjetter opened this issue 2 years ago
Some prior art:
```python
from contextlib import ExitStack
from time import sleep

from coiled.v2 import Cluster
from dask.distributed import Client


def inc(i, **kwargs):
    return i + 1


def sum_slow(data, delay=0.02, **kwargs):
    sleep(delay)
    return sum(data)


def inner_test(client, start, length):
    list_fut1 = client.scatter(list(range(start, start + length)))
    list_fut2 = client.scatter(list(range(start + length, start + length + length)))
    futures = client.map(inc, list_fut1)
    barrier = client.submit(sum_slow, futures, delay=1.0)
    # Passing the barrier future as a kwarg makes the second wave of tasks
    # depend on it without using its result.
    futures = client.map(inc, list_fut2, barrier=barrier)
    return client.submit(sum_slow, futures)


def run():
    with ExitStack() as stack:
        cluster = stack.enter_context(Cluster(n_workers=10))
        client = stack.enter_context(Client(cluster))
        print("Dashboard:", client.dashboard_link)

        cluster.adapt(minimum=0, maximum=20)
        futures = inner_test(client, 0, 50_000)
        print(f"Test 1: {futures.result()}")

        cluster.adapt(minimum=0, maximum=50)
        futures = inner_test(client, 0, 50_000)
        print(f"Test 2: {futures.result()}")

        futures = inner_test(client, 0, 100_000)
        print(f"Test 3: {futures.result()}")


if __name__ == "__main__":
    run()
```
I think there should be two separate use cases. The pseudocode above illustrates the second one.
I expect the prior art above to hang because of https://github.com/dask/distributed/issues/6686.
Idea: Adaptive Scaling
Let's run a workload that expands, contracts, expands, and contracts, turn on adaptive scaling, and see if things work smoothly.
Pseudocode
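One way this could look, reusing `Cluster`, `Client`, and `inner_test` from the prior-art script above; the phase sizes in `phase_sizes` are made-up values for illustration, not part of the original proposal:

```python
from contextlib import ExitStack


def phase_sizes(expand=100_000, contract=1_000, cycles=2):
    """Alternating expand/contract task counts, one entry per phase."""
    return [expand if i % 2 == 0 else contract for i in range(2 * cycles)]


def run_adaptive():
    # Cluster, Client, and inner_test come from the prior-art script above.
    with ExitStack() as stack:
        cluster = stack.enter_context(Cluster(n_workers=1))
        client = stack.enter_context(Client(cluster))
        cluster.adapt(minimum=1, maximum=50)
        for i, n in enumerate(phase_sizes()):
            fut = inner_test(client, 0, n)
            # Blocking on each phase gives adaptive scaling a chance to
            # react (scale up, then back down) between phases.
            print(f"Phase {i} ({n} tasks): {fut.result()}")
```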
We would then want to verify that things did actually scale down. We'd also want to verify that we didn't have to recompute anything.
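The scale-down part could be checked by polling the worker count the scheduler reports via `Client.scheduler_info()`; `wait_for_scale_down` is a hypothetical helper name, and the timeout values are guesses:

```python
import time


def wait_for_scale_down(get_n_workers, target, timeout=300.0, poll=5.0):
    """Poll until the worker count drops to ``target``; raise on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        n = get_n_workers()
        if n <= target:
            return n
        time.sleep(poll)
    raise TimeoutError(
        f"cluster did not scale down to {target} workers within {timeout}s"
    )


# With a live client, e.g.:
# wait_for_scale_down(lambda: len(client.scheduler_info()["workers"]), target=1)
```

Verifying that nothing was recomputed is less direct; one option (an assumption, not an established API) is to have each task record its run count somewhere external and assert that no count ever exceeds one.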