EpistasisLab / tpot

A Python Automated Machine Learning tool that optimizes machine learning pipelines using genetic programming.
http://epistasislab.github.io/tpot/
GNU Lesser General Public License v3.0

Trouble Starting Training with Dask.Read_Parquet Dataset #1264

Open windowshopr opened 1 year ago

windowshopr commented 1 year ago

Windows 10, Python 3.10.5, scikit-learn==1.1.1, TPOT==0.11.7, dask==2022.7.0, dask-glm==0.2.0, dask-ml==2022.5.27

I'm just trying to run a regression training session with a BIG (larger-than-memory) parquet dataset that I created in another script. Here is a preview of what the dataset looks like:

[screenshot: preview of the tail of the dataset]

There are over 300,000 total rows in the dataset, broken up into 42 files; the screenshot above shows just the tail of it. I've made the dataset's folder available for download from my Google Drive here. I know it's a stock dataset, but think of it as any kind of time series dataset; I just created this one as a proof of concept.
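
The folder of part files opens lazily as a single Dask DataFrame, so it can be inspected without loading it all, something like:

import dask.dataframe as dd

# Open the directory of part files as one lazy Dask DataFrame; nothing is loaded into RAM yet
ddf = dd.read_parquet("./data/regression_df.parquet")

print(ddf.npartitions)  # how many partitions Dask built from the part files
print(ddf.dtypes)       # parquet preserves column dtypes; check whether 'time' came through as datetime
print(ddf.tail(20))     # only the last partition is read to produce this preview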

Context of the issue

I will provide a reproducible code below, and steps to reproduce the issue.

The issue I'm having now is that once the TPOT run starts, it quickly chews up my RAM and crashes with errors like:

Traceback (most recent call last):
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python310\lib\site-packages\tpot\base.py", line 816, in fit
    self._pop, _ = eaMuPlusLambda(
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python310\lib\site-packages\tpot\gp_deap.py", line 228, in eaMuPlusLambda
    population[:] = toolbox.evaluate(population)
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python310\lib\site-packages\tpot\base.py", line 1560, in _evaluate_individuals
    tmp_result_scores = list(dask.compute(*tmp_result_scores))
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python310\lib\site-packages\dask\base.py", line 603, in compute
    results = schedule(dsk, keys, **kwargs)
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\client.py", line 3000, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\client.py", line 2174, in gather
    return self.sync(
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\utils.py", line 338, in sync
    return sync(
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\utils.py", line 405, in sync
    raise exc.with_traceback(tb)
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\utils.py", line 378, in f
    result = yield future
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python310\lib\site-packages\tornado\gen.py", line 762, in run
    value = future.result()
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python310\lib\site-packages\distributed\client.py", line 2038, in _gather
    raise exc
concurrent.futures._base.CancelledError: nanmean-e201c831-e405-47dd-8f57-cbae1c624399

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "Z:\Python_Projects\test5.py", line 86, in <module>
    results = tpot.fit(X_train.values, y_train.values)
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python310\lib\site-packages\tpot\base.py", line 863, in fit
    raise e
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python310\lib\site-packages\tpot\base.py", line 854, in fit
    self._update_top_pipeline()
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python310\lib\site-packages\tpot\base.py", line 961, in _update_top_pipeline
    raise RuntimeError(
RuntimeError: A pipeline has not yet been optimized. Please call fit() first.

The only guides I've seen so far for mixing TPOT and Dask assume that the dataset can simply be read into memory, which this one cannot.
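
For reference, the pattern those guides show is roughly the following; X and y are plain in-memory NumPy arrays (the toy data here is just a stand-in), and Dask only distributes the pipeline evaluations:

from dask.distributed import Client
from sklearn.datasets import make_regression
from tpot import TPOTRegressor

client = Client("tcp://172.16.1.113:8786")  # same scheduler address as in the steps below

# Toy in-memory data just to illustrate the pattern -- the real dataset won't fit like this
X, y = make_regression(n_samples=1000, n_features=10, random_state=0)

tpot = TPOTRegressor(generations=5, population_size=10, use_dask=True, verbosity=2)
tpot.fit(X, y)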

Does TPOT support reading a partitioned parquet dataset like this, or does it only support in-memory datasets?

Process to reproduce the issue

  1. Download the dataset from Google Drive, and put it in a data folder in the same directory as the below script
  2. Open up 2 separate PowerShell windows
  3. In the first PowerShell window, run the command dask-scheduler
  4. In the second PowerShell window, run the command dask-worker tcp://172.16.1.113:8786 --nprocs 1 --nthreads 5
  5. Edit the IP from the previous step as needed to match the local IP address of the computer you're using. You'll also need to edit it in the Client(...) call in the code below.
  6. You've now started a dask scheduler and connected a "worker" on the same machine. The Client is connected in the script below (an equivalent in-process setup is sketched right after this list).
  7. Start the Task Manager to monitor resources
  8. Run the script below and watch your RAM in the Task Manager climb until it crashes
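
For anyone who would rather not juggle two PowerShell windows, I believe steps 2 through 6 are equivalent to spinning up a LocalCluster in-process, something like:

from dask.distributed import Client, LocalCluster

# One worker process with 5 threads, mirroring the dask-worker flags above
cluster = LocalCluster(n_workers=1, threads_per_worker=5)
client = Client(cluster)
print(client)  # prints the scheduler address and dashboard link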

I'm not sure if TPOT even supports this, but I figured I would ask and provide a reproducible example (and the dataset I'm trying to use). How can TPOT run with larger-than-memory datasets? Here is the code:

from datetime import datetime

import dask.dataframe as dd
import dask.delayed
from dask.dataframe import read_parquet
from dask_ml.model_selection import train_test_split
from distributed import Client

from sklearn.model_selection import TimeSeriesSplit

from tpot import TPOTRegressor

# ------------------------------------------------------------------------------------------------ #
#                                             VARIABLES                                            #
# ------------------------------------------------------------------------------------------------ #
TEST_SIZE = 0.25
CUSTOM_DATE_PARSER = lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S")

# ------------------------------------------------------------------------------------------------ #
#                                        IMPORT THE DATASET                                        #
# ------------------------------------------------------------------------------------------------ #
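# NOTE: parse_dates, date_parser and index_col are pandas read_csv-style options and are not part
# of dask's documented read_parquet signature; parquet files store column dtypes, so date parsing
# normally isn't needed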
IMPORTED_DF = read_parquet("./data/regression_df.parquet", 
                           parse_dates=['time'], 
                           date_parser=CUSTOM_DATE_PARSER, 
                           index_col='time')
# Solution Attempt 1 - Repartition the dataset? Chunk size is unknown...I've tried 10, 50, 200, 500
IMPORTED_DF = IMPORTED_DF.repartition(npartitions=10)

# ------------------------------------------------------------------------------------------------ #
#                                        INSPECT THE DATASET                                       #
# ------------------------------------------------------------------------------------------------ #
# Dataset has NaNs, so drop them now and inspect
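# NOTE: .compute() materializes the cleaned dataset as a single in-memory pandas DataFrame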
IMPORTED_DF = IMPORTED_DF.dropna().compute()
print(IMPORTED_DF.head(20))
print(IMPORTED_DF.tail(20))

# ------------------------------------------------------------------------------------------------ #
#                                         TRAIN TEST SPLIT                                         #
# ------------------------------------------------------------------------------------------------ #
X_train, X_test, y_train, y_test = train_test_split(IMPORTED_DF.drop('label', axis=1), IMPORTED_DF['label'],
                                                    test_size=TEST_SIZE,
                                                    shuffle=False)
print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)

# ------------------------------------------------------------------------------------------------ #
#                                     CONNECT TO DASK SCHEDULER                                    #
# ------------------------------------------------------------------------------------------------ #
print("Connecting to client...")
client = Client("tcp://172.16.1.113:8786")

# ------------------------------------------------------------------------------------------------ #
#                                    CREATE THE TPOT REGRESSOR                                     #
# ------------------------------------------------------------------------------------------------ #
tpot = TPOTRegressor(generations=100, 
                     population_size=25,
                     offspring_size=None, 
                     mutation_rate=0.9,
                     crossover_rate=0.1,
                    #  scoring=my_custom_scorer,
                     cv=TimeSeriesSplit(n_splits=3), # Using time series split here
                     subsample=1.0, 
                    #  n_jobs=-1,
                     max_time_mins=None, 
                     max_eval_time_mins=30, # 5
                     random_state=None, 
                    #  config_dict=classifier_config_dict,
                     template=None,
                     warm_start=False,
                     memory=None,
                     use_dask=True,
                     periodic_checkpoint_folder=None,
                     early_stop=2,
                     verbosity=2,
                     disable_update_check=False)

# ------------------------------------------------------------------------------------------------ #
#                                          START TRAINING                                          #
# ------------------------------------------------------------------------------------------------ #
results = tpot.fit(X_train.values, y_train.values)
print(tpot.score(X_test.values, y_test.values))
tpot.export('tpot_pipeline.py')

# ------------------------------------------------------------------------------------------------ #
#                                       INSPECT TEST DATASET                                       #
# ------------------------------------------------------------------------------------------------ #
X_test['predictions'] = tpot.predict(X_test.values)
X_test['predictions'] = X_test['predictions'].astype('int32')
X_test['labels'] = y_test

X_test = X_test[["1min_open","1min_high","1min_low","1min_close","labels","predictions"]]
print(X_test)