dask / dask-ml

Scalable Machine Learning with Dask
http://ml.dask.org
BSD 3-Clause "New" or "Revised" License

AttributeError when using dask_ml.model_selection.kfold object #849

Open makquel opened 3 years ago

makquel commented 3 years ago

What happened: I'm currently trying to create a pipeline for model training using LogisticRegression and nested cross-validation, and I got an unexpected AttributeError during the pipeline execution.

Exception: AttributeError("'numpy.ndarray' object has no attribute 'chunks'")

What you expected to happen: I wasn't expecting that, since I double-checked that all the objects are dask.array instances. The following MWE shows what my pipeline looks like.

Minimal Complete Verifiable Example:

from typing import Tuple, Any
from dask.array.core import Array
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from dask_ml.model_selection import GridSearchCV, KFold
# NOTE: shadows the sklearn LogisticRegression imported above,
# so the pipeline below uses dask_ml's estimator
from dask_ml.linear_model import LogisticRegression
from dask_ml.metrics import accuracy_score
from sklearn.metrics import roc_auc_score
from sklearn.base import is_classifier
from dask.distributed import Client, progress
import dask.array as da

import numpy as np
import joblib
from dask_ml.datasets import make_classification as dask_make_classification

# import warnings filter
from warnings import simplefilter

# ignore all future warnings
simplefilter(action="ignore", category=FutureWarning)

def fake_dataset() -> Tuple[Array, Array]:
    X, y = dask_make_classification(
        n_samples=1000,
        n_features=20,
        random_state=1,
        n_informative=10,
        n_redundant=10,
        chunks=1000 // 20,  # row chunks of 50 samples (20 chunks total)
    )
    return X, y

def train_model(X: Array, y: Array) -> None:
    n_outer_splits = 2
    n_inner_splits = 2
    param_grid = [
        {
            "classifier": [LogisticRegression()],
            "classifier__penalty": ["l1", "l2"],
            "classifier__C": np.logspace(-4, 4, 20),
            "classifier__solver": ["liblinear"],
        },
    ]
    # define the model
    pipeline = Pipeline([("classifier", LogisticRegression())])
    # XXX: check that the estimator is a proper classifier
    try:
        if not is_classifier(pipeline["classifier"]):
            raise Exception("Not a valid classification algorithm")
    except Exception as e:
        print(f"Be aware of: {e}")
    # set-up the nested cross-validation procedure
    cv_outer = KFold(n_splits=n_outer_splits, shuffle=True, random_state=1)
    # enumerate splits
    outer_results = list()
    for kth_fold, (train_ix, test_ix) in enumerate(cv_outer.split(X)):
        print(f"Running {kth_fold} Fold")
        # split data
        X_train, X_test = X[train_ix, :], X[test_ix, :]
        y_train, y_test = y[train_ix], y[test_ix]
        # setup inner cross-validation procedure
        cv_inner = KFold(n_splits=n_inner_splits, shuffle=True, random_state=1)

        # define search
        search = GridSearchCV(
            estimator=pipeline,
            param_grid=param_grid,
            scoring="accuracy",
            cv=cv_inner,
            refit=True,
        )
        with joblib.parallel_backend("dask"):
            result = search.fit(X_train, y_train)

    return None

if __name__ == "__main__":
    client = Client(
        processes=False, threads_per_worker=1, n_workers=4, memory_limit="10GB"
    )

    X, y = fake_dataset()
    train_model(X, y)

Anything else we need to know?: Further debugging showed that the error comes from the fit operation; however, none of the objects I pass in are np.ndarray instances.
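For illustration, a check along these lines (a hypothetical snippet, not part of the pipeline above; only dask arrays carry a .chunks attribute) passes on X and y before fitting:

import dask.array as da

# both inputs are dask arrays, so the isinstance check and the
# .chunks access below succeed; a plain numpy array would fail both
assert isinstance(X, da.Array) and isinstance(y, da.Array)
print(X.chunks, y.chunks)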

Environment:

TomAugspurger commented 3 years ago

Can you post the full traceback?

makquel commented 3 years ago

Sure thing @TomAugspurger

Running 0 Fold
/home/makquel/anaconda3/envs/model_professor_2/lib/python3.8/site-packages/dask/array/slicing.py:1080: PerformanceWarning: Increasing number of chunks by factor of 14
  p = blockwise(
distributed.worker - WARNING - Compute Failed
Function:  cv_split
args:      (KFold(n_splits=2, random_state=1, shuffle=True), array([[ 0.19495857, -0.8850293 , -0.40491901, ...,  1.33906018,
        -1.25807389,  0.04787755],
       [ 0.25033412,  0.62342772,  0.43649497, ..., -0.18664019,
         2.01090793, -0.33272972],
       [ 0.07370212, -0.85365332,  1.06964283, ..., -0.30508485,
        -1.17534124,  1.20492622],
       ...,
       [ 1.38853841,  1.56361377,  0.34933573, ..., -0.69068144,
        -1.10318088,  0.26826262],
       [ 0.7677924 ,  0.19290652,  1.48708061, ...,  0.53962508,
        -1.22606228,  0.11060196],
       [-0.65174484,  1.41971851, -0.00285028, ..., -1.24113283,
        -1.30192431, -1.98192271]]), array([0, 0, 0, 0, 0, 1, 1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 1,
       0, 0, 1, 1, 0, 1, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0,
       0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0,
       0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0, 1, 0,
       0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1
kwargs:    {}
Exception: AttributeError("'numpy.ndarray' object has no attribute 'chunks'")

('score-441cbc92926e2852d676f4d612a0c376', 1, 1) has failed... retrying
distributed.worker - WARNING - Compute Failed
Function:  cv_split
args:      (KFold(n_splits=2, random_state=1, shuffle=True), array([[ 0.19495857, -0.8850293 , -0.40491901, ...,  1.33906018,
        -1.25807389,  0.04787755],
       [ 0.25033412,  0.62342772,  0.43649497, ..., -0.18664019,
         2.01090793, -0.33272972],
       [ 0.07370212, -0.85365332,  1.06964283, ..., -0.30508485,
        -1.17534124,  1.20492622],
       ...,
       [ 1.38853841,  1.56361377,  0.34933573, ..., -0.69068144,
        -1.10318088,  0.26826262],
       [ 0.7677924 ,  0.19290652,  1.48708061, ...,  0.53962508,
        -1.22606228,  0.11060196],
       [-0.65174484,  1.41971851, -0.00285028, ..., -1.24113283,
        -1.30192431, -1.98192271]]), array([0, 0, 0, 0, 0, 1, 1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 1,
       0, 0, 1, 1, 0, 1, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0,
       0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0,
       0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0, 1, 0,
       0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1
kwargs:    {}
Exception: AttributeError("'numpy.ndarray' object has no attribute 'chunks'")

Would that be enough to diagnose it?

TomAugspurger commented 3 years ago

Thanks. You might want to try using the single-threaded scheduler (or at least not the distributed scheduler), which should give cleaner tracebacks.
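For example, with the Client left out, something like this (a minimal sketch; dask.config.set is the standard way to select a scheduler) runs every task in the calling thread, so exceptions surface with an ordinary Python traceback:

import dask

# run all tasks sequentially in the main thread
dask.config.set(scheduler="synchronous")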

I’m trying to narrow down where things are converted to numpy arrays.


ChadGueli commented 2 years ago

Hi everyone,

@TomAugspurger, I think I have found the implicit NumPy conversion. The following script reproduces what happens to X and y in the build_cv_graph function when it is called by the fit method of DaskBaseSearchCV. Running it makes clear that the to_keys function is the problem: check0 prints a dask Array, while check1, which receives the key produced by to_keys, prints a plain numpy.ndarray. Notably, the script will not throw an error, because the actual splitting is not carried out in this part of the code.

from sklearn.model_selection._split import KFold
from dask_ml.model_selection._search import compute_n_splits
from dask_ml.model_selection.utils import to_indexable, to_keys
from dask_ml.model_selection.methods import cv_split
import dask.array as da
from dask import get

def printtype(x, i):
    print(f"{i}, {type(x)}")
    return x

X = da.random.normal(size=(1_000_000, 5),
                     chunks=(100_000, None))
y = da.random.normal(size=1_000_000, chunks=100_000)
X, y, groups = to_indexable(X, y, None)
cv = KFold(3)

dsk = {}
dsk["check0"] = (printtype, X, 0)  # X is still a dask array here
X_name, y_name, groups_name = to_keys(dsk, X, y, groups)

dsk["check1"] = (printtype, X_name, 1)  # the key now resolves to a numpy array
dsk["cv_name"] = (cv_split, cv, "check1", y_name, groups_name, False, True)
get(dsk, "cv_name")

Looking inside dask_ml.model_selection.utils.to_keys, we see that X and y are acted on by

for x in args: # i.e. [X, y,...]
    ...
    elif isinstance(x, (da.Array, dd.DataFrame)):
        x = delayed(x)
        dsk.update(x.dask)
        yield x.key

The issue is eliminated when the x = delayed(x) transform is removed and yield x is used, as sketched below. This is exciting because the MWE that follows the sketch works with the change.
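Roughly, the patched branch would read as follows (a sketch inferred from the description above; whether the dsk.update(x.dask) bookkeeping is still needed depends on how downstream code consumes the yielded value):

elif isinstance(x, (da.Array, dd.DataFrame)):
    # proposed change: hand the dask collection straight through
    # instead of wrapping it in delayed(), whose key materializes
    # into a concrete numpy/pandas object when the graph runs
    yield x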

import dask.array as da
import dask.distributed
import dask_ml.model_selection as dcv
import xgboost as xgb

if __name__ == "__main__":
    with dask.distributed.Client() as client:
        X = da.random.normal(size=(1_000_000, 5),
                             chunks=(100_000, None))
        y = da.random.normal(size=1_000_000, chunks=100_000)
        estim = xgb.dask.DaskXGBRegressor()
        params = {'max_depth': [3, 4, 5]}

        model = dcv.RandomizedSearchCV(estim, params, n_iter=1)
        model.fit(X, y)

As such, making this change could close this issue and issue #906. If possible, I would greatly appreciate the chance to push this change. Of course, I have only run it on this example and on a very similar example with Dask's LinearRegression estimator, so other issues could emerge. To my understanding, however, calling delayed here is unnecessary.

@trivialfis, if there are any particular tests that need to be run to verify the interoperability of Dask with this change and xgboost.dask, please let me know.

trivialfis commented 2 years ago

Unfortunately, the dask-ml package is not yet compatible with xgboost.dask, so there is no such test at the moment.

ChadGueli commented 2 years ago

Hi everyone,

I have run the pytests. Some initially failed because dask.Array instances are mutable, unlike dask.Delayed, but this was easily fixed by calling id. However, I am unable to fix one of the failures.

The test_grid_search_dask_dataframe function repeatedly fails with "DataFrame.iloc only supports selecting columns. It must be used like df.iloc[:, column_indexer]." The issue arises when the _pandas_indexing function attempts to split the data frame along its rows into train and test parts.
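That limitation is easy to reproduce directly on a dask DataFrame (a minimal sketch; the last line raises the error quoted above):

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({"a": range(10)}), npartitions=2)
ddf.iloc[:, [0]]  # selecting columns is supported
ddf.iloc[[0, 1]]  # selecting rows raises NotImplementedError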

In the past, this issue likely went undiscovered because, much like the implicit Dask-to-NumPy conversion behind the original error, there is an implicit Dask-to-pandas conversion. As such, if test_grid_search_dask_dataframe is run with a DataFrame whose total size exceeds the worker memory limit, a loop of warnings and failures occurs.

@TomAugspurger, given your experience with Dask, I think it would be better for you to decide how to handle this data frame issue. If you would like, I can push the changes I have made so you can look at them.