optuna / optuna-examples

Examples for https://github.com/optuna/optuna
MIT License

BUG - dask integration example not working #157

Closed cherusk closed 1 year ago

cherusk commented 1 year ago

Expected behavior

The code sample executes on the pre-provisioned Dask cluster and the Optuna-to-Dask-distributed integration works seamlessly.

Environment

Error messages, stack traces, or logs

[2023-01-03, 00:00:14 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/decorators/base.py", line 188, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 193, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/root_dag.py", line 93, in run_optimization
    storage = optuna.integration.DaskStorage(InMemoryStorage())
  File "/home/airflow/.local/lib/python3.9/site-packages/optuna/_experimental.py", line 115, in wrapped_init
    _original_init(self, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/optuna/integration/dask.py", line 446, in __init__
    self.client.run_on_scheduler(_register_with_scheduler, storage=storage, name=self.name)
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/client.py", line 2740, in run_on_scheduler
    return self.sync(self._run_on_scheduler, function, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 339, in sync
    return sync(
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 406, in sync
    raise exc.with_traceback(tb)
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 379, in f
    result = yield future
  File "/home/airflow/.local/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/client.py", line 2691, in _run_on_scheduler
    response = await self.scheduler.run_function(
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 1155, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 945, in send_recv
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.9/site-packages/distributed/core.py", line 820, in _handle_comm
  File "/opt/conda/lib/python3.9/site-packages/distributed/worker.py", line 3217, in run
  File "/opt/conda/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 73, in loads
ModuleNotFoundError: No module named 'optuna'


Steps to reproduce

  1. Take the code below (from the linked repository) and rerun it.

Reproducible examples (optional)

From https://github.com/cherusk/godon/blob/master/breeder/linux_network_stack/root_dag.py#L83

# Essentially the example code

import optuna
from optuna.storages import InMemoryStorage
from optuna.integration import DaskStorage
from dask.distributed import Client
from dask.distributed import wait

def run_optimization():
    # boilerplate from https://jrbourbeau.github.io/dask-optuna/

    def objective(trial):
        x = trial.suggest_uniform("x", -10, 10)
        return (x - 2) ** 2

    with Client(address="godon_dask_scheduler_1:8786") as client:
        # Create a study using Dask-compatible storage
        storage = optuna.integration.DaskStorage(InMemoryStorage())
        study = optuna.create_study(storage=storage)
        # Optimize in parallel on your Dask cluster
        futures = [
            client.submit(study.optimize, objective, n_trials=10, pure=False)
            for i in range(10)
        ]
        wait(futures)
        print(f"Best params: {study.best_params}")

optimization_step = run_optimization()

Additional context (optional)

The end of the trace is quite strange, because nothing was installed via conda; everything comes in via pip: https://github.com/cherusk/godon/blob/master/Dockerfile-airflow#L3

optuna is meant to run as the backend of the godon project: https://github.com/cherusk/godon

cherusk commented 1 year ago

Maybe @jrbourbeau, because he authored the example code!?

nzw0301 commented 1 year ago

Could you try import optuna in your environment to make sure it is set up correctly?

cherusk commented 1 year ago

@nzw0301 yes, you can see the import also in the code snippet I provided.

But I ran it manually now also in the airflow container:

airflow@56d394fe9f60:/opt/airflow$ python3
Python 3.9.15 (main, Oct 25 2022, 05:49:37)
[GCC 10.2.1 20210110] on linux
>>> import optuna
>>>

It succeeded, as you can see. I doubt the error stems from the Dask workers, because it already fails at the optuna.integration.DaskStorage step.
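A quick way to settle where the import actually fails is to ask the scheduler and the workers themselves whether they can see optuna. A sketch, shown against a throwaway local cluster for illustration; pointing Client at godon_dask_scheduler_1:8786 instead would probe the real deployment:

```python
import importlib.util
from dask.distributed import Client, LocalCluster


def has_optuna():
    # Probe the import machinery without importing the package itself.
    return importlib.util.find_spec("optuna") is not None


with LocalCluster(n_workers=2, processes=False) as cluster:
    with Client(cluster) as client:
        on_scheduler = client.run_on_scheduler(has_optuna)
        on_workers = client.run(has_optuna)  # dict: worker address -> bool
        print("scheduler has optuna:", on_scheduler)
        print("workers have optuna:", on_workers)
```

If the scheduler reports False here while the client-side import succeeds, that matches the traceback: the failure is on the scheduler, not in the Airflow container.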

cherusk commented 1 year ago

@nzw0301

> Thanks, can you import DaskStorage like
>
> from optuna.integration import DaskStorage

Yes, that also works when done manually.

I've now done that manually for all the components to be imported, with success:

***@***.***:/opt/airflow$ python3
Python 3.9.15 (main, Oct 25 2022, 05:49:37)
[GCC 10.2.1 20210110] on linux
>>> from optuna.integration import DaskStorage
>>> from dask.distributed import Client
>>> from dask.distributed import wait
>>>

cherusk commented 1 year ago

The example also does not state the version requirements from the point in time when it worked. :cry:

cherusk commented 1 year ago

I've found the issue.

The installs on the scheduler and the workers were done with conda; that's where these frames in the traceback come from:

File "/opt/conda/lib/python3.9/site-packages/distributed/core.py", line 820, in _handle_comm
File "/opt/conda/lib/python3.9/site-packages/distributed/worker.py", line 3217, in run
File "/opt/conda/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 73, in loads

Installing the pre-release version of optuna on the target Dask cluster solved the issue. :flushed:
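Since the root cause was a drifted conda environment on the cluster, a guard run before constructing the DaskStorage can catch this class of problem early. The helper below is hypothetical (not part of optuna or dask); it compares the installed optuna version on the client, the scheduler, and every worker, demonstrated on a local cluster where the versions match by construction:

```python
import importlib.metadata
from dask.distributed import Client, LocalCluster


def optuna_version():
    # Installed optuna version, or None if the package is missing.
    try:
        return importlib.metadata.version("optuna")
    except importlib.metadata.PackageNotFoundError:
        return None


def check_cluster_optuna(client):
    # Collect the version seen by the client, every worker, and the scheduler.
    local = optuna_version()
    remote = set(client.run(optuna_version).values())
    remote.add(client.run_on_scheduler(optuna_version))
    if remote != {local}:
        raise RuntimeError(
            f"optuna mismatch: client={local!r}, cluster={sorted(map(str, remote))}"
        )
    return local


with LocalCluster(n_workers=2, processes=False) as cluster:
    with Client(cluster) as client:
        version_everywhere = check_cluster_optuna(client)
        print("optuna", version_everywhere, "on client, scheduler, and workers")
```

A missing install on any cluster node shows up as None in the remote set, turning the silent pickling failure into an explicit error message.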