dask / dask-searchcv

dask-searchcv is now part of dask-ml: https://github.com/dask/dask-ml
BSD 3-Clause "New" or "Revised" License
240 stars 43 forks source link

ENH: Hyperband implementation #72

Closed stsievert closed 6 years ago

stsievert commented 6 years ago

This PR addresses https://github.com/dask/dask-ml/issues/161 with the implementation of the adaptive model selection algorithm Hyperband. I describe the formulation of Hyperband in https://github.com/dask/dask-ml/issues/161#issuecomment-383150203. Briefly, Hyperband treats hyperparameter optimization as a resource allocation problem: why spend time on poorly performing models if you want to find the best performing model? Part of Hyperband trains models for a few iterations then "kills" the worst performing models, and then loops again in this "train-kill" cycle until 1 model is left.

This algorithm is well-suited for Dask, as it is embarrassingly parallel. The main function behind Hyperband, successive_halving is run a couple times in a embarrassingly parallel for-loop, and each call to successive_halving calls partial_fit on many models at once in the "train-kill" cycle.

The main feature of Hyperband is that it only requires one input, the maximum number of times partial_fit is called on any one classifier. This value is (approximately) proportional to the time spent waiting for Hyperband to complete. With this computational budget that the user specifies, Hyperband has theory that says it will find the best model possible given this computational budget*.

This simplifies the user-facing API. In Hyperband, the user can specify (approximately) the time required for the search. This is an advantage over grid or random search, where the user would have to find some balance between "evaluating many parameters for few calls" or "evaluating few parameters for many calls". This is mentioned in the scikit-learn RandomizedSearchCV documentation:

n_iter : int, default=10. Number of parameter settings that are sampled. n_iter trades off runtime vs quality of the solution.

This tradeoff is not present in Hyperband.

This is important for models that require lots of computation. The implementation of this algorithm would have saved me a couple months of research effort, but I couldn't use sklearn. As such, I have tested it to make sure it works with a simple class that only requires 4 functions, {get, set}_params, partial_fit and score.

I opened this PR instead of continuing with #71 to give a better explanation of Hyperband, and because the user API is now usable, and tested.

TODO:

All of the TODOs come for free if GridSearchCV is used which essentially all I'm doing (evaluating a specific parameter is a grid with each list having length one). However this will require another modification, which boils down to defining a partial_fit function in methods.py and making sure it's called from the right place.

I do not think multimetric scoring is relevant here because this is an adaptive algorithm that has to decide quantitatively which model is better. We should leave it up to the user to define their model.score to determine which model is better.

* More precisely, with high probability the expected loss achieved by the produced model will be "close" to the best loss that could have been found with this computational budget. "close" means "within log factors"

stsievert commented 6 years ago

Thanks for the review, @TomAugspurger. It was useful. I've updated most of the comments, but am still debugging.

Here's a gist of my example notebook: https://gist.github.com/stsievert/38780f0c746b2f1c8eb32feea4dd14de, with a simple test function.

Right now I'm facing some memory issues, and issues with n_jobs == -1. I need to resolve these. I'll probably do this with joblib and some nested parallelism, which will require https://github.com/dask/distributed/pull/1705.

TomAugspurger commented 6 years ago

👍 I'll take a look tonight or tomorrow. I've lost my day to debugging strange distributed errors that I think are my fault :)

FYI, LMK if you need help getting a functional version of scikit-learn master + joblib master. It takes a bit of work. Or I can scale up the cluster again.

stsievert commented 6 years ago

Thanks @TomAugspurger! And no rush, I'm taking tomorrow and Thursday off, and have other things on my plate.

stsievert commented 6 years ago

This is ready for review no @TomAugspurger Some notes:

*: tested with pytest -k "{partial-function-name}"

TomAugspurger commented 6 years ago

Will take a quick pass now.

I would like to log inside a function called with client.submit

Did you write the function that's being submitted? If so, can you write the logging inside the function, so that it logs when running?

TomAugspurger commented 6 years ago

FYI, there are also some linting errors (flake8 dask_searchcv). https://travis-ci.org/dask/dask-searchcv/jobs/390179107#L808

Most text editors will have some kind of plugin for running flake8 each time you save the file, if you want to catch these earlier.

TomAugspurger commented 6 years ago

This depends on dask-ml. As is I think we currently see dask-searchcv as an upstream dependency of dask-ml, and also somewhat stale. Would it make sense to move this there?

Opened https://github.com/dask/dask-searchcv/issues/73 for that.

I think this should be merged before we do any transfer though.

stsievert commented 6 years ago

This depends on dask-ml. As is I think we currently see dask-searchcv as an upstream dependency of dask-ml, and also somewhat stale. Would it make sense to move this there?

The only functionality I use from dask-ml is train_test_split. I'm planning on removing the dependency by integrating with GridSearchCV. There are two other imports from dask-ml, but they're not really needed (one is spurious, one is only for an isinstance).

stsievert commented 6 years ago

There's a lot happening on this with an upstream dependency, scikit-learn. Here's what I think the plan should be:

  1. Merge this in (after cleaning up and testing – give me the rest of today)
  2. Implement an extension of Hyperband that is well-suited for Dask (which well depend on dask.distributed, and is not suitable for sklearn)
  3. Implement Hyperband in sklearn (which will certainly depend on https://github.com/scikit-learn/scikit-learn/pull/11266)
  4. Change dask-searchcv version of Hyperband to be sklearn's version (or at least use their cross_validate function)

This will provide a basic implementation of Hyperband now, and provide an extension well suited for Dask. The only difference with the basic version of Hyperband is that is does very simple cross validation – is that sufficient for dask-searchcv now? Future work will use sklearn's cross_validate in Hyperband and the extension once https://github.com/scikit-learn/scikit-learn/pull/11266 is merged.

@mrocklin @TomAugspurger any thoughts or edits to make?

stsievert commented 6 years ago

I'm satisfied with the implementation now. This is no longer a WIP and ready for a final review.

I have one test that is failing, but it's also failing on master.

mrocklin commented 6 years ago

n_jobs is a joblib term, not a dask term. Generally users don't specify this when building their computations with Dask, they specify it separately. If n==1 we might choose to use the single-threaded scheduler (see http://dask.pydata.org/en/latest/scheduling.html) otherwise if n not in (-1, 1) we might just err?

On Mon, Jun 18, 2018 at 11:27 AM, Scott Sievert notifications@github.com wrote:

@stsievert commented on this pull request.

In dask_searchcv/adaptive.py https://github.com/dask/dask-searchcv/pull/72#discussion_r196121530:

  • for s in reversed(range(s_max + 1)):
  • n = math.ceil((B / R) * eta**s / (s + 1))
  • r = R * eta ** -s
  • all_kwargs += [{'s': s, 'n': n, 'r': r, 'dry_run': dry_run,
  • 'eta': eta, '_prefix': f's={s}', 'shared': shared,
  • 'scorer': self.scorer_}]
  • if self.n_jobs == -1:
  • futures = [dask.delayed(_successive_halving)(self.params, self.model,
  • data, kwargs, fit_kwargs)
  • for kwargs in all_kwargs]
  • results = [f.compute() for f in futures]
  • else:
  • results = [_successive_halving(self.params, self.model, data,
  • kwargs, fit_kwargs)
  • for kwargs in all_kwargs]

Is there a way to specify n_jobs when using dask.delayed?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/dask/dask-searchcv/pull/72#discussion_r196121530, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszNbnEnbpl3UCnLMlIdI9D1Txo3qNks5t98b_gaJpZM4UYPL1 .

stsievert commented 6 years ago

dask-searchcv does use n_jobs, which is why I included a basic version: https://dask-searchcv.readthedocs.io/en/latest/api.html#dask_searchcv.GridSearchCV. If there's no real benefit I'd prefer to leave to the user.

I've added an example, but am getting an error when I use dask-ml's Incremental. I'll dig into that tomorrow.

The real-world example I'm developing wraps an example from the Keras blog, and tunes 4 hyperparameters that are given without a strong explanation.

mrocklin commented 6 years ago

It's an interesting error. I don't know yet what's going on.

On Mon, Jun 18, 2018 at 7:41 PM, Scott Sievert notifications@github.com wrote:

@stsievert commented on this pull request.

In dask_searchcv/adaptive.py https://github.com/dask/dask-searchcv/pull/72#discussion_r196257979:

  • iters = 0
  • times = []
  • for i in range(s + 1):
  • n_i = math.floor(n * eta**-i)
  • r_i = r * eta**i
  • iters += r_i
  • msg = ('Training %d models for %d iterations during iteration %d '
  • 'of bracket=%d')
  • logger.info(msg, n_i, r_i, i, s)
  • results = {k: dask.delayed(_train)(model, data, max_iter=r_i,
  • s=s, i=i, k=k, dry_run=dry_run,
  • scorer=scorer, **fit_kwargs)
  • for k, model in models.items()}
  • results = {k: v.compute() for k, v in results.items()}

This error happens inconsistently too. I've run Hyperband.fit 4 times now, and it's passed once (with no changes to source or notebook in between).

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/dask/dask-searchcv/pull/72#discussion_r196257979, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszKw3ZtixhUIzfuI2JNAUwQfSr8Lfks5t-DrEgaJpZM4UYPL1 .

stsievert commented 6 years ago

Should I create a minimal working example?

TomAugspurger commented 6 years ago

What's the error? I'm debugging an Incremental / distributed issue, and don't want to duplicate efforts.

stsievert commented 6 years ago

What's the error? I'm debugging an Incremental / distributed issue, and don't want to duplicate efforts.

The discussion around the Incremental+distributed issue is at https://github.com/dask/dask-searchcv/pull/72#discussion_r196044468. I only provide a traceback and haven't spent time debugging or crafting a minimal working example.

mrocklin commented 6 years ago

Currently it looks like we're calling compute within _successive_halving which is itself delayed and called within compute. This is suboptimal. Instead, it would generally be cleaner if we constructed as much of the computation as we can with dask.delayed and then called compute once, probably after the many calls to _successive_halving. How feasible is this? Currently it looks like _successive_halving does a bit of computation, and then a bit of administrative work. Should that administrative work be delayed? Or should we pull that administrative work outside of the _successive_halving function and do it after the final compute call?

To start off I think it might simplify things if we not special case any code for n_jobs==1 and instead use dask.delayed everywhere. Hopefully we can just set scheduler='single-threaded' in our final compute call when we're ready to achieve n_jobs==1 if that's important.

mrocklin commented 6 years ago

My apologies for not noticing this sooner. I only realized that this was happening while running the example notebook while trying to find out what has happening with the confusing error that @stsievert was getting.

mrocklin commented 6 years ago

I also think that flattening things down so that we only call compute once in between epochs, rather than calling compute within compute, will resolve any such confusing errors.

mrocklin commented 6 years ago

@stsievert let me know if the above makes sense or not. I'd be happy to jump on a call and go through things if that would be helpful.

stsievert commented 6 years ago

Thanks for noticing this @mrocklin. I think the administrative work you're mentioning is the part of successive halving that's adaptive. The models are sorted in each iteration to _successive_halving (in the call to _top_k), and the best performing models are selected. This is the reason I call compute within _successive_halving.

Here's a simplified version of what's happening:

import numpy as np
import dask

def _successive_halving():
    v1 = dask.delayed(np.random.rand)()  # model 1's score
    v2 = dask.delayed(np.random.rand)()  # model 2's score

    if v1.compute() > v2.compute():
        return 1
    return 0

if __name__ == "__main__":
    # what we're doing now
    delayed_results = [dask.delayed(_successive_halving() for _ in range(5)]
    results = [r.compute() for f in delayed_results]

    # what I think we should do (well, with joblib)
    # results = [_successive_halving() for _ in range(5)]

This is what what's happening now. I think we should uncomment the last lines and use joblib for the outer-loop. How does that sound?

mrocklin commented 6 years ago

There are a variety of reasons why it's nicer to create a single large graph if possible, I'm happy to expand on these if desired. Calling Dask from within Dask is more complex for a variety of reasons, and is best avoided if possible, especially within library code, which has a higher standard of simplicity.

Technology options in Dask are ordered something like the following:

  1. dask.delayed or other collections alone that construct and then run a single task graph. Runs everywhere, is the easiest for users to deal with, but doesn't allow for any alteration of computation between compute calls
  2. Normal concurrent.futures. Requires the distributed scheduler, but allows for submission of tasks during computation, typically using as_completed to control all of the in-flight futures.
  3. Something like Joblib, which hands control off to some other library
  4. Submitting tasks from tasks, where workers can create clients that themselves create tasks. This code is easy to write, but very difficult to debug and follow. It is best avoided if possible.

Ideally most things we write will be towards the simpler end of this list if possible. Currently we're at item four, but would like to be at 1 or maybe 2. Generally the decision between 1 and 2 is "Do I absolutely need to control the computation between calls to compute?" Now I understand that your algorithm is adaptive, but you also call compute often.

mrocklin commented 6 years ago

Anyway, I suggest that we check in at some point today

stsievert commented 6 years ago

Yeah, we should check in. I'm free all day. Let me know when works.

Thanks for the nice summary of the options to use with Dask. I suggested using joblib not to depend on dask.distributed, but when you put it that way I'm inclined towards option 2. This is less complicated than joblib + dask.delayed, plus it will still use all the workers.

I think we should also discuss where to merge this in. I have some dependence on dask-ml's train_test_split and dask_ml.metrics.

TomAugspurger commented 6 years ago

I'd be happy to join in if you end up doing it today.

On Wed, Jun 20, 2018 at 12:59 PM, Scott Sievert notifications@github.com wrote:

Yeah, we should check in. I'm free all day. Let me know when works.

Thanks for the nice summary of the options to use with Dask. I suggested using joblib not to depend on dask.distributed, but when you put it that way I'm inclined towards option 2.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/dask/dask-searchcv/pull/72#issuecomment-398841842, or mute the thread https://github.com/notifications/unsubscribe-auth/ABQHIuXuPb4RSXogg6iJzRg1m6zlrMOpks5t-o2GgaJpZM4UYPL1 .

mrocklin commented 6 years ago

Whoops! Too late

On Wed, Jun 20, 2018 at 2:10 PM, Tom Augspurger notifications@github.com wrote:

I'd be happy to join in if you end up doing it today.

On Wed, Jun 20, 2018 at 12:59 PM, Scott Sievert notifications@github.com wrote:

Yeah, we should check in. I'm free all day. Let me know when works.

Thanks for the nice summary of the options to use with Dask. I suggested using joblib not to depend on dask.distributed, but when you put it that way I'm inclined towards option 2.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/dask/dask-searchcv/pull/72#issuecomment-398841842, or mute the thread https://github.com/notifications/unsubscribe-auth/ ABQHIuXuPb4RSXogg6iJzRg1m6zlrMOpks5t-o2GgaJpZM4UYPL1

.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/dask/dask-searchcv/pull/72#issuecomment-398845008, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszNILstnvJRGgLDukicZqJq4DkShzks5t-pACgaJpZM4UYPL1 .

mrocklin commented 6 years ago

Here is some very loose pseudocode from our conversation

def partial_fit_and_score(model, x_block, y_block, metadata):
    model = model.clone()
    model.partial_fit(x_block, y_block)
    score = model.score(...)
    metadata = metadata.copy()
    metadata['model'] = model
    metadata['score'] = score
    metadata['i'] = metadata['i'] + 1
    return metadata

def _hyperband(x, y, parameter_list, nr_list):
    x_blocks = x.to_delayed()
    y_blocks = y.to_delayed()

    nr_buckets = {nr: {...} for nr in nr_list}

    model_futures = [client.submit(partial_fit_and_score, model, x_block[0], y_block[0]) ....]
    score_futures = [client.submit(get_metadata, model_future) for model_future in model_futures]

    seq = as_completed(score_futures)

    for future in seq:
        metadata = future  # {'nr': ..., 'score': ..., 'model-id': ...,}

        nr = metadata['nr']
        should_resubmit = update(nr_buckets[nr], metadata[...])

        model_future = get_model_future_from_score_future(metadata)
        i = get_xy_block_id(metadata)

        if should_resubmit:
            model_future = client.submit(partial_fit_and_score, model_future, x_block[i], y_block[i])
            score_future = client.submit(get_metadata, model_future)
            seq.add(score_future)

    best = get_best_parameters(nr_buckets)
    return best

class HyperBand(...):
    def __init__(...):
        ...

    def fit(self, x, y):
        ...
        best = _hyperband(..., x, y, ...)
        ...
stsievert commented 6 years ago

Another pseudo-code that actually runs is at https://gist.github.com/stsievert/b2441010b855169f2a062cc1fa87eb42

stsievert commented 6 years ago

Closing in favor of https://github.com/dask/dask-ml/pull/221