python-adaptive / adaptive

📈 Adaptive: parallel active learning of mathematical functions
http://adaptive.readthedocs.io/
BSD 3-Clause "New" or "Revised" License
1.13k stars · 58 forks

runner does not work with dask adaptive scaling client #326

Open Kostusas opened 2 years ago

Kostusas commented 2 years ago

Minimal code to reproduce the error in a local Jupyter notebook:

import distributed
import adaptive
adaptive.notebook_extension()

cluster = distributed.LocalCluster()
cluster.adapt(minimum=0, maximum=5)  # fails; manual scaling with cluster.scale(5) works

client = distributed.Client(cluster)

learner = adaptive.Learner1D(lambda x: x, bounds=(-1, 1))
runner = adaptive.Runner(learner, executor=client, goal=lambda l: l.loss() < 0.01)
runner.live_info()

cluster.close()

which returns the error:

Task exception was never retrieved
future: <Task finished name='Task-327' coro=<live_info.<locals>.update() done, defined at /opt/conda/lib/python3.9/site-packages/adaptive/notebook_integration.py:217> exception=AssertionError()>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.9/site-packages/adaptive/notebook_integration.py", line 226, in update
    status.value = _info_html(runner)
  File "/opt/conda/lib/python3.9/site-packages/adaptive/notebook_integration.py", line 258, in _info_html
    ("elapsed time", datetime.timedelta(seconds=runner.elapsed_time())),
  File "/opt/conda/lib/python3.9/site-packages/adaptive/runner.py", line 658, in elapsed_time
    assert self.task.cancelled()
AssertionError

The same thing happens on a cluster with manual scaling when it is not given enough time to connect to the workers: it seems Adaptive does not see any workers and terminates the process.

akhmerov commented 2 years ago

I think we should change the heuristic for determining how many workers are available: instead of counting the currently connected workers, we could check the client configuration and its scaling strategy.

basnijholt commented 2 years ago

You are running into this error: https://github.com/python-adaptive/adaptive/blob/f28bab073fed8723b0569fcfb6886fccc2133ecd/adaptive/runner.py#L403-L404

because you start with 0 cores.

If you change your argument from minimum=0 to minimum=1, Adaptive does detect the scaling correctly.

Would this be good enough for you?

akhmerov commented 2 years ago

This seems to be a workaround, but I think actually detecting the configuration would be more reliable. Unfortunately I can't quite find the correct API in distributed.

akhmerov commented 2 years ago

I've asked whether there's a better way on Stack Overflow (AFAIR that's the preferred channel for dask): https://stackoverflow.com/q/69326568/2217463

basnijholt commented 2 years ago

Why would the maximum number of cores matter instead of the currently available cores?

akhmerov commented 2 years ago

It's a chicken-and-egg problem otherwise: dask's adaptive scaling won't request new workers if there are no tasks in the queue, and the runner won't submit tasks if it sees no workers.
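To make the chicken-and-egg dependency concrete, here is a toy simulation, not Adaptive's or dask's actual code; `points_to_submit`, `adaptive_worker_target`, and `simulate` are invented names for illustration. The runner submits points based on the cores it currently sees, and the adaptive cluster only scales up when tasks are queued, so with `minimum=0` neither side ever moves first:

```python
def points_to_submit(visible_ncores):
    # hypothetical runner behaviour: submit one point per core it can see
    return visible_ncores

def adaptive_worker_target(queued_tasks, minimum, maximum=5):
    # hypothetical scaling rule: only request workers when work is queued,
    # but never go below the configured minimum or above the maximum
    return max(minimum, min(queued_tasks, maximum))

def simulate(minimum, steps=3):
    workers = minimum  # the cluster starts at its minimum size
    for _ in range(steps):
        queued = points_to_submit(workers)  # runner sees `workers` cores
        workers = adaptive_worker_target(queued, minimum)
    return workers

print(simulate(minimum=0))  # 0: no cores seen -> no tasks queued -> no workers
print(simulate(minimum=1))  # 1: a single worker keeps the loop alive
```

With `minimum=0` the fixed point is zero workers, which matches the deadlock described above; any nonzero minimum (or a heuristic that reports at least one core) breaks it.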

basnijholt commented 2 years ago

Hmm, then we would already query some points that will not be calculated yet.

Why not change the following

https://github.com/python-adaptive/adaptive/blob/a81be7aa8a703c44ab348710f339686f4eb57641/adaptive/runner.py#L832-L833

to

elif with_distributed and isinstance(ex, distributed.cfexecutor.ClientExecutor):
    ncores = sum(ex._client.ncores().values())
    return max(1, ncores)
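As a sketch of how that proposed branch would behave, here is the core-counting logic pulled out into a standalone function and exercised against fakes. `FakeClient` and `FakeExecutor` are stand-ins invented for this example; the real objects are `distributed.Client` (whose `ncores()` returns a dict mapping worker address to core count) and `distributed.cfexecutor.ClientExecutor`:

```python
def ncores_for_executor(ex):
    # proposed heuristic: sum the cores of the currently connected workers,
    # but report at least 1 so the runner still submits points while the
    # cluster is scaling up from zero workers
    return max(1, sum(ex._client.ncores().values()))

class FakeClient:
    """Stand-in for distributed.Client, for illustration only."""
    def __init__(self, cores_per_worker):
        self._cores = cores_per_worker

    def ncores(self):
        # mimics distributed.Client.ncores(): worker address -> core count
        return {f"tcp://worker-{i}": c for i, c in enumerate(self._cores)}

class FakeExecutor:
    """Stand-in for distributed.cfexecutor.ClientExecutor."""
    def __init__(self, client):
        self._client = client

print(ncores_for_executor(FakeExecutor(FakeClient([]))))      # no workers yet -> 1
print(ncores_for_executor(FakeExecutor(FakeClient([4, 4]))))  # two 4-core workers -> 8
```

Reporting `max(1, ncores)` means the runner always submits at least one point, which puts a task in the queue and lets dask's adaptive scaling spin up its first worker.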