unit8co / darts

A python library for user-friendly forecasting and anomaly detection on time series.
https://unit8co.github.io/darts/
Apache License 2.0

[BUG] optimization is too slow on AWS EC2 instances #2569

Open raamana opened 2 days ago

raamana commented 2 days ago

Describe the bug

First of all, I love the library, and thank you for open-sourcing and maintaining it.

Issue: I am optimizing a forecasting model with Optuna. Individual trials finish in about 2 minutes on an M3 Max MacBook Pro (14 cores: 10 performance and 4 efficiency) for a wide variety of hyperparameter configurations. When I run the exact same thing on an EC2 instance (type m8g.8xlarge, with 32 vCPUs and 128 GB RAM), the trials never finish, even when given 30 minutes per trial (to account for CPU vs. GPU differences). What might be going on?

I am using a timeout decorator, as suggested in the Optuna forums, to prune long-running or hung trials, and on EC2 every single trial is getting pruned (for both N-BEATS and TFT):

@timeout_decorator.timeout(max_run_time_per_trial,  # in seconds
                           timeout_exception=optuna.TrialPruned,  # trial result will be noted as pruned
                           use_signals=True)  # needed for process-based parallelization with n_jobs=1
def objective_TFT(trial):
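
For readers unfamiliar with signal-based timeouts (the mode that `use_signals=True` asks for), here is a minimal standard-library sketch of the mechanism. It is not the actual implementation of `timeout_decorator`; `TrialTimeout`, `run_with_timeout`, `slow_trial`, and `fast_trial` are hypothetical names, with `TrialTimeout` standing in for `optuna.TrialPruned`. It assumes a Unix system and must run in the main thread of the process, since Python only lets the main thread install signal handlers.

```python
import signal
import time


class TrialTimeout(Exception):
    """Hypothetical stand-in for optuna.TrialPruned."""


def run_with_timeout(func, seconds):
    """Run func(); raise TrialTimeout if it has not returned after `seconds`."""

    def _handler(signum, frame):
        raise TrialTimeout(f"no result after {seconds} s")

    # Install the handler and arm a one-shot real-time timer (Unix only).
    previous = signal.signal(signal.SIGALRM, _handler)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        return func()
    finally:
        # Always disarm the timer and restore the previous handler.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)


def slow_trial():
    time.sleep(5)  # pretend this is a training run that takes too long
    return 0.0


def fast_trial():
    return 1.5


try:
    outcome = run_with_timeout(slow_trial, 0.2)
except TrialTimeout:
    outcome = "pruned"

print(outcome, run_with_timeout(fast_trial, 1))
```

Because the alarm is delivered to the process as a whole, this style of timeout fits one trial per process, which matches running Optuna with `n_jobs=1` inside each worker process.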

I know this might not be a Darts-specific issue, but I would appreciate any constructive feedback and pointers.

Also reported to the good folks at Optuna: https://github.com/optuna/optuna/issues/5724

To Reproduce

I am not allowed to share code or data, but I can try to write a minimal example that reproduces the issue without running into any corporate restrictions. I would like your feedback on the above before I develop this example.

Expected behavior

Individual trials of hyperparameter optimization should not take 10x longer on EC2.

System (please complete the following information):

Additional context

dennisbader commented 1 day ago

Hi @raamana, hard to say what it is without any code. A minimal example would be great.

For a start, the model setup and fit call could already help.

Also, does the model even begin training or is it hanging before?

raamana commented 1 day ago

thanks @dennisbader

1) I am not fitting them any differently from what the tutorials suggest, e.g.:

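import numpy as np
import torch
from darts.models import NBEATSModel
from darts.utils.callbacks import TFMProgressBar

torch.manual_seed(1)
np.random.seed(1)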

def generate_torch_kwargs():
    # run torch models on CPU, and disable progress bars for all model stages except training.
    return {
        "pl_trainer_kwargs": {
            "accelerator": "cpu",
            "callbacks": [TFMProgressBar(enable_train_bar_only=True)],
        }
    }

global_model = NBEATSModel(
    input_chunk_length=6,
    output_chunk_length=12,
    n_epochs=200,
    random_state=42,
    **generate_torch_kwargs()
)

with different values injected from Optuna suggestions, following example 17.

2) The model begins training, and there are no intermediate errors. The only issue is that the Optuna trials never finish on EC2, whereas they are done on my laptop in 2 minutes, with the same search space.

raamana commented 20 hours ago

Hi @dennisbader, here is a minimum working example as requested, mostly copied from the Darts examples. I ran it on my laptop and on the EC2 instance. The median run times per completed trial were 0.30 min and 9.96 min respectively, so it took 33x longer on EC2. Previously, all trials were probably getting pruned because with our data (much bigger than in this example, and typically with 450 epochs as opposed to 50 in this toy example) they would have run for about 10 hours (33 x 9 x 2 min/trial = 594 min), much longer than the 1-hour timeout I had been giving.
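
The estimate above can be reproduced with a few lines of arithmetic. This is only a sketch that restates the figures quoted in the comment; reading the three factors as the EC2 slowdown, the 450/50 epoch ratio, and the 2 min/trial laptop figure is an interpretation, and the variable names are made up for illustration.

```python
# Figures quoted in the comment above.
laptop_median_min = 0.30  # median minutes per completed trial, laptop
ec2_median_min = 9.96     # median minutes per completed trial, EC2
real_epochs, toy_epochs = 450, 50
baseline_trial_min = 2    # the "2 mins/trial" term in the estimate

slowdown = ec2_median_min / laptop_median_min  # about 33x
epoch_ratio = real_epochs / toy_epochs         # 9x

# The comment rounds the slowdown to 33 before multiplying.
projected_min = round(slowdown) * epoch_ratio * baseline_trial_min
projected_hours = projected_min / 60

print(f"slowdown ~{slowdown:.1f}x, projected {projected_min:.0f} min (~{projected_hours:.1f} h)")
```

With these inputs the projection comes out at 594 minutes, just under 10 hours, which is still far beyond a 1-hour timeout.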

The only changes I made on EC2 were to set the device type to cpu in pl_trainer_kwargs and the number of processes to 31 (the instance had 32 vCPUs and 128 GB RAM).

I could keep the number of epochs smaller, but according to the variable-importance calculations it is one of the top 3 hyperparameters, and I wouldn't want to sacrifice it. Other than paying for GPU-based instances, any other suggestions?

import warnings
from pathlib import Path
import numpy as np
import optuna
import pandas as pd
import timeout_decorator
import torch
from darts import TimeSeries
from darts.dataprocessing.transformers import Scaler
from darts.datasets import AirPassengersDataset
from darts.metrics import smape
from darts.models import TFTModel
from darts.utils.likelihood_models import QuantileRegression
from darts.utils.timeseries_generation import datetime_attribute_timeseries
from optuna.exceptions import OptunaError
warnings.filterwarnings("ignore")
import logging
logging.disable(logging.CRITICAL)

torch.manual_seed(1)
np.random.seed(1)

def prep_data():
    series = AirPassengersDataset().load()
    # convert monthly passenger counts to the average daily count per month
    series = series / TimeSeries.from_series(series.time_index.days_in_month)
    series = series.astype(np.float32)
    training_cutoff = pd.Timestamp("19571201")
    train, val = series.split_after(training_cutoff)
    transformer = Scaler()
    transformer.fit(train)  # fit on the training split only; both splits are transformed in the return statement
    covariates = datetime_attribute_timeseries(series, attribute="year", one_hot=False).stack(
        datetime_attribute_timeseries(series, attribute="month", one_hot=False)
    ).stack(TimeSeries.from_times_and_values(times=series.time_index, values=np.arange(len(series)), columns=["linear_increase"])).astype(np.float32)
    scaler_covs = Scaler()
    scaler_covs.fit(covariates.split_after(training_cutoff)[0])
    return transformer.transform(train), transformer.transform(val), scaler_covs.transform(covariates)

def run_optim_TFT(n_trials_per_job=10, n_jobs=1, max_run_time_per_trial=350, study_name='optuna', out_dir=None, random_seed=42):
    train, val, covariates = prep_data()

    def build_TFT_model(ICL=12, OCL=12, hidden_size=64, lstm_layers=1, num_attention_heads=4, dropout=0.1, batch_size=16, n_epochs=300, random_state=random_seed):
        quantiles = [0.01, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 0.99]
        tft_model = TFTModel(input_chunk_length=ICL, output_chunk_length=OCL, hidden_size=hidden_size, lstm_layers=lstm_layers, num_attention_heads=num_attention_heads,
                             dropout=dropout, batch_size=batch_size, n_epochs=n_epochs, add_relative_index=True, pl_trainer_kwargs={"accelerator": "gpu", "precision": '32-true'},
                             likelihood=QuantileRegression(quantiles=quantiles), random_state=random_state)
        tft_model.fit(train, future_covariates=covariates, verbose=True)
        return tft_model

    @timeout_decorator.timeout(max_run_time_per_trial, timeout_exception=optuna.TrialPruned, use_signals=True)
    def objective_TFT(trial):
        tft_model = build_TFT_model(ICL=trial.suggest_int("input_chunk_length", 6, 36, step=6), OCL=9, hidden_size=trial.suggest_int("hidden_size", 32, 128, step=16),
                                    lstm_layers=trial.suggest_int("lstm_layers", 1, 5, step=2), num_attention_heads=trial.suggest_int("num_attention_heads", 1, 5, step=2),
                                    dropout=trial.suggest_float("dropout", 0.1, 0.5, step=0.1), batch_size=trial.suggest_int("batch_size", 8, 24, step=8), n_epochs=20)
        pred = tft_model.predict(n=9)
        smape_val = smape(val, pred)
        # NaN never compares equal to anything (including itself), so test with np.isnan
        return smape_val if not np.isnan(smape_val) else float("inf")

    def print_callback(study_, trial):
        print(f"Current value: {trial.value:10.3f}\n\tCurrent params: {trial.params}")
        print(f"Best value   : {study_.best_value:10.3f}\n\tBest params: {study_.best_trial.params}")

    from datetime import datetime
    from optuna.storages import JournalStorage
    from optuna.storages.journal import JournalFileBackend

    study = optuna.create_study(direction="minimize", study_name=f"{study_name}_{datetime.now().strftime('%Y-%m')}", storage=JournalStorage(JournalFileBackend(f"journal_{study_name}.optuna.log")), load_if_exists=True)
    study.optimize(objective_TFT, n_trials=n_trials_per_job, n_jobs=n_jobs, callbacks=[print_callback], catch=(Exception, OptunaError))
    print(f"Best value: {study.best_value:8.3f}, Best params: {study.best_trial.params}")

def task(in_params):
    run_optim_TFT(n_trials_per_job=10, n_jobs=1, max_run_time_per_trial=500, study_name='optuna_mwe', out_dir=Path.cwd(), random_seed=42)

if __name__ == "__main__":
    import multiprocessing
    num_procs = 5
    if num_procs > 1:
        print(f'parallelizing optimization with {num_procs} processors')
        pool = multiprocessing.Pool(processes=num_procs)
        results = pool.map(task, range(num_procs))
        pool.close()
        pool.join()
        print("\n\noutput log captured by multiprocessing:\n\n", results)
    else:
        print('running task in batch mode on single processor')
        task(None)
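
One possibility that may be worth ruling out with the example above is CPU thread oversubscription. This is an unconfirmed hypothesis, not something established in this thread: each worker process may start its own pool of math-library threads, so 31 processes on 32 vCPUs could end up competing for the same cores. The sketch below splits the CPUs evenly across workers and caps the threads per worker. `threads_per_worker` and `limit_worker_threads` are hypothetical helper names; `torch.set_num_threads` and the `OMP_NUM_THREADS` / `MKL_NUM_THREADS` environment variables are the only real knobs used, and the environment variables are generally only read when a library first initializes.

```python
import os


def threads_per_worker(total_cpus, num_procs):
    """Split the available CPUs evenly across worker processes (at least 1 each)."""
    return max(1, total_cpus // max(1, num_procs))


def limit_worker_threads(num_threads):
    """Hypothetical pool initializer: cap math-library threads in this process."""
    os.environ["OMP_NUM_THREADS"] = str(num_threads)
    os.environ["MKL_NUM_THREADS"] = str(num_threads)
    try:
        import torch  # optional here: only used if it is installed

        torch.set_num_threads(num_threads)
    except ImportError:
        pass


# Thread budgets for the two setups described in this thread.
ec2_budget = threads_per_worker(total_cpus=32, num_procs=31)
laptop_budget = threads_per_worker(total_cpus=14, num_procs=5)
print(f"EC2: {ec2_budget} thread(s) per worker, laptop: {laptop_budget} thread(s) per worker")

limit_worker_threads(ec2_budget)
```

To try it with the script above, the helper could be passed to the pool as `multiprocessing.Pool(processes=num_procs, initializer=limit_worker_threads, initargs=(budget,))`, and the per-trial run times compared with and without the cap.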