awslabs / gluonts

Probabilistic time series modeling in Python
https://ts.gluon.ai
Apache License 2.0
4.55k stars 747 forks source link

M5 evaluation not working #2091

Open Poulami-Sarkar opened 2 years ago

Poulami-Sarkar commented 2 years ago

I am trying to run the GluonTS tutorial on the M5 dataset. However, the m5 evaluation code is failing. Can someone please help me with the same?

To Reproduce

https://github.com/awslabs/gluon-ts/blob/dev/examples/m5_gluonts_template.ipynb

%matplotlib inline
import mxnet as mx
from mxnet import gluon
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
import os
from tqdm import tqdm
from pathlib import Path

single_prediction_length = 28
submission_prediction_length = single_prediction_length * 2
m5_input_path="./m5-forecasting-accuracy"
submission=True

if submission:
    prediction_length = submission_prediction_length
else:
    prediction_length = single_prediction_length

calendar = pd.read_csv(f'{m5_input_path}/calendar.csv')
sales_train_validation = pd.read_csv(f'{m5_input_path}/sales_train_validation.csv')
sample_submission = pd.read_csv(f'{m5_input_path}/sample_submission.csv')
sell_prices = pd.read_csv(f'{m5_input_path}/sell_prices.csv')

cal_features = calendar.drop(
    ['date', 'wm_yr_wk', 'weekday', 'wday', 'month', 'year', 'event_name_1', 'event_name_2', 'd'], 
    axis=1
)
cal_features['event_type_1'] = cal_features['event_type_1'].apply(lambda x: 0 if str(x)=="nan" else 1)
cal_features['event_type_2'] = cal_features['event_type_2'].apply(lambda x: 0 if str(x)=="nan" else 1)

test_cal_features = cal_features.values.T
if submission:
    train_cal_features = test_cal_features[:,:-submission_prediction_length]
else:
    train_cal_features = test_cal_features[:,:-submission_prediction_length-single_prediction_length]
    test_cal_features = test_cal_features[:,:-submission_prediction_length]

test_cal_features_list = [test_cal_features] * len(sales_train_validation)
train_cal_features_list = [train_cal_features] * len(sales_train_validation)

state_ids = sales_train_validation["state_id"].astype('category').cat.codes.values
state_ids_un , state_ids_counts = np.unique(state_ids, return_counts=True)

store_ids = sales_train_validation["store_id"].astype('category').cat.codes.values
store_ids_un , store_ids_counts = np.unique(store_ids, return_counts=True)

cat_ids = sales_train_validation["cat_id"].astype('category').cat.codes.values
cat_ids_un , cat_ids_counts = np.unique(cat_ids, return_counts=True)

dept_ids = sales_train_validation["dept_id"].astype('category').cat.codes.values
dept_ids_un , dept_ids_counts = np.unique(dept_ids, return_counts=True)

item_ids = sales_train_validation["item_id"].astype('category').cat.codes.values
item_ids_un , item_ids_counts = np.unique(item_ids, return_counts=True)

stat_cat_list = [item_ids, dept_ids, cat_ids, store_ids, state_ids]

stat_cat = np.concatenate(stat_cat_list)
stat_cat = stat_cat.reshape(len(stat_cat_list), len(item_ids)).T

stat_cat_cardinalities = [len(item_ids_un), len(dept_ids_un), len(cat_ids_un), len(store_ids_un), len(state_ids_un)]

from gluonts.dataset.common import load_datasets, ListDataset
from gluonts.dataset.field_names import FieldName

train_df = sales_train_validation.drop(["id","item_id","dept_id","cat_id","store_id","state_id"], axis=1)
train_target_values = train_df.values

if submission == True:
    test_target_values = [np.append(ts, np.ones(submission_prediction_length) * np.nan) for ts in train_df.values]
else:
    test_target_values = train_target_values.copy()
    train_target_values = [ts[:-single_prediction_length] for ts in train_df.values]

m5_dates = [pd.Timestamp("2011-01-29", freq='1D') for _ in range(len(sales_train_validation))]

train_ds = ListDataset([
    {
        FieldName.TARGET: target,
        FieldName.START: start,
        FieldName.FEAT_DYNAMIC_REAL: fdr,
        FieldName.FEAT_STATIC_CAT: fsc
    }
    for (target, start, fdr, fsc) in zip(train_target_values,
                                         m5_dates,
                                         train_cal_features_list,
                                         stat_cat)
], freq="D")

test_ds = ListDataset([
    {
        FieldName.TARGET: target,
        FieldName.START: start,
        FieldName.FEAT_DYNAMIC_REAL: fdr,
        FieldName.FEAT_STATIC_CAT: fsc
    }
    for (target, start, fdr, fsc) in zip(test_target_values,
                                         m5_dates,
                                         test_cal_features_list,
                                         stat_cat)
], freq="D")

from gluonts.model.deepar import DeepAREstimator
from gluonts.mx.distribution.neg_binomial import NegativeBinomialOutput
from gluonts.mx.trainer import Trainer

estimator = DeepAREstimator(
    prediction_length=prediction_length,
    freq="D",
    distr_output = NegativeBinomialOutput(),
    use_feat_dynamic_real=True,
    use_feat_static_cat=True,
    cardinality=stat_cat_cardinalities,
    trainer=Trainer(
        learning_rate=1e-3,
        epochs=100,
        num_batches_per_epoch=50,
        batch_size=32
    )
)

predictor = estimator.train(train_ds)

from gluonts.evaluation.backtest import make_evaluation_predictions

forecast_it, ts_it = make_evaluation_predictions(
    dataset=test_ds,
    predictor=predictor,
    num_samples=100
)

print("Obtaining time series conditioning values ...")
tss = list(tqdm(ts_it, total=len(test_ds)))
print("Obtaining time series predictions ...")
forecasts = list(tqdm(forecast_it, total=len(test_ds)))

submission = False
if submission == False:

    from gluonts.evaluation import Evaluator

    class M5Evaluator(Evaluator):

        def get_metrics_per_ts(self, time_series, forecast):
            successive_diff = np.diff(time_series.values.reshape(len(time_series)))
            successive_diff = successive_diff ** 2
            successive_diff = successive_diff[:-prediction_length]
            denom = np.mean(successive_diff)
            pred_values = forecast.samples.mean(axis=0)
            true_values = time_series.values.reshape(len(time_series))[-prediction_length:]
            num = np.mean((pred_values - true_values)**2)
            rmsse = num / denom
            metrics = super().get_metrics_per_ts(time_series, forecast)
            metrics["RMSSE"] = rmsse
            return metrics

        def get_aggregate_metrics(self, metric_per_ts):
            wrmsse = metric_per_ts["RMSSE"].mean()
            agg_metric , _ = super().get_aggregate_metrics(metric_per_ts)
            agg_metric["MRMSSE"] = wrmsse
            return agg_metric, metric_per_ts

    evaluator = M5Evaluator(quantiles=[0.5, 0.67, 0.95, 0.99])
    agg_metrics, item_metrics = evaluator(iter(tss), iter(forecasts), num_series=len(test_ds))
    print(json.dumps(agg_metrics, indent=4))

Error

Running evaluation: 100%|██████████| 30490/30490 [00:00<00:00, 111656.51it/s]
Output exceeds the size limit. Open the full output data in a text editor
---------------------------------------------------------------------------
RemoteTraceback                           Traceback (most recent call last)
RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/site-packages/multiprocess/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/site-packages/multiprocess/pool.py", line 48, in mapstar
    return list(map(*args))
  File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/site-packages/gluonts/evaluation/_base.py", line 56, in worker_function
  File "/var/folders/gr/ff8nqlbn5yj2xyvj129ggsfh0000gp/T/ipykernel_45492/2865651275.py", line 16, in get_metrics_per_ts
    metrics = super().get_metrics_per_ts(time_series, forecast)
  File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/site-packages/gluonts/evaluation/_base.py", line 318, in get_metrics_per_ts
    try:
ValueError: operands could not be broadcast together with shapes (28,) (56,) 
"""

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
/Users/poulamisarkar/Documents/TUM/sem2/seminar/glutons /m5_gluonts_template.ipynb Cell 26' in <cell line: 1>()
     24         return agg_metric, metric_per_ts
     27 evaluator = M5Evaluator(quantiles=[0.5, 0.67, 0.95, 0.99])
---> 28 agg_metrics, item_metrics = evaluator(iter(tss), iter(forecasts), num_series=len(test_ds))
     29 print(json.dumps(agg_metrics, indent=4))
...
    769     return self._value
    770 else:
--> 771     raise self._value

ValueError: operands could not be broadcast together with shapes (28,) (56,) 

Environment

lostella commented 2 years ago

@Poulami-Sarkar hi, thanks for raising the issue! I cannot reproduce the issue locally.

For brevity, I'm running the example using epochs = 3 in the estimator, and reducing the test dataset in size after it's created, with

from itertools import islice
test_ds = list(islice(test_ds, 20))

This way the code runs in 20 seconds or so. Could you confirm that you get the error also with these changes?

Poulami-Sarkar commented 2 years ago

Hello,

I tried the changes you suggested but now I am getting a new error

`Running evaluation: 0%| | 0/20 [00:00<?, ?it/s] Traceback (most recent call last): File "", line 1, in File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main exitcode = _main(fd, parent_sentinel) File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/multiprocessing/spawn.py", line 125, in _main prepare(preparation_data) File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/multiprocessing/spawn.py", line 236, in prepare _fixup_main_from_path(data['init_main_from_path']) File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/multiprocessing/spawn.py", line 287, in _fixup_main_from_path main_content = runpy.run_path(main_path, File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/runpy.py", line 268, in run_path return _run_module_code(code, init_globals, run_name, File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/runpy.py", line 97, in _run_module_code _run_code(code, mod_globals, init_globals, File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/runpy.py", line 87, in _run_code exec(code, run_globals) File "/Users/poulamisarkar/Documents/TUM/sem2/seminar/glutons /test.py", line 322, in agg_metrics, item_metrics = evaluator(iter(tss), iter(forecasts), num_series=len(test_ds)) File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/site-packages/gluonts/evaluation/_base.py", line 212, in call mp_pool = multiprocessing.Pool( File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/multiprocessing/context.py", line 119, in Pool return Pool(processes, initializer, initargs, maxtasksperchild, File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/multiprocessing/pool.py", line 212, in init self._repopulate_pool() File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/multiprocessing/pool.py", line 303, in _repopulate_pool return self._repopulate_pool_static(self._ctx, self.Process, File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/multiprocessing/pool.py", line 326, in _repopulate_pool_static w.start() File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/multiprocessing/process.py", line 121, in start self._popen = self._Popen(self) File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/multiprocessing/context.py", line 284, in _Popen return Popen(process_obj) File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in init super().init(process_obj) File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/multiprocessing/popen_fork.py", line 19, in init self._launch(process_obj) File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 42, in _launch prep_data = spawn.get_preparation_data(process_obj._name) File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/multiprocessing/spawn.py", line 154, in get_preparation_data _check_not_importing_main() File "/Users/poulamisarkar/opt/anaconda3/envs/forcasting/lib/python3.9/multiprocessing/spawn.py", line 134, in _check_not_importing_main raise RuntimeError(''' RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase.

    This probably means that you are not using fork to start your
    child processes and you have forgotten to use the proper idiom
    in the main module:

        if __name__ == '__main__':
            freeze_support()
            ...

    The "freeze_support()" line can be omitted if the program
    is not going to be frozen to produce an executable.

100%|██████████████████████`

lostella commented 2 years ago

@Poulami-Sarkar looks like there are some issues with multiprocessing. You can try two things to fix this:

Running the following on top of your script/notebook

import multiprocessing
multiprocessing.set_start_method('fork')

Or setting

num_workers=None

when constructing the Evaluator.