dask / dask-ml

Scalable Machine Learning with Dask
http://ml.dask.org
BSD 3-Clause "New" or "Revised" License
893 stars 255 forks source link

Sklearn random forest does not train on dask yarn cluster #753

Open mattchurgin opened 3 years ago

mattchurgin commented 3 years ago

What happened: I am attempting to train a model using dask-ml. I've had success using ParallelPostFit, ColumnTransformer, and related dask-ml pipeline utilities to train and predict using sklearn's LogisticRegression as the classifier. However, when I've tried to use sklearn's RandomForestClassifier I have run into issues. Here I have tried to create a minimal example. The code below runs successfully when the classifier is LogisticRegression, but hangs when the classifier is RandomForest. I don't know why this is happening. I am running on a dask yarn cluster. Thank you very much in advance for your help!

Minimal Complete Verifiable Example:

import dask
import dask_yarn
import dask.array as da
from dask.distributed import LocalCluster, Client, progress
from dask_yarn import YarnCluster
from dask_ml.datasets import make_classification as make_classification_dask
from dask_ml.model_selection import train_test_split as train_test_split_dd

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
import joblib

worker_env={
    'ARROW_LIBHDFS_DIR': '/usr/hdp/2.6.1.40-4/usr/lib',
    'HADOOP_HOME': '/usr/hdp/current/hadoop-client'
}
cluster = YarnCluster(environment=path_to_packed_conda_env, 
                      n_workers=10, 
                      worker_vcores=10,
                      worker_env=worker_env,
                      worker_restarts=10,
                      scheduler_memory='10GiB',
                      scheduler_vcores=5,
                      worker_memory='10GiB')

client = Client(cluster)

RANDOM_STATE = 42

clf_rfc = RandomForestClassifier(n_estimators=10, 
                                 n_jobs=5, 
                                 criterion='gini',
                                 max_features='auto',
                                 min_samples_split = 50,
                                 class_weight='balanced', 
                                 verbose=1,
                                 random_state=RANDOM_STATE)

clf = LogisticRegression(penalty='l2',
                         dual=False,
                         tol=0.0001,
                         C=0.1,
                         fit_intercept=True,
                         intercept_scaling=1,
                         #class_weight='balanced',
                         random_state=RANDOM_STATE,
                         solver='liblinear',
                         max_iter=100,
                         multi_class='auto',
                         verbose=0,
                         warm_start=False,
                         n_jobs=-1)

# make 4 Gb dataset
n, d = 10000000, 50

X, y = make_classification_dask(n_samples=n, 
                                n_features=d,
                                chunks=n//10,
                                flip_y=0.2,
                                random_state=0)

X_train, X_test, y_train, y_test = train_test_split_dd(X, y, train_size = 0.8, random_state=RANDOM_STATE)

# pipe = clf
pipe = clf_rfc

X_train = X_train.persist()
y_train = y_train.persist()

with joblib.parallel_backend('dask'):
    pipe.fit(X_train, y_train)

Environment:

TomAugspurger commented 3 years ago

I'm not sure why that would be. Do you see issues with random forest and the Loky backend from joblib? IIRC the random forest algorithm requires shared memory for some parts, but I thought that was just in .predict and not train.

On Mon, Nov 2, 2020 at 4:35 PM Matt Churgin notifications@github.com wrote:

What happened: I am attempting to train a model using dask-ml. I've had success using ParallelPostFit, ColumnTransformer, and related dask-ml pipeline utilities to train and predict using sklearn's LogisticRegression as the classifier. However, when I've tried to use sklearn's RandomForestClassifier I have run into issues. Here I have tried to create a minimal example. The code below runs successfully when the classifier is LogisticRegression, but hangs when the classifier is RandomForest. I don't know why this is happening. I am running on a dask yarn cluster.

Minimal Complete Verifiable Example:

import dask import dask_yarn import dask.array as da from dask.distributed import LocalCluster, Client, progress from dask_yarn import YarnCluster from dask_ml.datasets import make_classification as make_classification_dask from dask_ml.model_selection import train_test_split as train_test_split_dd

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier from sklearn.linear_model import LogisticRegression import joblib

worker_env={ 'ARROW_LIBHDFS_DIR': '/usr/hdp/2.6.1.40-4/usr/lib', 'HADOOP_HOME': '/usr/hdp/current/hadoop-client' } cluster = YarnCluster(environment=path_to_packed_conda_env, n_workers=10, worker_vcores=10, worker_env=worker_env, worker_restarts=10, scheduler_memory='10GiB', scheduler_vcores=5, worker_memory='10GiB')

client = Client(cluster)

RANDOM_STATE = 42

clf_rfc = RandomForestClassifier(n_estimators=10, n_jobs=5, criterion='gini', max_features='auto', min_samples_split = 50, class_weight='balanced', verbose=1, random_state=RANDOM_STATE)

clf = LogisticRegression(penalty='l2', dual=False, tol=0.0001, C=0.1, fit_intercept=True, intercept_scaling=1,

class_weight='balanced',

                     random_state=RANDOM_STATE,
                     solver='liblinear',
                     max_iter=100,
                     multi_class='auto',
                     verbose=0,
                     warm_start=False,
                     n_jobs=-1)

make 4 Gb dataset

n, d = 10000000, 50

X, y = make_classification_dask(n_samples=n, n_features=d, chunks=n//10, flip_y=0.2, random_state=0)

X_train, X_test, y_train, y_test = train_test_split_dd(X, y, train_size = 0.8, random_state=RANDOM_STATE)

pipe = clf

pipe = clf_rfc

X_train = X_train.persist() y_train = y_train.persist()

with joblib.parallel_backend('dask'): pipe.fit(X_train, y_train)

Environment:

  • Dask version: 2.11
  • Dask ML version: 1.4.0
  • Python version: 3.7
  • Operating System: Linux
  • Install method (conda, pip, source): conda

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/dask/dask-ml/issues/753, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAKAOIT47WJK25NSP2DPFBDSN4XZRANCNFSM4TIA3OOQ .

mattchurgin commented 3 years ago

Thanks. Loky backend worked.

mattchurgin commented 3 years ago

Is there a reason why dask backend would have issues training or predicting with RF?

TomAugspurger commented 3 years ago

RF using shared memory is the only thing that comes to mind, but that should have affected the long backend too. Not sure what’s going on.

On Nov 3, 2020, at 07:02, Matt Churgin notifications@github.com wrote:

 Is there a reason why dask backend would have issues training or predicting with RF?

— You are receiving this because you commented. Reply to this email directly, view it on GitHub, or unsubscribe.