dask / dask-ml

Scalable Machine Learning with Dask
http://ml.dask.org
BSD 3-Clause "New" or "Revised" License

Lazy fitting #278

Open TomAugspurger opened 6 years ago

TomAugspurger commented 6 years ago

Should we add a compute keyword to all the fit / partial_fit on estimators we implement?

This would aid with:

- Better scheduling (any examples here?)

This is somewhat difficult. As @jcrist summarized in https://github.com/dask/dask-searchcv/issues/19

Lazy evaluation conflicts with scikit-learn's api, and is tricky to support here in a way that is both intuitive and robust. For some configurations there is also the need to call get multiple times, which prevents lazy evaluation.

xref https://github.com/dask/dask-ml/pull/259
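
For concreteness, the calling convention being discussed would look roughly like this (purely hypothetical, nothing here is implemented; est stands in for any dask-ml estimator and X, y for dask collections):

est = Incremental(SGDClassifier())

lazy = est.fit(X, y, compute=False)   # hypothetical keyword: build the graph, don't run it
fitted = lazy.compute()               # all deferred fitting happens here, in one pass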

mrocklin commented 6 years ago

Better scheduling (any examples here?)

Incremental with many parameters. We'd prefer to build one large graph that gets a bit of data, passes it around to all of the models, gets the next bit of data, etc. I think that we (you) did this manually using a compute=False bit of code while at BIDS.
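
Not the BIDS code, but a rough sketch of that "one large graph" idea using plain dask.delayed: each chunk of data is fed to every model before moving on to the next chunk, and nothing runs until a single compute at the end. The SGDClassifier, its alpha values, classes=[0, 1], and the random data are all placeholders for illustration.

import dask
import dask.array as da
from dask import delayed
from sklearn.linear_model import SGDClassifier

# toy data, chunked along the first axis only
X = da.random.random((1000, 10), chunks=(100, 10))
y = da.random.randint(0, 2, size=1000, chunks=100)

models = [SGDClassifier(alpha=alpha) for alpha in (1e-4, 1e-3, 1e-2)]

@delayed
def fit_chunk(model, X_chunk, y_chunk):
    # partial_fit returns the model, so each model's tasks form a sequential chain
    return model.partial_fit(X_chunk, y_chunk, classes=[0, 1])

lazy_models = models
for X_chunk, y_chunk in zip(X.to_delayed().flatten(), y.to_delayed().flatten()):
    # every model sees this chunk before the graph moves on to the next chunk
    lazy_models = [fit_chunk(model, X_chunk, y_chunk) for model in lazy_models]

fitted_models = dask.compute(*lazy_models)  # one compute for the whole training graph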

stsievert commented 6 years ago

Better scheduling (any examples here?)

I fit and score serially in https://github.com/dask/dask-examples/pull/15:

inc = Incremental(...)

data = []
for i in range(40):
    inc.fit(X, y)
    data += [{'score': inc.score(X, y), ...}]

There's no reason for this to be serial – it can start training the next iteration while scoring the previous iteration.
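
For reference, here is a hypothetical version of that loop if fit and score ever grew a compute=False flag (they have not today); inc, X, and y are as in the snippet above, and a single compute at the end would let the scheduler overlap scoring of pass i with training of pass i + 1:

inc = Incremental(...)

lazy_scores = []
for i in range(40):
    inc = inc.fit(X, y, compute=False)                  # hypothetical: returns a lazy Incremental
    lazy_scores.append(inc.score(X, y, compute=False))  # hypothetical: returns a lazy score

scores = dask.compute(*lazy_scores)  # training and scoring tasks interleave here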

mrocklin commented 6 years ago

So it looks like the current fit functions tend to both do some computation and then also finalize things back onto the collection. This finalization step seems to be what makes this problem hard if we also want to maintain the scikit-learn API.

I wonder if we might use the full Dask collections API here and make use of __dask_finalize__.

with dask.config.set({'ml.lazy': True}):
    inc = Incremental(sgd)
    for i in range(5):
        inc = inc.fit(X, y)
    inc = inc.compute()

cc @jcrist

mrocklin commented 6 years ago

Looks like I meant __dask_postcompute__ rather than __dask_finalize__
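
For anyone following along, a minimal illustration of what that hook already does for an existing collection (a dask array here): __dask_postcompute__ returns a finalize function plus extra arguments, and after the graph runs dask calls finalize(results, *extra) to turn the raw per-task results back into the collection's concrete type.

import dask.array as da

arr = da.ones((6,), chunks=3)

finalize, extra = arr.__dask_postcompute__()
# compute() effectively runs the graph for arr.__dask_keys__() and then calls
#     result = finalize(block_results, *extra)
# which, for arrays, concatenates the blocks into a single numpy array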

TomAugspurger commented 6 years ago

I'll play with that a bit today to see how things feel.

What would you expect the output of Incremental(sgd).fit(X, y, compute=False) to be? A Delayed object or an Incremental (which implements the dask collections API)? I'm leaning towards an Incremental.

TomAugspurger commented 6 years ago

Although, thinking a bit further, when you try to do anything with a "delayed" Incremental.fit, it feels a lot like a dask.Delayed object.

inc = Incremental(sgd)
inc.fit(X, y, compute=False)

inc.coef_  # Delayed?
inc.score(X, y)  # Delayed?

mrocklin commented 6 years ago

I think that it would be another Incremental object. coef_ and score are probably dask delayed objects, but in some cases they might also be dask arrays. I don't think that these strongly affect what we choose to make the result of .fit().
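
As a point of comparison for the "attributes become delayed" behaviour, plain dask.delayed already gives something close to this when you wrap an eager fit today; this is only an analogy using existing API, not the proposed Incremental behaviour:

import dask
import numpy as np
from sklearn.linear_model import SGDClassifier

X = np.random.rand(100, 5)
y = np.random.randint(0, 2, size=100)

lazy_est = dask.delayed(SGDClassifier(max_iter=5).fit)(X, y)

coef = lazy_est.coef_         # a Delayed
score = lazy_est.score(X, y)  # also a Delayed
coef, score = dask.compute(coef, score)  # the fit task runs once, shared by both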

js3711 commented 6 years ago

Probably getting ahead of myself, but dask's grid search CV will likely also need to be modified to allow for delayed fit methods. This could be desirable for the new bagging estimator.

markkoob commented 5 years ago

I would like to build a high-level graph which performs several RandomizedSearchCVs across different estimator types in parallel, possibly with different data sets, on a distributed system. What I find is that any attempt to client.submit() a graph containing a delayed .fit() hangs on the worker with no callstack, or with this callstack:

File "lib/python3.6/threading.py", line 884, in _bootstrap self._bootstrap_inner()
File "lib/python3.6/threading.py", line 916, in _bootstrap_inner self.run()
File "lib/python3.6/threading.py", line 864, in run self._target(*self._args, **self._kwargs)
File "lib/python3.6/site-packages/distributed/threadpoolexecutor.py", line 55, in _worker task.run()
File "lib/python3.6/site-packages/distributed/_concurrent_futures_thread.py", line 65, in run result = self.fn(*self.args, **self.kwargs)
File "lib/python3.6/site-packages/distributed/worker.py", line 3206, in apply_function result = function(*args, **kwargs)
File "lib/python3.6/site-packages/dask_ml/model_selection/_search.py", line 1251, in fit for batch in ac.batches():
File "lib/python3.6/site-packages/distributed/client.py", line 4251, in batches yield self.next_batch(block=True)
File "lib/python3.6/site-packages/distributed/client.py", line 4223, in next_batch batch = [next(self)]
File "lib/python3.6/site-packages/distributed/client.py", line 4185, in __next__ self.thread_condition.wait(timeout=0.100)
File "lib/python3.6/threading.py", line 299, in wait gotit = waiter.acquire(True, timeout)

Reading this issue (and others), I think the behavior I see is expected and that this feature would fix it. If I am wrong and this is expected to work, I'm happy to produce a concise test example.

TomAugspurger commented 5 years ago

@markkoob that may be a bit surprising. Are you passing dask objects to the delayed fit, or NumPy / pandas objects?

markkoob commented 5 years ago

I'm not certain I've tried fitting with dask.DataFrame types on the distributed client with a delayed fit, but I'm pretty sure I've tried most other combinations. I'll give that one a shot and try building a test case that shows the behavior on a local cluster object.

markkoob commented 5 years ago

Okay, I've been able to reproduce a couple of things with the LocalCluster. Hopefully that means I'm just doing something wrong!

import numpy as np
import pandas as pd

from dask import delayed
import dask.dataframe as dd
from distributed import Client, LocalCluster
from dask_ml.model_selection import RandomizedSearchCV

from sklearn.tree import DecisionTreeClassifier

def start_local_cluster():
    cluster = LocalCluster(n_workers=1)

    client = Client(cluster, asynchronous=True)
    return client, cluster

def load_data(x_size, y_size):
    data = pd.DataFrame(np.random.rand(x_size, y_size))
    return data

def load_labels(x_size, y_size):
    data = pd.DataFrame(np.random.randint(0,2,size=(x_size, y_size)))
    return data

client, cluster = start_local_cluster()

hp_dist = dict(class_weight=['balanced'],
               random_state=[0],
               criterion=['gini', 'entropy'],
               splitter=['best', 'random'],
               max_depth=[5, 10, 15, None],
               max_features=['auto', 'log2', 100, None])

est = DecisionTreeClassifier()

search = RandomizedSearchCV(estimator=est,
                            param_distributions=hp_dist,
                            n_iter=1,
                            cv=3)

# hangs, interrupt trace looks like it's tokenizing a bunch of stuff
# dfx = dd.from_pandas(load_data(1000, 10), npartitions=1)
# dfy = dd.from_pandas(load_labels(1000, 1), npartitions=1)
# task = delayed(search.fit)(dfx, dfy)
# future = client.submit(task)

# assertion failure, assert not is_dask_collection(x)
# dfx = dd.from_pandas(load_data(100, 10), npartitions=1)
# dfy = dd.from_pandas(load_labels(100, 1), npartitions=1)
# search.fit(dfx, dfy)

# Evaluates immediately despite delayed operands
# dfx = delayed(load_data)(1000, 100)
# dfy = delayed(load_labels)(1000, 1)
# search.fit(dfx, dfy)

I'm still trying to figure out how to get the acquire hang to happen predictably, but I'm starting to suspect it might be specific to delayed unpickling, which was a shortcut I was taking for testing purposes and not necessarily a part of my use case.

MatthewLennie commented 3 years ago

Hey all,

IMO, lazy fit will be a fantastic addition. I had some real pain using the StandardScaler with the ColumnTransformer because it called compute for each column. I have built a rough prototype here that implements scalers as a custom Dask collection, as hinted at by @mrocklin; perhaps an approach like this may be the future. Would love to hear your thoughts. I think it would also have the advantage that ColumnTransformers could be built on top of this in the same way, reusing the HighLevelGraph from each of the scalers.


from dask import array, dataframe
import dask

from dask.base import DaskMethodsMixin, replace_name_in_key, is_dask_collection
from dask.optimization import cull

class StandardScaler(DaskMethodsMixin):
    def __init__(self, mean=None, std=None):
        self._mean = mean
        self._std = std
    @property
    def mean(self):
        if self._mean is not None:
            return self._mean
        raise ValueError("Mean was none - Have you fit?")
    @mean.setter
    def mean(self, input):
        self._mean = input

    @property 
    def std(self):
        if self._std is not None:
            return self._std
        raise ValueError("Std was none - Have you fit?")

    @std.setter
    def std(self,input):
        self._std = input

    def fit(self, data):
        self.mean = data.mean()
        self.std = data.std()
        return self

    def transform(self, data):
        return (data-self.mean)/self.std

    def inverse_transform(self, data):
        return data*self.std + self.mean 

    @property 
    def _graph(self):
        # Only compute in the case that both the mean and std are dask collections 
        if is_dask_collection(self.mean) and is_dask_collection(self.std):
            return dask.highlevelgraph.HighLevelGraph.merge(self.mean.__dask_graph__(),self.std.__dask_graph__())

    def __dask_graph__(self):
        return self._graph

    def __dask_keys__(self):
        if self._graph:
            return self.mean.__dask_keys__() + self.std.__dask_keys__()
        else:
            return []

    # Use the threaded scheduler by default.
    __dask_scheduler__ = staticmethod(dask.threaded.get)

    def __dask_postcompute__(self):
        # The post compute rebuilds a StandardScaler with evaluated versions 
        # of the parameters
        return StandardScaler._rebuild, ()

    @staticmethod
    def _rebuild(results):
        # results arrive in the same order as __dask_keys__:
        # the computed mean first, then the computed std
        mean, std = results
        return StandardScaler(mean=mean, std=std)

    def __repr__(self):
        return repr(f"Mean - {self.mean}, Std - {self.std}")

da = array.from_array([1, 2, 3])
b = StandardScaler().fit(da)
i = b.transform(da)
i = i.compute()
b = b.compute()

MatthewLennie commented 3 years ago

Further to my previous comment.

A rough idea of a ColumnTransformer implementation could look like the following. This would just work with data in the form of dataframes, but that can obviously be extended easily enough. The scalers could also be turned into a property, etc.; this is just a quick and dirty prototype, but I think a good starting point for discussion.

class ColumnTransformer(DaskMethodsMixin):
    def __init__(self, scalers=None):

        self._scalers = scalers
        self.non_dask_scalers = {}
        self.dask_scalers = {}

    def fit(self, data):
        for key,series in data.items():
            self._scalers[key] = self._scalers[key].fit(series)
        return self

    def transform(self, data):
        for key, series in data.items():
            data[key] = self._scalers[key].transform(series)
        return data

    def _graph(self):
        # Split the scalers into dask collections (still lazy) and plain, non-dask scalers
        self.dask_scalers = {}
        self.non_dask_scalers = {}
        for key, scaler in self._scalers.items():
            if is_dask_collection(scaler):
                self.dask_scalers[key]= scaler
            else:
                self.non_dask_scalers[key] = scaler

    def __dask_graph__(self):
        self._graph()
        graphs = [x.__dask_graph__() for x in self.dask_scalers.values()]
        return dask.highlevelgraph.HighLevelGraph.merge(*graphs)

    def __dask_keys__(self):
        self._graph()
        keys = [x.__dask_keys__() for x in self.dask_scalers.values()]
        return keys

    # Use the threaded scheduler by default.
    __dask_scheduler__ = staticmethod(dask.threaded.get)

    def __dask_postcompute__(self):
        # The post-compute step rebuilds a ColumnTransformer with evaluated
        # versions of the fitted scalers
        return ColumnTransformer._rebuild, (self.dask_scalers, self.non_dask_scalers)

    @staticmethod
    def _rebuild(results, dask_scalers, non_dask_scalers):
        # results is nested per __dask_keys__: one entry (e.g. [mean, std])
        # for each lazy scaler, in the same order as dask_scalers
        rebuilt = {}
        for computed, (key, scaler) in zip(results, dask_scalers.items()):
            rebuilt[key] = scaler._rebuild(computed)
        return ColumnTransformer({**rebuilt, **non_dask_scalers})

    def __repr__(self):
        return repr(self._scalers)
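
A quick usage sketch for the prototype above, assuming a small random dask dataframe and that column-wise .items() iteration behaves as it does in pandas; the point is that fitting every column's scaler collapses into a single compute.

import numpy as np
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"a": np.random.rand(100), "b": 10 * np.random.rand(100)})
ddf = dd.from_pandas(pdf, npartitions=4)

ct = ColumnTransformer(scalers={"a": StandardScaler(), "b": StandardScaler()})
ct = ct.fit(ddf)     # lazy: just merges each column's mean/std graphs
ct = ct.compute()    # one compute fits every column's scaler at once
scaled = ct.transform(ddf)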