dask / dask-searchcv

dask-searchcv is now part of dask-ml: https://github.com/dask/dask-ml
BSD 3-Clause "New" or "Revised" License

Failure on model pipeline that succeeds using stock scikit-learn #57

Closed: jimmywan closed this issue 7 years ago

jimmywan commented 7 years ago

I can't reliably get my model pipeline that runs under scikit-learn to work using dask-searchcv. Is there an expectation of how much memory overhead dask will require compared to scikit-learn? My test dataset is fairly small (< 10MB TSV) and my VM has something on the order of 8GB available to it.

I'm attempting to use dask-searchcv's GridSearchCV to build a model pipeline without redundantly executing a few expensive pipeline components. scikit-learn 0.19 has made strides in this area, but I'm looking to the future, where I'll want to further parallelize my workload across more machines and/or handle larger data sets and more complicated pipelines.

Since the dataset is so small, I'm trying to distribute the execution and avoid redundant computation, but I don't necessarily need distributed access to a dataframe at this time.

I modified my code to use dask-searchcv's GridSearchCV in "local distributed" mode as follows:

# from sklearn.model_selection import GridSearchCV
from dask_searchcv import GridSearchCV
...
    pipe = sklearn.pipeline.Pipeline(
    ....
    )
    # Use dask in distributed mode
    from distributed import Client
    c = Client()
    best_model = GridSearchCV(
            pipe,
            param_grid=param_grid,
            cv=10,
            scoring=my_error_metrics_function,
            n_jobs=cpu_count
    )

    with ProgressBar():
        best_model.fit(train_x, train_y)

I skimmed the documentation for distributed and it seems to support the idea that no additional setup is required. Since it didn't immediately die on me, I'm assuming my config/setup is correct.

| GridSearchCV implementation | Result | Notes |
| --- | --- | --- |
| scikit-learn 0.19 | Reliably succeeds | Used in default mode without saving state. |
| dask-searchcv | Occasionally fails | From reading the code, this appears to use threaded_get. I've noticed that failures are more likely to appear if more than one core is available. |
| dask-searchcv in "local distributed mode" | Ran for "a while" and then died with distributed.scheduler.KilledWorker | Appeared to get smashed by the oomkiller. |

Example dask-searchcv failure in default mode

I believe this is related to https://github.com/scikit-learn/scikit-learn/issues/2755

  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/sklearn/ensemble/forest.py", line 315, in fit
    random_state=random_state)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/sklearn/ensemble/base.py", line 127, in _make_estimator
    for p in self.estimator_params))
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/sklearn/base.py", line 264, in set_params
    valid_params = self.get_params(deep=True)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/sklearn/base.py", line 240, in get_params
    warnings.filters.pop(0)
IndexError: pop from empty list
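To illustrate how a `warnings.filters.pop(0)` cleanup can raise this error, here is a minimal single-threaded replay of one plausible interleaving of two threads. It is a sketch under assumptions, not the actual scikit-learn code: the function name `replay_interleaving` is hypothetical, and it relies on CPython's `warnings.simplefilter` removing an identical existing filter before inserting it again, so two concurrent "install" steps can leave only one entry for two "pop" steps.

```python
import warnings


def replay_interleaving():
    """Replay, in one thread, two overlapping install-filter/pop-filter pairs.

    Returns (number of filters after both installs, error message or None).
    """
    # catch_warnings restores the real filter list when the block exits,
    # so this experiment does not leak into the rest of the program.
    with warnings.catch_warnings():
        warnings.resetwarnings()  # start from an empty filter list

        # "Thread A" installs its temporary filter.
        warnings.simplefilter("always", DeprecationWarning)
        # "Thread B" installs the same filter before A has cleaned up.
        # simplefilter drops the identical existing entry first, so the
        # list still holds a single entry.
        warnings.simplefilter("always", DeprecationWarning)
        installed = len(warnings.filters)

        # "Thread A" cleans up, removing the only entry.
        warnings.filters.pop(0)
        try:
            # "Thread B" cleans up and finds nothing left to remove.
            warnings.filters.pop(0)
        except IndexError as exc:
            return installed, str(exc)
        return installed, None


if __name__ == "__main__":
    print(replay_interleaving())
```

If this is what happens, it would be consistent with the failures being intermittent and more likely when more than one core is available.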

Example failure in local distributed mode

I'm pretty sure this was the victim of the oomkiller.

File "MY_MODULE_build_models.py", line 426, in <module>
    best_model.fit(train_x, train_y)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/dask_searchcv/model_selection.py", line 793, in fit
    out = scheduler(dsk, keys, num_workers=n_jobs)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/distributed/client.py", line 1923, in get
    results = self.gather(packed, asynchronous=asynchronous)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/distributed/client.py", line 1368, in gather
    asynchronous=asynchronous)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/distributed/client.py", line 540, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/distributed/utils.py", line 239, in sync
    six.reraise(*error[0])
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/six.py", line 686, in reraise
    raise value
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/distributed/utils.py", line 227, in f
    result[0] = yield make_coro()
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/distributed/client.py", line 1246, in _gather
    traceback)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/six.py", line 686, in reraise
    raise value
distributed.scheduler.KilledWorker: ("('mypipelinestep-fit-dfe1861ca5d06ad69d926709c897ffc9', 34, 0)", 'tcp://127.0.0.1:46091')

My environment

$ conda list | egrep '^(numpy|scipy|dask|scikit|distributed|python |tornado)'
dask                      0.15.2                   py36_0
dask-searchcv             0.0.2+8.g2c2056d           <pip>
distributed               1.18.1                   py36_0
numpy                     1.13.1                   py36_0
python                    3.6.1                         2
scikit-learn              0.19.0              np113py36_0
scipy                     0.19.1              np113py36_0
tornado                   4.5.2                    py36_0

TomAugspurger commented 7 years ago

When you create a Client with no arguments, it starts a LocalCluster, which runs its "workers" as separate processes on your local machine. This will make copies of your dataset for each worker, which I suspect is blowing up your memory budget.

You could try

from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=1)
client = Client(cluster)

You won't get any parallelism, but this will confirm whether it's a memory issue or not.

Or if you provide a reproducible example I can take a deeper look.

mrocklin commented 7 years ago

I recommend Client(processes=False) to avoid the extra copy between client->scheduler->worker
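As a rough standard-library analogy for the difference between the two suggestions (this is not how dask or distributed is implemented internally; `compare_sharing` and `object_id` are hypothetical names), the sketch below shows that threads see the very same object, while a process-style hand-off goes through serialization and produces an independent copy that occupies its own memory.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor


def object_id(obj):
    """Return the identity of the object as seen by the worker."""
    return id(obj)


def compare_sharing(data):
    """Return (shared_by_threads, copied_for_process) for the given data."""
    # Threads live in one address space: every worker sees the same object.
    with ThreadPoolExecutor(max_workers=2) as pool:
        thread_ids = list(pool.map(object_id, [data, data]))
    shared_by_threads = all(i == id(data) for i in thread_ids)

    # Separate processes receive serialized data. A pickle round trip stands
    # in for that transfer here: equal contents, but a distinct object.
    process_style_copy = pickle.loads(pickle.dumps(data))
    copied_for_process = (
        process_style_copy is not data and process_style_copy == data
    )
    return shared_by_threads, copied_for_process


if __name__ == "__main__":
    dataset = [[float(i), float(i) * 2] for i in range(1000)]
    print(compare_sharing(dataset))
```

With several worker processes, each copy adds to the total memory footprint, which is the concern raised above for a memory-constrained VM.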


jimmywan commented 7 years ago

@TomAugspurger and @mrocklin thanks for your suggestions. I've tried both, and unfortunately neither is working for me. The frustrating part of all this is that the exceptions are being thrown from scikit-learn and not dask-searchcv.

I believe both of these might be to blame: scikit-learn/scikit-learn#2755 and scikit-learn/scikit-learn#7346

> Or if you provide a reproducible example I can take a deeper look.

I'd like to do this, but I've got quite a bit of proprietary customer data/code that I need to trim out before I can share this.

I've slightly updated my example in the OP to have a teeny bit more context which should hopefully help.

I've trimmed out irrelevant bits of the pipeline and it's basically down to this and it still fails:

I'm sort of hoping that someone out there can confirm/deny that these suggestions/configurations work on "something else". Do test cases for pipelines going through GridSearchCV exist in dask-searchcv? If not, perhaps I can help contribute some?

Limit Workers to 1

> You could try...You won't get any parallelism, but this will confirm whether it's a memory issue or not.

I didn't reach the OOM error, but I'm getting a similar error (slightly different stack trace):

  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/sklearn/ensemble/forest.py", line 315, in fit
    random_state=random_state)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/sklearn/ensemble/base.py", line 125, in _make_estimator
    estimator = clone(self.base_estimator_)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/sklearn/base.py", line 63, in clone
    params_set = new_object.get_params(deep=False)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/sklearn/base.py", line 240, in get_params
    warnings.filters.pop(0)
IndexError: pop from empty list

Use Threading

> I recommend Client(processes=False) to avoid the extra copy between client->scheduler->worker

I tried that but got the same stack trace as the non-distributed use case.

  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/sklearn/ensemble/forest.py", line 315, in fit
    random_state=random_state)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/sklearn/ensemble/base.py", line 130, in _make_estimator
    _set_random_states(estimator, random_state)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/sklearn/ensemble/base.py", line 57, in _set_random_states
    estimator.set_params(**to_set)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/sklearn/base.py", line 264, in set_params
    valid_params = self.get_params(deep=True)
  File "MY_VIRTUALENV_PATH/lib/python3.6/site-packages/sklearn/base.py", line 240, in get_params
    warnings.filters.pop(0)
IndexError: pop from empty list

FWIW, I'm running an Ubuntu Guest on a Windows 10 machine.

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 14.04 LTS
Release:        14.04
Codename:       trusty

jimmywan commented 7 years ago

Went ahead and distilled my code sample as much as I could and it's fairly simple now:

    pipe = sklearn.pipeline.Pipeline(
            [
                ('impute', sklearn.preprocessing.Imputer(strategy='median')),
                ('base_features', sklearn.preprocessing.StandardScaler()),
                ('base_regressor', ExtraTreesRegressor(random_state=42, n_jobs=1)),
            ],
    )

    param_grid = [
        {
            "base_regressor__min_samples_leaf": [3],
            "base_regressor__min_samples_split": [6, 8],
            "base_regressor__max_features": [0.25, 0.5, 0.75, 1.0],
            "base_regressor__n_estimators": [250],
        }
    ]

    cpu_count = multiprocessing.cpu_count()
    # Use dask in distributed mode
    from distributed import Client, LocalCluster
    # c = Client()
    # c = Client(processes=False)
    cluster = LocalCluster(n_workers=1)
    c = Client(cluster)
    best_model = GridSearchCV(
            pipe,
            param_grid=param_grid,
            cv=10,
            n_jobs=cpu_count,
    )

    with ProgressBar():
        best_model.fit(train_x, train_y)

This is the stacktrace:

  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/dask_searchcv/model_selection.py", line 793, in fit
    out = scheduler(dsk, keys, num_workers=n_jobs)
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/distributed/client.py", line 1923, in get
    results = self.gather(packed, asynchronous=asynchronous)
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/distributed/client.py", line 1368, in gather
    asynchronous=asynchronous)
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/distributed/client.py", line 540, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/distributed/utils.py", line 239, in sync
    six.reraise(*error[0])
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/six.py", line 686, in reraise
    raise value
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/distributed/utils.py", line 227, in f
    result[0] = yield make_coro()
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/distributed/client.py", line 1246, in _gather
    traceback)
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/dask_searchcv/methods.py", line 216, in fit
    est.fit(X, y, **fit_params)
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/sklearn/ensemble/forest.py", line 315, in fit
    random_state=random_state)
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/sklearn/ensemble/base.py", line 130, in _make_estimator
    _set_random_states(estimator, random_state)
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/sklearn/ensemble/base.py", line 52, in _set_random_states
    for key in sorted(estimator.get_params(deep=True)):
  File "/home/vagrant/.pyenv/versions/miniconda3-4.1.11/envs/gambit/lib/python3.6/site-packages/sklearn/base.py", line 240, in get_params
    warnings.filters.pop(0)
IndexError: pop from empty list

I'm trying to fix the problem on the scikit-learn side: https://github.com/scikit-learn/scikit-learn/issues/7346#issuecomment-327902191

But still wondering if there's a valid workaround by making dask-searchcv's GridSearchCV do whatever scikit-learn's GridSearchCV is doing to copy/preserve state.

TomAugspurger commented 7 years ago

Thanks for the example, you are indeed hitting the linked issue.

> But still wondering if there's a valid workaround by making dask-searchcv's GridSearchCV do whatever scikit-learn's GridSearchCV is doing to copy/preserve state.

I don't think so. dask-searchcv isn't the one calling the unsafe get_params; that's coming from the estimator itself.
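For context on what "unsafe" means here, the following is a generic sketch of one way a temporary filter install/remove pair can be made safe across threads, by serializing it with a lock. It is an assumption-laden illustration only: `_filter_lock`, `guarded_filter_roundtrip`, and `run_guarded_threads` are hypothetical names, and this is not a description of how scikit-learn fixed the issue.

```python
import threading
import warnings

# Hypothetical module-level lock that serializes all filter manipulation.
_filter_lock = threading.Lock()


def guarded_filter_roundtrip():
    """Install a temporary filter and remove it again, holding the lock."""
    with _filter_lock:
        warnings.simplefilter("always", DeprecationWarning)
        try:
            pass  # stand-in for the work that needs the temporary filter
        finally:
            warnings.filters.pop(0)


def run_guarded_threads(n_threads=8, n_calls=200):
    """Hammer the guarded function from several threads.

    Returns (list of IndexErrors seen, number of filters left at the end).
    """
    errors = []

    def worker():
        try:
            for _ in range(n_calls):
                guarded_filter_roundtrip()
        except IndexError as exc:
            errors.append(exc)

    # Work on a scratch filter list so the real configuration is untouched.
    with warnings.catch_warnings():
        warnings.resetwarnings()
        threads = [threading.Thread(target=worker) for _ in range(n_threads)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        return errors, len(warnings.filters)


if __name__ == "__main__":
    print(run_guarded_threads())
```

Because the lock would have to live inside the code that touches the filters, a change like this belongs in the estimator's library rather than in dask-searchcv, which matches the point made above.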

amueller commented 7 years ago

this is fixed in scikit-learn master now.

TomAugspurger commented 7 years ago

Confirmed that the following example no longer crashes. Thanks @amueller :)

import multiprocessing
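# Import the submodules explicitly; a bare `import sklearn` does not guarantee them.
import sklearn.pipeline
import sklearn.preprocessing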
from sklearn.ensemble import ExtraTreesRegressor
from sklearn.datasets import make_regression
from dask_searchcv import GridSearchCV

train_x, train_y = make_regression()

if __name__ == '__main__':

    pipe = sklearn.pipeline.Pipeline(
            [
                ('impute', sklearn.preprocessing.Imputer(strategy='median')),
                ('base_features', sklearn.preprocessing.StandardScaler()),
                ('base_regressor', ExtraTreesRegressor(random_state=42, n_jobs=1)),
            ],
    )

    param_grid = [
        {
            "base_regressor__min_samples_leaf": [3],
            "base_regressor__min_samples_split": [6, 8],
            "base_regressor__max_features": [0.25, 0.5, 0.75, 1.0],
            "base_regressor__n_estimators": [250],
        }
    ]

    cpu_count = multiprocessing.cpu_count()
    # Use dask in distributed mode
    from distributed import Client, LocalCluster
    # c = Client()
    # c = Client(processes=False)
    cluster = LocalCluster(n_workers=1)
    c = Client(cluster)
    best_model = GridSearchCV(
            pipe,
            param_grid=param_grid,
            cv=10,
            n_jobs=cpu_count,
    )

    best_model.fit(train_x, train_y)