EpistasisLab / tpot

A Python Automated Machine Learning tool that optimizes machine learning pipelines using genetic programming.
http://epistasislab.github.io/tpot/
GNU Lesser General Public License v3.0

Parallelization with python dask and dask-learn. Proposal. #304

Open ghgr opened 7 years ago

ghgr commented 7 years ago

Following up on issue https://github.com/rhiever/tpot/issues/177, I would like to propose work on distributed/cluster support. I believe that would be a major advancement in the applicability of tpot to real-world problems. At the moment I am working with a dataset of roughly (200000, 500), and TPOT saturates one core while the rest sit idle.

There has been discussion about dask [1], which, for the unfamiliar, works somewhat like TensorFlow: you symbolically build a graph of operations and then call a compute method. The computation is automatically distributed, and intermediate results are cached. I'd recommend taking a look at the official documentation (it took me 20 minutes on a train ride, so no big deal) to see some examples [2-5].
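
For readers who have not used dask before, here is a tiny illustration (plain dask, nothing TPOT-specific; the array shape is arbitrary) of the build-a-graph-then-compute model described above:

import dask.array as da

# Build a lazy task graph: nothing is computed yet.
x = da.random.random((100000, 500), chunks=(10000, 500))
result = (x.T @ x).mean()

# Executing the graph runs its tasks in parallel across cores.
print(result.compute())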

The first step I would like to propose is to model a single individual (sklearn pipeline) as a graph. Some work has been carried out in the Dask-learn project [6]. This project builds a graph for a single Pipeline.

The next step would be to model the entire population of each generation as a graph. As far as I know there is no work in this direction, but it should not be particularly complex to generalize. Since many individuals share many components, Dask would automatically execute each shared component once and reuse the result for the others, dramatically improving performance. At the same time it distributes the computation across many cores on a single node or across multiple nodes.
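
As a concrete (hypothetical, non-TPOT) sketch of this idea, the snippet below uses dask.delayed to put two individuals that share a PCA step into one graph; the shared step appears as a single task and is computed only once:

import dask
from sklearn.datasets import load_digits
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier

X, y = load_digits(return_X_y=True)

@dask.delayed
def fit_transform(transformer, X):
    return transformer.fit_transform(X)

@dask.delayed
def fit_and_score(estimator, X, y):
    return estimator.fit(X, y).score(X, y)

# Two "individuals" that share the same preprocessing component.
pca_output = fit_transform(PCA(n_components=16), X)
scores = [
    fit_and_score(LogisticRegression(max_iter=1000), pca_output, y),
    fit_and_score(DecisionTreeClassifier(), pca_output, y),
]

# One call executes the merged graph; the PCA step runs exactly once
# and its result is reused by both individuals.
print(dask.compute(*scores))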

At this point the whole generation will be calculated at the same time.

The advantages are clear, mainly (a) distributed computation and (b) caching of intermediate results. As disadvantages I see:

I'd love to hear your comments/considerations. As said above, I believe this kind of scalability is worth the effort, and a large part of the work has already been done by the dask-learn project. In addition, development of this new feature can proceed orthogonally to the development of the core features.

Regards,

Eduardo

[1] http://dask.pydata.org/en/latest/
[2] https://github.com/dask/dask-tutorial/blob/master/01-Array.ipynb
[3] https://github.com/dask/dask-tutorial/blob/master/02-Foundations.ipynb
[4] https://github.com/dask/dask-tutorial/blob/master/03a-DataFrame.ipynb
[5] https://github.com/dask/dask-tutorial/blob/master/04-Imperative.ipynb
[6] http://blaze.pydata.org/blog/2015/10/19/dask-learn/

danthedaniel commented 7 years ago

With dask, is there any way to prevent constant duplication of a dataset in memory? As it is, it's fairly easy to parallelize TPOT with the python multiprocessing module, but that requires cloning the entire environment with pickle through some IPC. This is a problem for large data-sets, as the memory copying takes up a lot of time, and in the end you need a shit-ton of RAM.

ghgr commented 7 years ago

AFAIK with dask you build the computation graph, and then you execute it. The memory allocation, redundancy elimination and job distribution (in case of a cluster) is entirely Dask's problem.

QuantumDamage commented 6 years ago

The more Dask videos I watch (for example https://www.youtube.com/watch?v=RA_2qdipVng), the more I believe this could make a lot of sense. Have there been any attempts to test tpot with dask?

QuantumDamage commented 6 years ago

There seem to be some approaches for accelerating scikit-learn algorithms with dask: https://dask-ml.readthedocs.io/en/latest/

TomAugspurger commented 6 years ago

With dask, is there any way to prevent constant duplication of a dataset in memory?

Dask lets you swap out the scheduler (threaded, multiprocessing, distributed) easily. The threaded scheduler will avoid the need to clone the datasets multiple times, but for the best performance the algorithms should release the GIL.
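
To make that concrete, here is a small non-TPOT example of running the same graph under different schedulers, using current dask's scheduler= keyword (older releases configured this differently):

import dask.array as da

x = da.random.random((10000, 1000), chunks=(1000, 1000))
graph = (x @ x.T).mean()

# Threads: one process, shared memory, no copying of the data,
# but it only scales if the underlying code releases the GIL.
graph.compute(scheduler='threads')

# Processes: sidesteps the GIL, but data is serialized between workers.
graph.compute(scheduler='processes')

# Distributed: create a dask.distributed Client (local or on a cluster)
# and .compute() will use it automatically.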

I spent a bit of time a few weeks ago looking into this, but I'm not too familiar with tpot. The joblib-based parallelization added in the dev branch seems to do pretty well for coarse-grained parallelism like "fit these 8 models in parallel".

If you're able to build the entire graph ahead of time, and there are redundant computations in multiple branches, dask will be able to avoid recomputing them.

If anyone more familiar with tpot is interested in prototyping something, I'd be happy to support from the dask side. Otherwise, I'll try to take another look in a few weeks.

QuantumDamage commented 6 years ago

@TomAugspurger Maybe, if there were a way to predict memory allocation, we could swap the scheduler to better fit the available resources? A kind of heuristic that takes the available CPUs, threads, RAM and swap as parameters?

TomAugspurger commented 6 years ago

In general, dask leaves the scheduler choice up to the user. It provides good defaults for the dask collections (multiprocessing for dask.bag, threaded for dask.array and dask.dataframe), and makes it easy to switch between schedulers.

mrocklin commented 6 years ago

With dask, is there any way to prevent constant duplication of a dataset in memory? As it is, it's fairly easy to parallelize TPOT with the python multiprocessing module, but that requires cloning the entire environment with pickle through some IPC. This is a problem for large data-sets, as the memory copying takes up a lot of time, and in the end you need a shit-ton of RAM.

I suspect that Dask's memory management heuristics will handle this without significant issue.

Intermediate states of the graph are not cached between generations. The solution for this is trickier. Maybe it is possible to build a graph for all generations at the same time, but that would imply to reimplement DEAP as a dask graph and just the idea makes my head spin.

Note that with the newer scheduler you can evolve a computation on the fly, so you don't need to decide everything ahead of time. The right API to look at for this is probably the concurrent.futures API. You can submit many tasks, wait for a few of them to come back, and then, based on those results, submit more that are more likely to be helpful. Here is a talk from the last SciPy conference that covers some of the more real-time features of the dask schedulers: https://www.youtube.com/watch?v=ZxNPVTHQwGo
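
A rough sketch of that pattern (not TPOT code; evaluate and mutate are hypothetical stand-ins for TPOT's pipeline evaluation and variation steps, reduced here to a toy scoring problem) using dask.distributed's concurrent.futures-style API:

import random
from dask.distributed import Client, as_completed

def evaluate(candidate):
    # Stand-in for cross-validating one pipeline.
    return candidate, -abs(candidate - 42)

def mutate(candidate):
    # Stand-in for DEAP's mutation/crossover operators.
    return candidate + random.randint(-5, 5)

if __name__ == '__main__':
    client = Client()  # local workers by default; point it at a cluster otherwise
    population = [random.randint(0, 100) for _ in range(8)]
    futures = as_completed([client.submit(evaluate, c) for c in population])

    budget = 50
    best = (None, float('-inf'))
    for future in futures:
        candidate, score = future.result()
        best = max(best, (candidate, score), key=lambda t: t[1])
        if budget > 0:
            # React to each result as it arrives: submit new work immediately
            # instead of waiting for the whole generation to finish.
            futures.add(client.submit(evaluate, mutate(candidate)))
            budget -= 1
    print('best candidate:', best)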

mrocklin commented 6 years ago

FWIW I'm enthusiastic about this pairing. From my perspective as a Dask developer, TPOT's workload is a nice example of something that is clearly parallelizable but requires more sophistication than your typical big-data system provides (or at least, that is my guess of what your workloads look like). This plays nicely to Dask's strengths.

I suspect that the efficient parallelism would provide some convenient speedups (especially if you're currently spending time repeatedly shipping data off with the multiprocessing module), but that you might actually find more value in the visual diagnostics, parallel profiling, etc. that come along for free.

westurner commented 6 years ago

Would these memory mapping and zero-copy approaches help with parallelism here? https://arrow.apache.org/docs/python/memory.html

https://github.com/maartenbreddels/vaex

mrocklin commented 5 years ago

Would these memory mapping and zero-copy approaches help with parallelism here?

Generally for workloads like this my recommendation would be to stay within a single process per node if possible, so zero-copy isn't really a concern. This would differ if you're handling mostly text data; in that case serialization will kill you anyway. Generally speaking, my guess is that very few workloads of this type are at the point where zero-copy is something they should worry about.

mrocklin commented 5 years ago

Is there an obvious central place within TPOT where most of the computation is planned, and that would make sense for someone familiar with dask to look at?

rhiever commented 5 years ago

TPOT pipeline parallelization is primarily done here, where we're currently using scikit-learn's port of joblib.

Some additional considerations:

mrocklin commented 5 years ago

The first thing to try is probably just using the joblib.parallel_backend('dask') solution and seeing how that performs on a distributed system. My guess is that we'll eventually want to go further in order to avoid recomputation of shared results and such, but it would be good to have a baseline.
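
For reference, that baseline would look roughly like the sketch below (assuming a joblib/distributed combination recent enough that the 'dask' backend is registered, and a scikit-learn that honours external joblib; nothing here is TPOT-specific):

import joblib
from dask.distributed import Client
from sklearn.datasets import load_digits
from sklearn.ensemble import RandomForestClassifier

client = Client()  # local cluster; use Client('scheduler:8786') for a real one
X, y = load_digits(return_X_y=True)

clf = RandomForestClassifier(n_estimators=500, n_jobs=-1)
with joblib.parallel_backend('dask'):
    # joblib work inside this block (here, fitting the individual trees)
    # is scheduled on the dask workers instead of local threads/processes.
    clf.fit(X, y)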

Is there a standard workflow to try this out on?

This might make a good blog post about the dask-joblib integration being useful for things beyond just Scikit-Learn.

Also cc @stsievert who might find this conversation interesting.

rhiever commented 5 years ago

Sure, here's a basic TPOT workflow:

from tpot import TPOTClassifier
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split

digits = load_digits()
X_train, X_test, y_train, y_test = train_test_split(digits.data, digits.target,
                                                    train_size=0.75, test_size=0.25)

tpot = TPOTClassifier(generations=100, population_size=100, # TPOT will evaluate 100x100 pipelines
                      verbosity=2, # TPOT will show a progress bar during optimization
                      n_jobs=-1, # Enable multiprocessing
)
tpot.fit(X_train, y_train)
print(tpot.score(X_test, y_test))

More examples available in the docs here.

westurner commented 5 years ago

Would these memory mapping and zero-copy approaches help with parallelism here?

.

Generally for workloads like this my recommendation would be to stay within a single process per node if possible, so zero-copy isn't really a concern. This would differ if you're handling mostly text data; in that case serialization will kill you anyway. Generally speaking, my guess is that very few workloads of this type are at the point where zero-copy is something they should worry about.

Because none of these parallelizable algorithms yet require synchronization? https://en.wikipedia.org/wiki/Bulk_synchronous_parallel

Thanks for the links to the docs.

stsievert commented 5 years ago

My guess is that we'll eventually want to go further in order to avoid recomputation of shared results

Related: dask-searchcv has some caching to avoid repeated tuning for sections of pipelines: https://dask-ml.readthedocs.io/en/latest/hyper-parameter-search.html#avoid-repeated-work. This would be most useful when some of the first elements in a pipeline take a long time and have a couple parameters to tune (e.g., text feature extraction).
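
A small illustrative example of that behaviour (hypothetical parameters, not TPOT code), where an expensive text-vectorization step is shared across the cheaper downstream settings by the graph dask-ml builds:

from dask_ml.model_selection import GridSearchCV
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import Pipeline

data = fetch_20newsgroups(subset='train', categories=['sci.space', 'rec.autos'])

pipe = Pipeline([('tfidf', TfidfVectorizer()), ('clf', SGDClassifier())])
param_grid = {
    'tfidf__ngram_range': [(1, 1), (1, 2)],  # 2 expensive settings
    'clf__alpha': [1e-4, 1e-3, 1e-2],        # 3 cheap settings
}

# Per CV fold, the TF-IDF step is fit twice (once per ngram_range), not six
# times, because candidates that share it reuse the same task in the graph.
search = GridSearchCV(pipe, param_grid, cv=3)
search.fit(data.data, data.target)
print(search.best_params_)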

mrocklin commented 5 years ago

It looks like we might also want to dive into the _wrapped_cross_val_score function and wrap the _fit_and_score calls:

def _wrapped_cross_val_score(sklearn_pipeline, features, target,
                             cv, scoring_function, sample_weight=None, groups=None):
    """Fit estimator and compute scores for a given dataset split.
    Parameters
    ----------
    sklearn_pipeline : pipeline object implementing 'fit'
        The object to use to fit the data.
    features : array-like of shape at least 2D
        The data to fit.
    target : array-like, optional, default: None
        The target variable to try to predict in the case of
        supervised learning.
    cv : int or cross-validation generator
        If CV is a number, then it is the number of folds to evaluate each
        pipeline over in k-fold cross-validation during the TPOT optimization
        process. If it is an object, then it is used as a cross-validation
        generator.
    scoring_function : callable
        A scorer callable object / function with signature
        ``scorer(estimator, X, y)``.
    sample_weight : array-like, optional
        List of sample weights to balance (or un-balance) the dataset target as needed
    groups : array-like {n_samples, }, optional
        Group labels for the samples used when splitting the dataset into train/test sets
    """
    sample_weight_dict = set_sample_weight(sklearn_pipeline.steps, sample_weight)

    features, target, groups = indexable(features, target, groups)

    cv = check_cv(cv, target, classifier=is_classifier(sklearn_pipeline))
    cv_iter = list(cv.split(features, target, groups))
    scorer = check_scoring(sklearn_pipeline, scoring=scoring_function)

    try:
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            scores = [_fit_and_score(estimator=clone(sklearn_pipeline),
                                     X=features,
                                     y=target,
                                     scorer=scorer,
                                     train=train,
                                     test=test,
                                     verbose=0,
                                     parameters=None,
                                     fit_params=sample_weight_dict)
                      for train, test in cv_iter]
            CV_score = np.array(scores)[:, 0]
            return np.nanmean(CV_score)
    except Exception:
        # The remainder of the function is truncated in this excerpt; on
        # failure (or timeout) TPOT scores the pipeline as invalid.
        return -float('inf')

rhiever commented 5 years ago

That would be nice; it would allow simultaneous parallelization of the CV evaluations.
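
A rough sketch (not TPOT code) of what wrapping the per-fold work could look like with dask.delayed, so that the folds of one pipeline, and of different pipelines, all end up in the same graph:

import dask
import numpy as np
from sklearn.base import clone
from sklearn.datasets import load_digits
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold

@dask.delayed
def fit_and_score_fold(estimator, X, y, train, test):
    # One CV fold becomes one task in the dask graph.
    fitted = clone(estimator).fit(X[train], y[train])
    return fitted.score(X[test], y[test])

X, y = load_digits(return_X_y=True)
cv = StratifiedKFold(n_splits=5)
fold_scores = [fit_and_score_fold(LogisticRegression(max_iter=1000), X, y, train, test)
               for train, test in cv.split(X, y)]

# All five folds are evaluated in parallel by whichever scheduler is active.
print(np.mean(dask.compute(*fold_scores)))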

karan10111 commented 5 years ago

For people who already have Spark set up: we have had a first cut of parallelising TPOT (+ DEAP) with Spark in our private fork for quite some time now. It's (alpha) tested and fairly well profiled in terms of memory.

We are using parallel delayed calls to _fit_and_score. We also tested it with sklearn's transformer caching to remove redundant computations and have seen impressive results (though it takes a huge amount of disk space if the number of individuals and generations is high).

We might think of ways to send similar tasks to the same executors so that transformer caching can be more effective.
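
For anyone unfamiliar with the scikit-learn transformer caching mentioned above, a minimal standalone example (assuming a recent joblib; the cache path and parameters are just placeholders):

from joblib import Memory
from sklearn.datasets import load_digits
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline

X, y = load_digits(return_X_y=True)
memory = Memory(location='./transformer_cache', verbose=0)

pipe = Pipeline([('pca', PCA(n_components=32)),
                 ('clf', LogisticRegression(max_iter=1000))],
                memory=memory)

pipe.fit(X, y)                 # the PCA step is fitted and cached on disk
pipe.set_params(clf__C=0.1)
pipe.fit(X, y)                 # the cached PCA fit is reused from disk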

In gp_deap.py, check out the _wrapped_cross_val_score_spark method (line 458).

We'll open a separate PR soon. Suggestions would be highly appreciated. Thanks.

PS: We had to make some serialisation-related changes to DEAP; see DEAP#268.

westurner commented 5 years ago

Pandas on Ray has some performance optimizations that may be helpful for Spark as well:

See also:

westurner commented 5 years ago

TomAugspurger commented 5 years ago

For those following this issue, https://github.com/EpistasisLab/tpot/pull/730 has been merged into the development branch. If you're interested, you could try it out with tpot's dev branch and dask-ml >= 0.9.0.

#730 solved the relatively easy task of training many individuals in parallel (on a cluster). It did not address some of the points in the original issue, like some individuals in a generation being relatively slow, caching between generations, or parallelizing the crossover and mutation stage. If anyone is interested in working on those, I could assist, but I don't have plans to work on it right now.
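
For anyone wanting to try it, a minimal sketch of what that might look like (assuming the development TPOT with the use_dask flag from #730, dask-ml >= 0.9.0, and a dask.distributed scheduler):

from dask.distributed import Client
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from tpot import TPOTClassifier

client = Client()  # or Client('scheduler-address:8786') for an existing cluster

digits = load_digits()
X_train, X_test, y_train, y_test = train_test_split(digits.data, digits.target,
                                                    train_size=0.75, test_size=0.25)

tpot = TPOTClassifier(generations=5, population_size=20,
                      n_jobs=-1,
                      use_dask=True,   # evaluate individuals on the dask cluster
                      verbosity=2)
tpot.fit(X_train, y_train)
print(tpot.score(X_test, y_test))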

ballcap231 commented 4 years ago

Just to be clear: if I specify use_dask=True in the TPOT API, do I still need to specify memory='auto' if I want caching between transformers? That is, does using dask with TPOT ensure that models with the same configuration aren't recomputed?