dask / dask-ml

Scalable Machine Learning with Dask
http://ml.dask.org
BSD 3-Clause "New" or "Revised" License
893 stars 255 forks

Dask-GridSearchCv is 3-4x slow compared to Sklearn-GridSearchCv (using MLPClassifier with SGD optimizer) #149

Open kanwals opened 6 years ago

kanwals commented 6 years ago

Background:

I've been doing a research project which requires conducting some experiments to compare runtime baselines for data parallel and task parallel hyperparameter tuning techniques for Neural Network training.

Two of these experiments involved comparing sklearn's GridSearchCV against dask's GridSearchCV implementation when run on a single node. Since Dask's implementation is more or less a wrapper over Sklearn's implementation, I was expecting very similar results. The setup for this experiment was:

Code and relevant files

The sklearn GridSearchCV Python code, the Dask GridSearchCV Python code, their log files, and the generated pickle files are in the attached zip file.

From here-on, I refer to sklearn's GridSearchCV experiment as exp3 and dask's GridSearchCV experiment as exp4. This is just to make things easier to understand, and consistent with my uploaded code for reproducibility.

exp3 and exp4 are identical except for one import statement: from sklearn.model_selection import GridSearchCV in exp3 is replaced by from dask_ml.model_selection import GridSearchCV in exp4.

Hyperparameter grid

param_grid = {
    'mlpclassifier__batch_size': [128, 256, 512, 1024],
    'mlpclassifier__hidden_layer_sizes': [(16, 16)],
    'mlpclassifier__solver': ['lbfgs', 'sgd'],
    'mlpclassifier__learning_rate_init': [0.0001, 0.001, 0.01, 0.1],
    'mlpclassifier__max_iter': [50],
    'mlpclassifier__nesterovs_momentum': [False],
    'mlpclassifier__activation': ['logistic'],
    'mlpclassifier__alpha': [0.0001, 0.001, 0.01, 0.1],
    'mlpclassifier__momentum': [0],
    'mlpclassifier__shuffle': [False],
    'mlpclassifier__random_state': [1],
}

Additional info that may be relevant

My goal for both experiments was to maximize CPU usage, ideally using all cores in the machine. For exp3, I first tried to use all 8 cores for parallel model training (setting "n_jobs=8" and "pre_dispatch=8" in the GridSearchCV parameters). That was not possible: because of the dataset size, and maybe the internal creation of temporary arrays, I hit memory overflow errors when running sklearn's GridSearchCV on all 8 cores. However, I was able to use 4 cores ("n_jobs=4" and "pre_dispatch=4") without blowing up memory, and hence could train 4 models in parallel on the full dataset. Since Dask's GridSearchCV calls Sklearn's GridSearchCV internally, I expected the same behavior for exp4, and that is what I got.

The actual issue

As can be seen in the param grid, a total of 128 different models are trained in both exp3 and exp4. Of those 128 models, 64 use "lbfgs" and 64 use "sgd" as the optimization (solver) method. Since DaskML's implementation calls Sklearn's GridSearchCV implementation, I was hoping to observe similar training times for each model configuration across exp3 and exp4. As shown in the "Observing Issue" and "Reproducing Issue" sections below, training runtimes were very similar with lbfgs as the solver, whereas sgd was 3-4x slower with Dask. I went through Dask's source code but couldn't work out why this is happening. exp3 and exp4 have literally the same code apart from the single import statement change and the missing "pre_dispatch" argument in Dask's GridSearchCV (which does not expose it for tuning).
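For anyone checking the arithmetic, the 128 figure (and the 64/64 split by solver) can be confirmed by enumerating the grid. This is a minimal sketch using only the standard library, not the code from the attached zip; the variable names `combos` and `per_solver` are illustrative.

```python
from collections import Counter
from itertools import product

# Same grid as in the issue text.
param_grid = {
    'mlpclassifier__batch_size': [128, 256, 512, 1024],
    'mlpclassifier__hidden_layer_sizes': [(16, 16)],
    'mlpclassifier__solver': ['lbfgs', 'sgd'],
    'mlpclassifier__learning_rate_init': [0.0001, 0.001, 0.01, 0.1],
    'mlpclassifier__max_iter': [50],
    'mlpclassifier__nesterovs_momentum': [False],
    'mlpclassifier__activation': ['logistic'],
    'mlpclassifier__alpha': [0.0001, 0.001, 0.01, 0.1],
    'mlpclassifier__momentum': [0],
    'mlpclassifier__shuffle': [False],
    'mlpclassifier__random_state': [1],
}

# Cartesian product over all candidate values, one dict per configuration.
keys = list(param_grid)
combos = [dict(zip(keys, values))
          for values in product(*(param_grid[k] for k in keys))]

# Count configurations per solver.
per_solver = Counter(c['mlpclassifier__solver'] for c in combos)

print(len(combos))        # 128
print(dict(per_solver))
```

Only the four multi-valued parameters contribute to the count (4 x 2 x 4 x 4); the single-valued entries each multiply it by 1.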

Observing Issue

Reproducing Issue

Run the Python files after downloading the HIGGS dataset and extracting it in the same directory as the code. Make sure versions are consistent. Feel free to reduce the number of hyperparameter combinations to save wait time.

exp3 and exp4.zip

mrocklin commented 6 years ago

I would be curious to see what is taking up most of the time. You might consider using some of the diagnostic tools: http://dask.pydata.org/en/latest/understanding-performance.html

TomAugspurger commented 6 years ago

Will take a closer look later in the week. https://github.com/dask/dask-ml/issues/141 may be interesting in the meantime.

Which joblib backend did you use for sklearn? The default multiprocessing? If you use

import dask.multiprocessing

with dask.set_options(get=dask.multiprocessing.get):
    exp4.fit(X)

do you see similar behavior? It's possible the GIL is not being released.

kanwals commented 6 years ago

@mrocklin Thanks for the pointers. I'll take a look.

@TomAugspurger I did not specify a scheduler when running Dask's GridSearchCV, so it must have used the default. I'll try what you suggested and share results here. However, it is strange that lbfgs is not affected but sgd is. If it was a scheduling problem, both should have been slow in my opinion.

It is also worth noting that I conducted a similar experiment over a cluster with four dask workers. To do that, I wrote:

from sklearn.externals import joblib
from dask_ml.model_selection import GridSearchCV
import distributed.joblib
from distributed import Client

client = Client("10.zz.yy.xx:8786")

with joblib.parallel_backend('dask.distributed', scheduler_host=client.scheduler.address):
    clf.fit(X_train, y_train)

I observed similar slowdown for SGD again, but not for LBFGS.

TomAugspurger commented 6 years ago

However, it is strange that lbfgs is not affected but sgd is. If it was a scheduling problem, both should have been slow in my opinion.

To clarify, the default in scikit-learn is multiprocessing. The default in Dask is (I think) threaded. So if the expensive part of the .fit doesn't release the GIL, we'd see better performance from scikit-learn. It's possible that LBFGS releases the GIL, but SGD doesn't.

But that's just an uninformed guess. Could easily be something different.
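To illustrate the general effect being guessed at here (not the MLPClassifier code path itself), the sketch below runs a pure-Python, GIL-holding loop serially and then on four threads. On a standard CPython build with the GIL, the threaded run is expected to take roughly as long as the serial one, since only one thread executes Python bytecode at a time. The function names and task sizes are made up for the illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def busy_sum(n):
    """Pure-Python loop; it never releases the GIL while computing."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def run_serial(sizes):
    """Run the tasks one after another; return (results, elapsed seconds)."""
    start = time.perf_counter()
    results = [busy_sum(n) for n in sizes]
    return results, time.perf_counter() - start


def run_threaded(sizes, workers=4):
    """Run the tasks on a thread pool; return (results, elapsed seconds)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(busy_sum, sizes))
    return results, time.perf_counter() - start


sizes = [200_000] * 4
serial_results, serial_time = run_serial(sizes)
threaded_results, threaded_time = run_threaded(sizes)

print(f"serial:   {serial_time:.3f}s")
print(f"threaded: {threaded_time:.3f}s")
```

Code that spends its time inside NumPy or other C routines that release the GIL behaves differently, which is why a threaded scheduler could in principle affect one solver more than another.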

kanwals commented 6 years ago

@TomAugspurger updated my comment above.

I'll anyways go ahead and run with the changes you suggested below:

import dask.multiprocessing

with dask.set_options(get=dask.multiprocessing.get):
    exp4.fit(X)

kanwals commented 6 years ago

@TomAugspurger I tried using dask's multiprocessing backend as you suggested. I got a memory error. Below is the stack trace. Python code is attached in this comment.

    clf.fit(X_train, y_train)
  File "/usr/local/lib/python2.7/dist-packages/dask_searchcv/model_selection.py", line 867, in fit
    out = scheduler(dsk, keys, num_workers=n_jobs)
  File "/usr/local/lib/python2.7/dist-packages/dask/multiprocessing.py", line 177, in get
    raise_exception=reraise, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/dask/local.py", line 521, in get_async
    raise_exception(exc, tb)
  File "/usr/local/lib/python2.7/dist-packages/dask/local.py", line 290, in execute_task
    result = _execute_task(task, data)
  File "/usr/local/lib/python2.7/dist-packages/dask/local.py", line 270, in _execute_task
    args2 = [_execute_task(a, cache) for a in args]
  File "/usr/local/lib/python2.7/dist-packages/dask/local.py", line 270, in _execute_task
    args2 = [_execute_task(a, cache) for a in args]
  File "/usr/local/lib/python2.7/dist-packages/dask/local.py", line 267, in _execute_task
    return [_execute_task(a, cache) for a in arg]
  File "/usr/local/lib/python2.7/dist-packages/dask/local.py", line 271, in _execute_task
    return func(*args2)
  File "/usr/local/lib/python2.7/dist-packages/dask_searchcv/methods.py", line 216, in fit
    est.fit(X, y, **fit_params)
  File "/usr/local/lib/python2.7/dist-packages/sklearn/neural_network/multilayer_perceptron.py", line 973, in fit
    hasattr(self, "classes_")))
  File "/usr/local/lib/python2.7/dist-packages/sklearn/neural_network/multilayer_perceptron.py", line 366, in _fit
    deltas = [np.empty_like(a_layer) for a_layer in activations]
MemoryError

exp4_dask_MLP_pipelined_DasksMultiprocessingBackend.zip

jakirkham commented 6 years ago

That might be an indication that the hardware has been oversubscribed. It is probably worth creating a multiprocessing.Pool with a fixed number of processes and using that instead.

jtromans commented 5 years ago

This is a very interesting thread - where did you get to @kanwals ?

Tachyon5 commented 5 years ago

Has anyone tried using the multiprocessing.Pool?