dask / dask-ml

Scalable Machine Learning with Dask
http://ml.dask.org
BSD 3-Clause "New" or "Revised" License

Is learning_curve fully supported by Dask? #505

Open H4dr1en opened 5 years ago

H4dr1en commented 5 years ago

The problem is fully described here:

For problems with two levels of parallelism, like GridSearchCV or RandomizedSearchCV, Dask provides its own implementation. learning_curve has the same two levels of parallelism. Can it simply be handled like this?

# assumes rf_params, X, Y, cv and train_sizes are defined elsewhere
estimator = RandomForestRegressor(n_jobs=-1, **rf_params)
with joblib.parallel_backend('dask', scatter=[X, Y]):
    train_sizes, train_scores, test_scores = learning_curve(estimator, X, Y, cv=cv, n_jobs=-1, train_sizes=train_sizes)

If you take a look at the source code for the parallel part, you will see:

out = parallel(delayed(_fit_and_score)(
        clone(estimator), X, y, scorer, train, test, verbose,
        parameters=None, fit_params=None, return_train_score=True,
        error_score=error_score)
    for train, test in train_test_proportions)

If a Dask Client is created with processes=False (multi-threaded parallelism), Dask shows a warning saying that two levels of parallelism were detected, and the "inner" one is therefore forced to n_jobs=1. But this is not the case for multiprocessing parallelism.

In particular in the case of RandomForests, what should be the value of n_jobs for the estimator and for the learning_curve call?

(Since joblib 0.12.2, the RandomForest backend is no longer forced to be threading; see the PR here.)
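
To make the two levels concrete, here is a minimal runnable sketch of the setup in question (the synthetic data, n_estimators value, and the choice of n_jobs=1 for the inner level are my own illustrative assumptions; the n_jobs=1 choice mirrors what the threaded-case warning enforces, not confirmed guidance):

import joblib
from dask.distributed import Client
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import learning_curve

client = Client(processes=False)  # threaded workers: the case where dask warns
X, Y = make_regression(n_samples=2000, n_features=20, random_state=0)  # synthetic stand-in data

# Outer level: learning_curve dispatches one _fit_and_score task per (train_size, split).
# Inner level: the forest itself can fit trees in parallel through its own n_jobs.
estimator = RandomForestRegressor(n_estimators=50, n_jobs=1)  # inner level kept serial (assumption)
with joblib.parallel_backend('dask', scatter=[X, Y]):
    train_sizes, train_scores, test_scores = learning_curve(estimator, X, Y, cv=5, n_jobs=-1)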

H4dr1en commented 5 years ago

I just ran a few tests to check how well Dask scales, using the following code:

worker_kwargs = dict(memory_limit="1GB", ncores=1)
cluster = LocalCluster(n_workers=8, threads_per_worker=1, **worker_kwargs)  # n_workers from 2 to 16
client = Client(cluster)

estimator = RandomForestRegressor(**rf_params, n_jobs=-1)
cv = ShuffleSplit(n_splits=5, test_size=0.2)

with joblib.parallel_backend('dask', scatter=[X, Y]):
    train_sizes, train_scores, test_scores = learning_curve(estimator, X, Y, cv=cv, n_jobs=-1, train_sizes=train_sizes)

Here are the results (running on a computer with 16 cores, 16GB of RAM):

[figure: running time for each n_workers value]

I fixed ncores=1 and threads_per_worker=1, because otherwise the extra parallelism makes it impossible to judge scalability (though I am not sure this is the right way to do it).

I would expect the running time to decrease as the number of workers increases, but looking at the results I cannot explain this behavior (I took care to run the algorithm 2-3 times for each n_workers value to get stable measurements).

Note that for n_workers <= 2, it takes forever (I killed it after 10 minutes).

I currently cannot access the Dask web UI; as soon as I can, I will use it to investigate further.
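
For reference, here is a self-contained sketch of the kind of timing loop described above (synthetic data; the dataset size, repeat count, and best-of-3 reporting are illustrative assumptions):

import time

import joblib
from dask.distributed import Client, LocalCluster
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import ShuffleSplit, learning_curve

X, Y = make_regression(n_samples=5000, n_features=20, random_state=0)  # synthetic stand-in data
cv = ShuffleSplit(n_splits=5, test_size=0.2, random_state=0)
estimator = RandomForestRegressor(n_estimators=100, n_jobs=-1)

for n_workers in (2, 4, 8, 16):
    cluster = LocalCluster(n_workers=n_workers, threads_per_worker=1, memory_limit="1GB")
    client = Client(cluster)
    runs = []
    for _ in range(3):  # repeat to get a stable measurement
        start = time.perf_counter()
        with joblib.parallel_backend('dask', scatter=[X, Y]):
            learning_curve(estimator, X, Y, cv=cv, n_jobs=-1)
        runs.append(time.perf_counter() - start)
    print(f"n_workers={n_workers}: best of 3 = {min(runs):.1f}s")
    client.close()
    cluster.close()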

TomAugspurger commented 5 years ago

I wouldn't expect dask to offer any improvement over Joblib's other backends on a single machine. I would only recommend using the dask backend on a distributed cluster.
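
For comparison, the single-machine baseline that suggests is plain joblib with no Dask backend (reusing the names from the sketch above):

# No parallel_backend context: learning_curve falls back to joblib's default
# process-based backend (loky), which is usually sufficient on one machine.
train_sizes, train_scores, test_scores = learning_curve(estimator, X, Y, cv=cv, n_jobs=-1)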

H4dr1en commented 5 years ago

OK, I will run the same experiment on a distributed cluster and come back with the results.