Nixtla / mlforecast

Scalable machine 🤖 learning for time series forecasting.
https://nixtlaverse.nixtla.io/mlforecast
Apache License 2.0
808 stars 75 forks

[Core] Inconsistent and overlapping forecasting time steps generated as a result of predict() #259

Closed iamyihwa closed 8 months ago

iamyihwa commented 8 months ago

What happened + What you expected to happen

Hi, I've run into a very strange case where the time steps of the resulting forecasts don't correspond to the horizon given.

There are a few issues. (1) The resulting time series don't have the number of time steps corresponding to the horizon given (13).

[screenshot]

(2) Many overlapping timestamps have been generated.

[screenshot]

(3) The results are also inconsistent across series: of the 3 different unique_ids provided, 2 have a different number of time steps than the other one. (See figure 1)
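A quick way to surface symptoms (1)–(3) programmatically is sketched below; check_forecast is a hypothetical helper added for illustration, not part of mlforecast.

```python
import pandas as pd

def check_forecast(preds: pd.DataFrame, h: int) -> None:
    # every series should contribute exactly h forecast rows
    counts = preds.groupby('unique_id').size()
    assert (counts == h).all(), f"unexpected row counts:\n{counts}"
    # and no (unique_id, ds) pair should appear twice
    assert not preds.duplicated(['unique_id', 'ds']).any(), "overlapping timestamps"

# tiny well-formed example: 2 series x 3 weekly steps
good = pd.DataFrame({
    'unique_id': ['a'] * 3 + ['b'] * 3,
    'ds': list(pd.date_range('2021-01-02', periods=3, freq='W-SAT')) * 2,
    'pred': [1.0] * 6,
})
check_forecast(good, 3)  # passes silently on a well-formed frame
```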

Versions / Dependencies

mlforecast 0.10.0

Reproduction script


import random

import numpy as np
import pandas as pd

start_date = '2021-06-05'
end_date = '2021-12-25'
weekly_dates = pd.date_range(start=start_date, end=end_date, freq='W-SAT')
val_1 = [70., 52., 44., 64., 70., 83., 65., 89., 72., 66., 34.,
         95., 46., 34., 87., 77., 58., 103., 73., 48., 78., 84.,
         71., 65., 79., 0., 78., 86., 57., 0.]

# Create the training dataframe; random.uniform returns a single float,
# which pandas broadcasts to every row, so the embeddings are constant
# per series and usable as static features
train_df = pd.DataFrame({
    'ds': np.tile(weekly_dates, 3),
    'unique_id': ['abc'] * len(weekly_dates)
               + ['def'] * len(weekly_dates)
               + ['ghi'] * len(weekly_dates),
    'y': val_1 * 3,
    'embedding_x': random.uniform(1, len(weekly_dates) * 3),
    'embedding_y': random.uniform(1, len(weekly_dates) * 3),
})

# shim for pandas >= 2.0, where DataFrame.iteritems was removed but
# older Spark versions still call it during createDataFrame
train_df.iteritems = train_df.items
train_sf = spark.createDataFrame(train_df)

from mlforecast.distributed.forecast import DistributedMLForecast
from mlforecast.distributed.models.spark.xgb import SparkXGBForecast
from window_ops.expanding import expanding_mean

models = [SparkXGBForecast()]
fcst = DistributedMLForecast(
    models,
    freq='W-SAT',
    lags=[1],
    lag_transforms={
        1: [expanding_mean],
    },
)

fcst.fit(
    train_sf,
    static_features=['embedding_x', 'embedding_y'],
)

preds = fcst.predict(13)
Y_df_hat = preds.toPandas()
Y_df_hat.rename(columns={'SparkXGBForecast': 'pred'}, inplace=True)

Issue Severity

None

jmoralez commented 8 months ago

Hey @iamyihwa. As described here, it's very important that your dataframe is partitioned by series for the distributed processing to work correctly, so you have to repartition your dataframe or provide a value for num_partitions to the DistributedMLForecast constructor.
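In code, the two suggested options would look roughly like calling train_sf.repartitionByRange(num_partitions, 'unique_id') on the Spark DataFrame before fit, or passing num_partitions to the DistributedMLForecast constructor. The pandas-only sketch below (partition_by_id is a hypothetical helper, not mlforecast internals) just illustrates the invariant both options enforce: every series must land wholly inside one partition.

```python
import pandas as pd

df = pd.DataFrame({
    'unique_id': ['abc'] * 3 + ['def'] * 3 + ['ghi'] * 3,
    'y': range(9),
})

def partition_by_id(frame: pd.DataFrame, num_partitions: int):
    # derive the partition from the series id alone, so all rows of a
    # series necessarily end up in the same partition
    keys = frame['unique_id'].map(lambda uid: hash(uid) % num_partitions)
    return [group for _, group in frame.groupby(keys)]

parts = partition_by_id(df, 2)
# no unique_id appears in more than one partition
ids_per_part = [set(p['unique_id']) for p in parts]
```

If the data arrived partitioned arbitrarily instead (e.g. round-robin by row), the lag and expanding-mean features would be computed over fragments of each series, which matches the inconsistent and overlapping outputs reported above.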

iamyihwa commented 8 months ago

Thanks a lot @jmoralez! By using .repartitionByRange as suggested, the problem was solved!