Nixtla / mlforecast

Scalable machine 🤖 learning for time series forecasting.
https://nixtlaverse.nixtla.io/mlforecast
Apache License 2.0

[distributed]: allow for .ts.update in DistributedMLForecast #317

Closed · maxi-marufo closed this 7 months ago

maxi-marufo commented 7 months ago

Description

It is mentioned here that we can incorporate new data into our MLForecast models by updating their time-series object. However, this feature is not available in DistributedMLForecast.
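
For context, this is roughly the local workflow I mean (a sketch only; series and new_rows are placeholder pandas dataframes with unique_id / ds / y columns, and the LightGBM model is just an example):

import lightgbm as lgb

from mlforecast import MLForecast

fcst = MLForecast(models=[lgb.LGBMRegressor()], freq='D', lags=[7])
fcst.fit(series)              # initial training
fcst.ts.update(new_rows)      # incorporate the newly observed values
preds = fcst.predict(7)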

Use case

I tried using mlforecast_model._base_ts.update(mlforecast_data), but I got this error:

File /alloc/data/fury_cemld-kubesizer/notebooks/my-env/lib/python3.9/site-packages/mlforecast/core.py:773, in TimeSeries.update(self, df)
    771 def update(self, df: DataFrame) -> None:
    772     """Update the values of the stored series."""
--> 773     validate_format(df, self.id_col, self.time_col, self.target_col)
    774     uids = self.uids
    775     if isinstance(uids, pd.Index):

AttributeError: 'TimeSeries' object has no attribute 'id_col'

So I also tried:

mlforecast_model._base_ts.id_col = "unique_id"
mlforecast_model._base_ts.time_col = "ds"
mlforecast_model._base_ts.target_col = "y"
mlforecast_model._base_ts.update(mlforecast_data)

and this time the error was:

File /alloc/data/fury_cemld-kubesizer/notebooks/my-env/lib/python3.9/site-packages/mlforecast/core.py:774, in TimeSeries.update(self, df)
    772 """Update the values of the stored series."""
    773 validate_format(df, self.id_col, self.time_col, self.target_col)
--> 774 uids = self.uids
    775 if isinstance(uids, pd.Index):
    776     uids = pd.Series(uids)

AttributeError: 'TimeSeries' object has no attribute 'uids'

jmoralez commented 7 months ago

Hey @maxi-marufo, thanks for using mlforecast. Are you performing distributed inference? I think it may be easier to set keep_last_n during fit (to reduce the history that is kept), use to_local, and then update the local forecast object. If that doesn't work for you, I think we could add an update method that broadcasts the new dataframe to all workers and updates each partition locally.
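
Roughly like this (a sketch only; spark_df, new_rows and the model/lag settings are placeholders, with new_rows being a pandas dataframe that holds the new observations):

from mlforecast.distributed import DistributedMLForecast
from mlforecast.distributed.models.spark.lgb import SparkLGBMForecast

fcst = DistributedMLForecast(models=[SparkLGBMForecast()], freq='D', lags=[7])
fcst.fit(spark_df, keep_last_n=30)  # keep only the last 30 values of each serie
local_fcst = fcst.to_local()        # regular MLForecast backed by pandas
local_fcst.ts.update(new_rows)      # incorporate the new observations
preds = local_fcst.predict(7)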

maxi-marufo commented 7 months ago

Yes, I'm performing distributed inference. I could use to_local and then .ts.update, but that would also require converting the model back to a distributed version afterwards. Is there a method or way to do that? In any case, this would only be a workaround for an update method in DistributedMLForecast. By the way, another related feature would be the ability to continue a previous training, using the init_model param in LightGBM and the xgb_model param in XGBoost. I guess continuing an MLForecast training would mean updating the time series and then refitting the LightGBM/XGBoost model, right?

jmoralez commented 7 months ago

Just added DistributedMLForecast.update; it'll be available in the next release, which should be out later this week. Note that it takes a pandas dataframe, which will be broadcast to all the workers so that each one can update its corresponding series.
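
Usage is then roughly (new_rows being a placeholder for a pandas dataframe with the new observations, using the default unique_id / ds / y columns):

fcst.update(new_rows)    # broadcast to the workers; each partition updates its own series
preds = fcst.predict(7)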

For continuing training, yes. You could call update and then train the models for a few more iterations. One important thing to note is that preprocess and fit modify the internal state of the forecast object (they save the series values, the last dates, and so on), so you have to be careful. I suggest running preprocess/fit on a separate forecast object whose models get the arguments to continue training, and then overriding the models_ attribute of your original object, so something like:

orig_fcst = DistributedMLForecast.load(...)
orig_fcst.update(new_df)
new_fcst = DistributedMLForecast(
    models={'lgb': SparkLGBMForecast(init_model=orig_fcst.models_['lgb'], ...)}
)
new_fcst.fit(some_df)  # doesn't necessarily have to be new_df, could potentially have a bit more history to compute your features
orig_fcst.models_ = new_fcst.models_  # keep orig_fcst's stored series/state, only swap in the continued models
orig_fcst.save(...)

Feel free to open a new issue if you run into problems trying to continue training.