dmlc / xgboost

Scalable, Portable and Distributed Gradient Boosting (GBDT, GBRT or GBM) Library, for Python, R, Java, Scala, C++ and more. Runs on single machine, Hadoop, Spark, Dask, Flink and DataFlow
https://xgboost.readthedocs.io/en/stable/
Apache License 2.0
26.3k stars 8.73k forks source link

xgboost.dask.DaskXGBClassifier not working with >1 dask distributed worker in case of large datasets #5451

Closed harshit-2115 closed 4 years ago

harshit-2115 commented 4 years ago

Hi XGBoost devs, I am running the this code on an EC2 machine with 32 threads and 128 GB ram. The size of csv being loaded in 800MB.

class ColumnSelector(BaseEstimator, TransformerMixin):
    def __init__(self, columns=[]):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        cols_missing = list(set(self.columns) - set(X.columns))
        print(cols_missing)
        for cols in cols_missing:
            X[cols] = np.nan
        return X[self.columns]

cluster = LocalCluster(n_workers=1)
client = Client(cluster)
client

lp3 = dd.read_csv('./collection_model/collection_train.csv')
lp3

pre_pipe = Pipeline([
('colsel', ColumnSelector(columns=['column1', 'column2', ......]],
  )),
('missna', CustomMissingImputer()),
])
post_pipe= Pipeline([
('pre_pipe', pre_pipe),
('impute', IterativeImputer(n_nearest_features=5, max_iter=100, random_state=0)),
('qt', QuantileTransformer(n_quantiles=10))])

pi= xgboost.dask.DaskXGBClassifier(tree_method='hist')
pi.client= client

param_grid = {
    'learning_rate': [0.1, 0.2],
    'n_estimators': [100],
    'reg_lambda': [0.7],
    }

kfold = 5
skf = model_selection.StratifiedKFold(
    n_splits = kfold, shuffle = True, random_state = 30
)

scoring_1=make_scorer(ks_scorer, greater_is_better = True, needs_proba = True)
scoring={'auc': 'roc_auc', 'ks_scorer': scoring_1}

clf=GridSearchCV(
        estimator = pi,
        param_grid = param_grid,
        verbose = 5,
        cv = skf,
        iid = True,
        return_train_score = True,
        scoring = 'neg_mean_squared_error',
        refit = False
    )

pp = post_pipe.fit_transform(lp3,lp3['target'])        

label = da.from_array(np.array(lp3['target'].astype(int).compute()), chunks=200000)
clf.fit(da.from_array(pp, chunks=200000),label)
clf.cv_results_

It works if the model is trained using a subset of the features with worker=1.

Some cases where it fails :

  1. Same subset of features and with workers > 1, It keeps on running in the notebook with no result. In terminal, WARNING: /home/conda/feedstock_root/build_artifacts/xgboost_1584539733809/work/src/objective/regression_obj.cu:58: Label set is empty.

  2. Using all features with worker=1, it gives memory warnings in the terminal

    distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 118.65 GB -- Worker memory limit: 126.75 GB```
    and after some time error in notebook
    ```KilledWorker: ("('from-delayed-pandas_read_text-read-block-assign-d8c832b8fa114d4e528a9953dd6402de', 0)", <Worker 'tcp://127.0.0.1:40365', name: 0, memory: 0, processing: 11>)

    How can a 800MB csv file consume 118 GB memory ?

Also, there is not 'predict_proba' attribute in DaskXgbClassifier, so metrics like roc_auc gives error.

Currently, we are using xgboost with sklearn gridsearch(to distribute the fits). With large datasets, hyper-parameter tuning jobs with 4k-5k fits take days to complete on EC2 and sagemaker.

We are trying dask xgboost to reduce training time.

trivialfis commented 4 years ago

Hi thanks for rasing an issue. Here are some questions:

harshit-2115 commented 4 years ago

Thanks for the prompt reply.

@trivialfis

harshit-2115 commented 4 years ago
import os
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline, make_pipeline, FeatureUnion
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import FeatureUnion, Parallel, delayed,    _fit_transform_one, _transform_one
from sklearn.model_selection import GridSearchCV
from sklearn import model_selection

import xgboost
import json

from sklearn.preprocessing import QuantileTransformer
from sklearn.metrics import accuracy_score,roc_curve,roc_auc_score
from sklearn.metrics import make_scorer
from sklearn.experimental import enable_iterative_imputer
from sklearn.datasets import make_classification
from sklearn.impute import IterativeImputer, MissingIndicator
import random

import dask.dataframe as dd
import dask.array as da
from dask.distributed import LocalCluster, Client

cluster = LocalCluster(n_workers=10)
client = Client(cluster)
client

X, y = make_classification(n_features=244, n_samples=815414, n_classes=2)

DX = da.from_array(X, chunks = 200000)
Dy = da.from_array(y, chunks = 200000)

post_pipe = Pipeline([
('impute', IterativeImputer(n_nearest_features = 5,max_iter = 100,random_state=0)),
('qt',QuantileTransformer(n_quantiles=10))])

pi = xgboost.dask.DaskXGBClassifier(tree_method='hist')
pi.client = client

param_grid ={
    'learning_rate':[0.1, 0.2],
    'n_estimators':[100],
    'reg_lambda': [0.7],
    }

kfold = 5
skf = model_selection.StratifiedKFold(
    n_splits=kfold, shuffle=True,random_state=30
)

clf = GridSearchCV(
        estimator=pi,
        param_grid=param_grid,
        verbose=5,
        cv=skf,
        iid=True,
        return_train_score=True,
        scoring='neg_mean_squared_error',
        refit=False    
    )

pp = post_pipe.fit_transform(DX,Dy)        

clf.fit(da.from_array(pp, chunks=200000), Dy)

I am trying the same on a sample dataset. It runs with worker=1 but with workers > 1, it is still giving the same 'Label set is empty' warning.

trivialfis commented 4 years ago
harshit-2115 commented 4 years ago

How to balance the data between workers ? I get the same warning on the sample dataset too. Can you please debug the sample dataset script and post the changes required to balance the data between workers ?

sandys commented 4 years ago

hi guys - we have the same issue. @trivialfis if you can post a recommended script, it will be great and we will follow the same.

trivialfis commented 4 years ago

Em .. Sometimes its the chunk size, sometimes it's other problems. XGBoost does not move data, it accepts whatever dask provides for each worker. Let me take a look.

trivialfis commented 4 years ago

In the sample script above posted by @harshit-2115 , reducing the chunk size should balance the data enough to prevent starved workers:

from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from sklearn import model_selection

import xgboost

from sklearn.preprocessing import QuantileTransformer
from sklearn.datasets import make_classification

import dask.array as da
from dask.distributed import LocalCluster, Client

if __name__ == '__main__':
    cluster = LocalCluster(n_workers=10, memory_limit='8GB')
    print(cluster.dashboard_link)
    client = Client(cluster)

    X, y = make_classification(n_features=244, n_samples=815414, n_classes=2)

    chunk_size = 1500

    DX = da.from_array(X, chunks=chunk_size)
    Dy = da.from_array(y, chunks=chunk_size)

    post_pipe = Pipeline([('qt', QuantileTransformer(n_quantiles=10))])

    pi = xgboost.dask.DaskXGBClassifier(tree_method='hist')
    pi.client = client

    param_grid = {
        'learning_rate': [0.1, 0.2],
        'n_estimators': [100],
        'reg_lambda': [0.7],
    }

    kfold = 5
    skf = model_selection.StratifiedKFold(n_splits=kfold,
                                          shuffle=True,
                                          random_state=30)

    clf = GridSearchCV(estimator=pi,
                       param_grid=param_grid,
                       verbose=5,
                       cv=skf,
                       iid=True,
                       return_train_score=True,
                       scoring='neg_mean_squared_error',
                       refit=False)

    pp = post_pipe.fit_transform(DX, Dy)

    clf.fit(da.from_array(pp, chunks=chunk_size), Dy)
trivialfis commented 4 years ago

Some simple testing from above script: for using 1500 as chunk size, the data is distributed quite nicely for the first round:

rows  cols
81249 244
63380 244
63623 244
58846 244
62584 244
60052 244
60037 244
65877 244
53922 244
82761 244

But somehow the data is moved in following rounds, will look further.

harshit-2115 commented 4 years ago

Thank you for debugging the script @trivialfis

On a multi-core EC2 machine, which model xgboost(parallelized single-thread fits) or dask xgboost(single-fit distributed across workers) will give us more speed and performance ?

trivialfis commented 4 years ago

Dask xgboost is taking longer than xgboost to train

Not surprising. Even within XGBoost, simply syncing gradients between workers has its overhead. On the dask end there are even more operations. There's always a trade off.

but in case of dask xgboost skl gridsearch gives error when n_jobs > 1

skl functions are designed to work on local data. In our case, it operates on dask.Array/dask.DataFrame, which "looks like" a local data thanks to Python's duck typing. But you can imagine if you slice the data for each thread, you are essentially bypassing the scheduler (for threads, workers and data distribution) in dask. That's probably why the gridsearching in above sample script doesn't work well.

Also, increasing the chunk_size lowers the training time

Again. Trade off. That's something responsible for dask to handle in the future. But https://docs.dask.org/en/latest/understanding-performance.html might help.

Should we consider number of workers and chunk_size as hyperparameters which should be tuned to get the best performance

Accuracy performance? No. Computation performance? Probably. See above link. But I believe there's some low hanging fruits for computation perf. Like switching the backend to something else than pandas. I ( work for NVIDIA) use cudf most of the time. But I believe there are other backends mentioned in dask's document.

On a multi-core EC2 machine, which model xgboost(parallelized single-thread fits) or dask xgboost(single-fit distributed across workers) will give us more speed and performance ?

Hard to say. It really depends on your data. For small data just use normal single node multi-thread training. Your dataset 800MB csv is considered as small. I believe you can train it with normal:

xgboost.train({'tree_method': 'hist'}, ...)

in no time. I use gpu_hist for training on HIGGS (7.9GB dense) on single GPU for 1000 rounds can finish within 100 seconds.

sandys commented 4 years ago

@trivialfis that's a very interesting comment.

Do you always do your training on one BIG machine with GPU or do you do distributed (with many machines) - like the way Sagemaker recommends . So the question becomes - is xgboost distributed training better on 100 machines ...even though there are overheads ?

another question - in general do you use gridsearchcv ever ? how would you do cross-validation with a k-fold ?

trivialfis commented 4 years ago

@sandys Your questions are asking my personal opinion. So the following answers are from personal perspective.

Do you always do your training on one BIG machine with GPU or do you do distributed (with many machines)

Both. I'm a developer.

So the question becomes - is xgboost distributed training better on 100 machines ...even though there are overheads ?

Depends. Is your data 100 machines worthy? For example, one normally don't put iris into a cluster. As commented, it's a trade off. See the performance section in https://medium.com/rapids-ai/a-new-official-dask-api-for-xgboost-e8b10f3d1eb7 . You can see the scaling status of training on HIGGS dataset with gpu_hist and dask. There's a limit in terms of performance scaling, which is effected both by your data (sparsity, shape etc) and your network (nvlink? tcp? ucx?). It's an area we are trying to improve.

in general do you use gridsearchcv ever

Yes. But right now only with single machine. Rapidsai has a notebook for using dask-ml with single node XGBoost if you are interested. Dask is still new here. (and I'm no expert). Like we talked about in #5347 . There are issues we need to address. You can also try the Spark version of XGBoost, which has been here for a very long time (longer than me contributing to XGBoost ..)

trivialfis commented 4 years ago

The questions here are really more about dask instead of XGBoost. I would recommend:

https://docs.dask.org/en/latest/best-practices.html https://docs.dask.org/en/latest/dataframe-best-practices.html

harshit-2115 commented 4 years ago

Your dataset 800MB csv is considered as small. I believe you can train it with normal in no time.

True, single xgboost fit takes no time. But if I am using gridsearchcv for hyperparameter tuning with 5 fold cv, total number of fits increases exponentially. That is why we use multi-core machines to distribute the fits.

harshit-2115 commented 4 years ago

I trained my dataset using dask xgboost, 2000 chunksize with a single gridsearch candidate and 5 Fold cv. It took 28 mins for the first fit to complete, but the rest 4 fits took just 1-2 mins. I didn't get this. What do you think ?

trivialfis commented 4 years ago

multi-core machines to distribute the fits.

XGBoost is multi-threaded even in single node configuration.

it took 28 mins for the first fit to complete, but the rest 4 fits took just 1-2 mins. I didn't get this

Could you take a look into the web interface and see what's dask doing?

Screenshot from 2020-04-02 17-21-04

harshit-2115 commented 4 years ago

Could you take a look into the web interface and see what's dask doing?

So for the first fit, dask took extra time to read the csv into memory. For the subsequent fits, it just did the computation.

XGBoost is multi-threaded even in single node configuration.

Yeah, so do we need dask XGBoost only when we have a cluster of machines ? We can just use the n_jobs param of XGBoost on a single machine to use all cores, right ?

harshit-2115 commented 4 years ago

there is no 'predict_proba' attribute in DaskXgbClassifier, so metrics like roc_auc gives error.

What can we do about this ? @trivialfis

thvasilo commented 4 years ago

Yeah, so do we need dask XGBoost only when we have a cluster of machines ? We can just use the n_jobs param of XGBoost on a single machine to use all cores, right ?

If you need distributed training, then you can use Dask or YARN/Spark which are much more mature.

We can just use the n_jobs param of XGBoost on a single machine to use all cores, right ?

Correct.

harshit-2115 commented 4 years ago

Thank you for clarifying. Can we add 'predict_proba' attribute to dask xgboost ? @trivialfis @thvasilo

trivialfis commented 4 years ago

Yes, will come back to this after 1.1 release.

hcho3 commented 4 years ago

@harshit-2115 Can this issue be closed, now that predict_proba() has been implemented in xgboost.dask?

trivialfis commented 4 years ago

Can this issue be closed, now that predict_proba() has been implemented in xgboost.dask?

I believe so. Feel free to reopen if there's objection.