dask / dask-ml

Scalable Machine Learning with Dask
http://ml.dask.org
BSD 3-Clause "New" or "Revised" License
885 stars 255 forks source link

hyperparameter optimization example not working #801

Open scientes opened 3 years ago

scientes commented 3 years ago

What happened:

I tried the example for hyperparameter optimization: https://examples.dask.org/machine-learning/hyperparam-opt.html

What you expected to happen: The example should run without failing.

Minimal Complete Verifiable Example:

# To add a new cell, type '# %%'
# To add a new markdown cell, type '# %% [markdown]'
# %%
# Create a distributed Client with default arguments and display it.
from distributed import Client
client = Client()
client

# %%
# Generate the "circles" dataset and plot a sample of it.
from sklearn.datasets import make_circles
import numpy as np
import pandas as pd

# 30,000 noisy samples, seeded with random_state=0.
X, y = make_circles(n_samples=30_000, random_state=0, noise=0.09)

# Scatter-plot a random subset of 4,000 rows: the first two columns of X
# on the axes, coloured by the "class" column (y).
pd.DataFrame({0: X[:, 0], 1: X[:, 1], "class": y}).sample(4_000).plot.scatter(
    x=0, y=1, alpha=0.2, c="class", cmap="bwr"
)

# %%
# Append four extra feature columns drawn uniformly between -1 and 1,
# one row per existing sample, then show the new shape of X.
from sklearn.utils import check_random_state

rng = check_random_state(42)
random_feats = rng.uniform(-1, 1, size=(X.shape[0], 4))
X = np.hstack((X, random_feats))
X.shape

# %%
# Split the data, standardise it, and define the model and its search space.
from sklearn.model_selection import train_test_split
# Hold out 5,000 samples for testing; the remainder is the training set.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=5_000, random_state=42)
from sklearn.preprocessing import StandardScaler
# NOTE: train_test_split was already imported at the top of this cell;
# the repeated import below is redundant.
from sklearn.model_selection import train_test_split
# Fit the scaler on the training split only, then apply it to both splits.
scaler = StandardScaler().fit(X_train)

X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)
# NOTE: numpy is already imported as np in an earlier cell.
import numpy as np
from sklearn.neural_network import MLPClassifier

model = MLPClassifier()
# Candidate values per parameter name; this dict is passed to
# HyperbandSearchCV together with `model` further down.
params = {
    "hidden_layer_sizes": [
        (24, ),
        (12, 12),
        (6, 6, 6, 6),
        (4, 4, 4, 4, 4, 4),
        (12, 6, 3, 3),
    ],
    "activation": ["relu", "logistic", "tanh"],
    "alpha": np.logspace(-6, -3, num=1000),  # cnts
    "batch_size": [16, 32, 64, 128, 256, 512],
}
from dask_ml.model_selection import HyperbandSearchCV
# For quick response
# n_examples is four times the number of training rows; n_params is fixed at 8.
n_examples = 4 * len(X_train)
n_params = 8

# In practice, HyperbandSearchCV is most useful for longer searches
# n_examples = 15 * len(X_train)
# n_params = 15

# %%
# Derive the search budget from n_params and n_examples set above.
max_iter = n_params  # number of times partial_fit will be called
chunks = n_examples // n_params  # number of examples each call sees

max_iter, chunks

# %%
# Wrap the scaled training data in dask arrays, using the chunk size
# computed above for both X and y, then display the resulting array.
import dask.array as da
X_train2 = da.from_array(X_train, chunks=chunks)
y_train2 = da.from_array(y_train, chunks=chunks)
X_train2

# %%
# Configure the hyperparameter search over `params` for `model`.
search = HyperbandSearchCV(
    model,
    params,
    max_iter=max_iter,
    patience=True,
)

# %%
# Inspect the search metadata before fitting.
search.metadata["partial_fit_calls"]

# %%
# Run the search. This is the call the reported traceback points at
# (CancelledError raised from search.fit).
# NOTE(review): y comes from make_circles, which presumably yields only
# labels 0 and 1 -- confirm whether classes 2 and 3 are intended here.
search.fit(X_train2, y_train2, classes=[0, 1, 2, 3])

error message:

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/home/bastian/.local/lib/python3.8/site-packages/distributed/protocol/core.py", line 39, in dumps
    small_header, small_payload = dumps_msgpack(msg, **compress_opts)
  File "/home/bastian/.local/lib/python3.8/site-packages/distributed/protocol/core.py", line 184, in dumps_msgpack
    payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
  File "/home/bastian/.local/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
distributed.comm.utils - ERROR - can not serialize 'Delayed' object
Traceback (most recent call last):
  File "/home/bastian/.local/lib/python3.8/site-packages/distributed/comm/utils.py", line 32, in _to_frames
    protocol.dumps(
  File "/home/bastian/.local/lib/python3.8/site-packages/distributed/protocol/core.py", line 39, in dumps
    small_header, small_payload = dumps_msgpack(msg, **compress_opts)
  File "/home/bastian/.local/lib/python3.8/site-packages/distributed/protocol/core.py", line 184, in dumps_msgpack
    payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
  File "/home/bastian/.local/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/home/bastian/.local/lib/python3.8/site-packages/distributed/batched.py", line 93, in _background_send
    nbytes = yield self.comm.write(
  File "/home/bastian/.local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/bastian/.local/lib/python3.8/site-packages/distributed/comm/tcp.py", line 230, in write
    frames = await to_frames(
  File "/home/bastian/.local/lib/python3.8/site-packages/distributed/comm/utils.py", line 52, in to_frames
    return _to_frames()
  File "/home/bastian/.local/lib/python3.8/site-packages/distributed/comm/utils.py", line 32, in _to_frames
    protocol.dumps(
  File "/home/bastian/.local/lib/python3.8/site-packages/distributed/protocol/core.py", line 39, in dumps
    small_header, small_payload = dumps_msgpack(msg, **compress_opts)
  File "/home/bastian/.local/lib/python3.8/site-packages/distributed/protocol/core.py", line 184, in dumps_msgpack
    payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
  File "/home/bastian/.local/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
distributed.scheduler - CRITICAL - Tried writing to closed comm: [{'op': 'key-in-memory', 'key': "('concatenate-a18fe0fc5cfece12af6977bd21e36f9f', 1, 0)", 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.'}]
distributed.scheduler - CRITICAL - Tried writing to closed comm: [{'op': 'key-in-memory', 'key': "('concatenate-a18fe0fc5cfece12af6977bd21e36f9f', 1, 0)", 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.'}]
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
<ipython-input-9-8272808f4a8d> in <module>
----> 1 search.fit(X_train2, y_train2, classes=[0, 1, 2, 3])

~/.local/lib/python3.8/site-packages/dask_ml/model_selection/_incremental.py in fit(self, X, y, **fit_params)
    704         client = default_client()
    705         if not client.asynchronous:
--> 706             return client.sync(self._fit, X, y, **fit_params)
    707         return self._fit(X, y, **fit_params)
    708 

~/.local/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    836             return future
    837         else:
--> 838             return sync(
    839                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    840             )

~/.local/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

~/.local/lib/python3.8/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

~/.local/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/.local/lib/python3.8/site-packages/dask_ml/model_selection/_hyperband.py in _fit(self, X, y, **fit_params)
    399         _brackets_ids = list(reversed(sorted(SHAs)))
    400 
--> 401         _SHAs = await asyncio.gather(
    402             *[SHAs[b]._fit(X, y, **fit_params) for b in _brackets_ids]
    403         )

~/.local/lib/python3.8/site-packages/dask_ml/model_selection/_incremental.py in _fit(self, X, y, **fit_params)
    650 
    651         with context:
--> 652             results = await fit(
    653                 self.estimator,
    654                 self._get_params(),

~/.local/lib/python3.8/site-packages/dask_ml/model_selection/_incremental.py in fit(model, params, X_train, y_train, X_test, y_test, additional_calls, fit_params, scorer, random_state, verbose, prefix)
    464         A history of all models scores over time
    465     """
--> 466     return await _fit(
    467         model,
    468         params,

~/.local/lib/python3.8/site-packages/dask_ml/model_selection/_incremental.py in _fit(model, params, X_train, y_train, X_test, y_test, additional_calls, fit_params, scorer, random_state, verbose, prefix)
    204     assert len(X_train) == len(y_train)
    205 
--> 206     train_eg = await client.gather(client.map(len, y_train))
    207     msg = "[CV%s] For training there are between %d and %d examples in each chunk"
    208     logger.info(msg, prefix, min(train_eg), max(train_eg))

~/.local/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1828                         else:
   1829                             raise exception.with_traceback(traceback)
-> 1830                         raise exc
   1831                     if errors == "skip":
   1832                         bad_keys.add(key)

CancelledError: len-34bea58765576ad97f48e2be85678f1e

Anything else we need to know?: This may belong on https://github.com/dask/dask-examples/ instead; I'm not sure.

Environment:


- Python version: 3.8.6
- Operating System: ubuntu 20.10
- Install method (conda, pip, source): pip
TomAugspurger commented 3 years ago

Thanks for the report. I can't reproduce it locally. Can you try with the dev versions of dask and distributed? It may already be fixed.

hristog commented 3 years ago

Perhaps this could be labeled "Needs Info" for the time being?

scientes commented 3 years ago

Sorry, I forgot about this. I'll test it in April; I'm slightly short on time at the moment.