EpistasisLab / tpot

A Python Automated Machine Learning tool that optimizes machine learning pipelines using genetic programming.
http://epistasislab.github.io/tpot/
GNU Lesser General Public License v3.0
9.72k stars 1.57k forks source link

use_dask and joblib don't handle Exceptions #779

Open louisabraham opened 6 years ago

louisabraham commented 6 years ago

My issue is described at https://github.com/dask/dask-jobqueue/issues/169

A huge thank to @guillaumeeb who identified the problem!

Context of the issue

Basically, an error is triggered by the dask backend if a computation encounters exceptions, and after some attempts to relaunch the task, it makes the fit function fail.

Process to reproduce the issue

Set use_dask=True with a dataset that will crash with some pipelines (for example PolynomialFeatures with 800 columns).

Expected result

Without dask, _wrapped_cross_val_score catches the exceptions and returns a -inf score. The relevant code is there: https://github.com/EpistasisLab/tpot/blob/507b45db01e8f88651f4ce8e03b607a5b50146f5/tpot/gp_deap.py#L434-L480

Current result

The current result is strange (KeyError), but the error message on the master branch of Dask will indicate that a worker restarted (I think).

Possible fix

There should be a way to catch the memory error (or worker restart) to return a -inf score.

When I try with a LSFCluster backend, it causes an exception

When I try with a LocalCluster (Client()), the whole notebook crashes after some attempts.

Immediate fix

I think using the joblib backend provided by Dask should be fine? I'll try tomorrow!

It doesn't work, see below.

Screenshots

from the dask web interface 43351582_285330748746423_8179954919642497024_n

43319853_1882248011842149_2283641915238776832_n

Logs

with one worker

traceback: https://pastebin.com/raw/QvtVXkBD dask-worker.err: https://ptpb.pw/SwbX

Notice that the dask worker restarted 3 times, which appears to be a constant.

louisabraham commented 6 years ago

I am not sure, but the timeout handling with the dask backend should be worth testing as well.

louisabraham commented 6 years ago

Unfortunately, it doesn't work either with the joblib backend.

I launched the following code:

import pandas as pd
import numpy as np

X = pd.read_csv('X_train.csv').drop(columns=['id']).values
y = pd.read_csv('y_train.csv').drop(columns=['id']).values[:, 0]

from dask_jobqueue import LSFCluster
cluster = LSFCluster(cores=2, memory='2GB', job_extra=['-R rusage[mem=2048,scratch=16000]'],
                    local_directory='$TMPDIR',
                    walltime='12:00')

from sklearn.externals import joblib
import distributed.joblib
from dask.distributed import Client
client = Client(cluster)

cluster.scale(6)

from tpot import TPOTRegressor

reg = TPOTRegressor(max_time_mins=60*8, generations=50, population_size=30,
                    cv=5,
                    scoring='r2',
                    memory='auto', random_state=42, verbosity=10, n_jobs=-1)

with joblib.parallel_backend("dask.distributed"):
    reg.fit(X, y)

After some time, I got the same errors:

Skipped pipeline #29 due to time out. Continuing to the next pipeline.
_pre_test decorator: _random_mutation_operator: num_test=0 l2 was provided as affinity. Ward can only work with euclidean distances.
_pre_test decorator: _random_mutation_operator: num_test=0 __init__() got an unexpected keyword argument 'max_depth'
_pre_test decorator: _random_mutation_operator: num_test=0 Unsupported set of arguments: The combination of penalty='l2' and loss='epsilon_insensitive' are not supported when dual=False, Parameters: penalty='l2', loss='epsilon_insensitive', dual=False
_pre_test decorator: _random_mutation_operator: num_test=0 Found array with 0 feature(s) (shape=(50, 0)) while a minimum of 1 is required.
Skipped pipeline #49 due to time out. Continuing to the next pipeline.

distributed.scheduler - ERROR - '74911161'
Traceback (most recent call last):
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/distributed/scheduler.py", line 1306, in add_worker
    plugin.add_worker(scheduler=self, worker=address)
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/dask_jobqueue/core.py", line 62, in add_worker
    self.running_jobs[job_id] = self.pending_jobs.pop(job_id)
KeyError: '74911161'
distributed.scheduler - ERROR - '74911158'
Traceback (most recent call last):
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/distributed/scheduler.py", line 1306, in add_worker
    plugin.add_worker(scheduler=self, worker=address)
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/dask_jobqueue/core.py", line 62, in add_worker
    self.running_jobs[job_id] = self.pending_jobs.pop(job_id)
KeyError: '74911158'
distributed.scheduler - ERROR - '74911160'
Traceback (most recent call last):
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/distributed/scheduler.py", line 1306, in add_worker
    plugin.add_worker(scheduler=self, worker=address)
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/dask_jobqueue/core.py", line 62, in add_worker
    self.running_jobs[job_id] = self.pending_jobs.pop(job_id)
KeyError: '74911160'
distributed.scheduler - ERROR - '74911161'
Traceback (most recent call last):
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/distributed/scheduler.py", line 1712, in remove_worker
    plugin.remove_worker(scheduler=self, worker=address)
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/dask_jobqueue/core.py", line 74, in remove_worker
    del self.running_jobs[job_id][name]
KeyError: '74911161'
distributed.scheduler - ERROR - '74911161'
Traceback (most recent call last):
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/distributed/scheduler.py", line 1306, in add_worker
    plugin.add_worker(scheduler=self, worker=address)
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/dask_jobqueue/core.py", line 62, in add_worker
    self.running_jobs[job_id] = self.pending_jobs.pop(job_id)
KeyError: '74911161'

and the cell failed without raising an error but early stopped. The progress bar was displaying 63 iterations but len(reg.evaluatedindividuals) returned 57. The errors were displayed as the memory usage of some workers exploded.

louisabraham commented 6 years ago

For use_dask, the problem probably comes from dask_ml.model_selection._search.build_graph because the argument error_score=float('-inf') was used but didn't seem to have an effect.

https://github.com/EpistasisLab/tpot/blob/507b45db01e8f88651f4ce8e03b607a5b50146f5/tpot/gp_deap.py#L443-L454

If one solved the problem with use_dask, it is probable that it will solve the dask joblib backend issue as well.

weixuanfu commented 6 years ago

@louisabraham which version of dask you are using right now? I remember we had this kind of error handling issue before but I thought @TomAugspurger fixed it.

Edit: I think that MemoryError or other out-of-resource errors may need another handler in dask.

louisabraham commented 6 years ago

I am using dask 0.19.3 and tpot 0.9.5.

TomAugspurger commented 6 years ago

It's quite likely timeouts sent by TPOT are not handled correctly right now on the dask backend. That should be possible with a little effort

It'll be a bit before I can look closely at the other errors.

louisabraham commented 6 years ago

I think it is about MemoryError, not timeouts. I reported the issue at https://github.com/dask/distributed/issues/2297

dreavjr commented 4 years ago

I'm facing the same issue. Apparently an exception in an worker is leading to a KillerWorker exception being raised, which is leading to a mistimed "RuntimeError: A pipeline has not yet been optimized. Please call fit() first." being raised, and then the whole dask machinery is shut down. I'm attaching the relevant portion of the logs and the installed packages (I'm using anaconda inside docker). log-excerpt.txt conda3.list.txt