Nixtla / mlforecast

Scalable machine 🤖 learning for time series forecasting.
https://nixtlaverse.nixtla.io/mlforecast
Apache License 2.0

[Core] Saving of the model #288

Closed iamyihwa closed 8 months ago

iamyihwa commented 9 months ago

Description

Hello, thanks for the wonderful library, with its constant stream of new features and improvements! I was wondering if there is a way to save models trained in mlforecast. I saw this feature in neuralforecast.

Use case

For features like transfer learning, it would be super useful to be able to save the model!

jmoralez commented 9 months ago

Hey @iamyihwa, thanks for using mlforecast. We currently rely on pickle/cloudpickle for the serialization, e.g.

# fit the forecaster as usual
fcst = MLForecast(...)
fcst.fit(...)

import cloudpickle

# serialize the whole forecaster object
with open('mlf.pkl', 'wb') as f:
    cloudpickle.dump(fcst, f)

Would that work for your use case?

If you're going to do transfer learning there are several attributes that can be deleted, which would reduce the size. We could implement a save method to do that.
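For completeness, a minimal sketch of loading the pickled object back in a later session (the horizon value below is only illustrative):

import cloudpickle

# load the forecaster saved above
with open('mlf.pkl', 'rb') as f:
    fcst = cloudpickle.load(f)

# predict as usual
preds = fcst.predict(h=12)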

iamyihwa commented 9 months ago

Thanks @jmoralez ! Yes ! This is great!

"If you're going to do transfer learning there are several attributes that can be deleted, which would reduce the size. We could implement a save method to do that."

Which are the attributes that can be deleted?

A related question: can I apply fine-tuning with an updated dataset? In this example of transfer learning I see that a saved model can be used to make predictions on a new dataset, however I didn't see an option to update the model. What would be the ways to do that for neuralforecast or mlforecast?

Thank you!!

jmoralez commented 9 months ago

In neuralforecast you can just call fit again and it will continue training the same models. For mlforecast it's a bit harder because each framework may or may not support continued training and the syntax is different. If you're using LightGBM, for example, you should be able to provide the current model to the init_model argument of fit to continue training.
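As a rough sketch of what that mechanism could look like (this assumes a previously fitted local MLForecast called fcst with an LGBMRegressor and a new dataframe new_df with the default unique_id/ds/y columns; preprocess is used here only to rebuild the feature matrix):

import lightgbm as lgb

# build the feature matrix for the new data with the same feature pipeline
prep = fcst.preprocess(new_df)
X = prep.drop(columns=['unique_id', 'ds', 'y'])
y = prep['y']

# continue training from the previously fitted booster
prev_booster = fcst.models_['LGBMRegressor'].booster_
new_reg = lgb.LGBMRegressor()
new_reg.fit(X, y, init_model=prev_booster)
# untested sketch: new_reg would still have to replace the entry in
# fcst.models_ (or be used directly) before producing forecasts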

iamyihwa commented 9 months ago

Thanks @jmoralez !

Just tried that for the distributed model, and it seems to give an error. Is it because distributed models don't work the same way? I'm getting this error: TypeError: cannot pickle '_thread.RLock' object

      5
      6 with open('mlf.pkl', 'wb') as f:
----> 7     cloudpickle.dump(fcst, f)

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a70a3c46-5e29-436c-8d76-bed107dbdff0/lib/python3.8/site-packages/cloudpickle/cloudpickle.py in dump(obj, file, protocol, buffer_callback)
   1459             next).
   1460         """
-> 1461         Pickler(file, protocol=protocol, buffer_callback=buffer_callback).dump(obj)
   1462
   1463

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a70a3c46-5e29-436c-8d76-bed107dbdff0/lib/python3.8/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
   1243     def dump(self, obj):
   1244         try:
-> 1245             return super().dump(obj)
   1246         except RuntimeError as e:
   1247             if len(e.args) > 0 and "recursion" in e.args[0]:

TypeError: cannot pickle '_thread.RLock' object

The code that produced it:

models = [SparkXGBForecast(num_workers=8)]  # SparkXGBRegressor()
fcst = DistributedMLForecast(
    models,
    freq='W-SAT',
    lags=[1, 52],
    lag_transforms={
        1: [expanding_mean],
        4: [(rolling_mean, 4)],
    },
    target_transforms=[Differences([1]), Mean_Scaler()],
)
fcst.fit(
    spark_series,
)

import cloudpickle

with open('mlf.pkl', 'wb') as f:
    cloudpickle.dump(fcst, f)

jmoralez commented 9 months ago

The distributed version also has the engine, which isn't serializable. I'll open a PR for this; in the meantime you should be able to save the forecast object after deleting that attribute, e.g.

del fcst.engine
with open('mlf.pkl', 'wb') as f:
    cloudpickle.dump(fcst, f)
iamyihwa commented 9 months ago

Thanks for the suggestion @jmoralez !

Just tried that, however I am still getting an error: TypeError: cannot pickle '_thread.RLock' object

import cloudpickle
del fcst.engine
with open('mlf.pkl', 'wb') as f:
    cloudpickle.dump(fcst, f)

TypeError Traceback (most recent call last)

      2 #del fcst.engine
      3 with open('mlf.pkl', 'wb') as f:
----> 4     cloudpickle.dump(fcst, f)

/local_disk0/.ephemeral_nfs/envs/pythonEnv-ca93534c-3b1e-41a1-afb0-20dc205a0581/lib/python3.8/site-packages/cloudpickle/cloudpickle.py in dump(obj, file, protocol, buffer_callback)
   1459             next).
   1460         """
-> 1461         Pickler(file, protocol=protocol, buffer_callback=buffer_callback).dump(obj)
   1462
   1463

/local_disk0/.ephemeral_nfs/envs/pythonEnv-ca93534c-3b1e-41a1-afb0-20dc205a0581/lib/python3.8/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
   1243     def dump(self, obj):
   1244         try:
-> 1245             return super().dump(obj)
   1246         except RuntimeError as e:
   1247             if len(e.args) > 0 and "recursion" in e.args[0]:

TypeError: cannot pickle '_thread.RLock' object

jmoralez commented 9 months ago

I see the del command is commented out in your error message. Did you get an error while running it?

iamyihwa commented 9 months ago

Apologies for the confusion. (I ran it twice, and the first time I didn't import cloudpickle, so I had to run it again.)

import cloudpickle
del fcst.engine
with open('mlf.pkl', 'wb') as f:
    cloudpickle.dump(fcst, f)

TypeError Traceback (most recent call last)

      2 del fcst.engine
      3 with open('mlf.pkl', 'wb') as f:
----> 4     cloudpickle.dump(fcst, f)

/local_disk0/.ephemeral_nfs/envs/pythonEnv-68587868-d06b-43e0-8fdf-d434b864d1fb/lib/python3.8/site-packages/cloudpickle/cloudpickle.py in dump(obj, file, protocol, buffer_callback)
   1459             next).
   1460         """
-> 1461         Pickler(file, protocol=protocol, buffer_callback=buffer_callback).dump(obj)
   1462
   1463

/local_disk0/.ephemeral_nfs/envs/pythonEnv-68587868-d06b-43e0-8fdf-d434b864d1fb/lib/python3.8/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
   1243     def dump(self, obj):
   1244         try:
-> 1245             return super().dump(obj)
   1246         except RuntimeError as e:
   1247             if len(e.args) > 0 and "recursion" in e.args[0]:

TypeError: cannot pickle '_thread.RLock' object

iamyihwa commented 8 months ago

@jmoralez Sorry for the confusion I created above.

Just wanted to check if there is any update on this topic?? Thanks again!

jmoralez commented 8 months ago

Hey. We're working on this. The problem is that since the model holds a distributed dataframe it can't be pickled to a single file (at least for Spark), so we're adding a save method that will allow saving the partitioned structure (to S3, for example) and then loading it back. Is that what you had in mind?

We may also add a to_local method that will combine the partitions and return an MLForecast instance which you can pickle to a single file.
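For reference, a sketch of how these methods end up being used later in this thread (the path is just a placeholder):

# save the partitioned structure and load it back in another session
fcst.save('s3://my-bucket/mlforecast-model')
fcst2 = DistributedMLForecast.load('s3://my-bucket/mlforecast-model', engine=spark)

# or collapse the partitions into a local MLForecast that pickles to a single file
local_fcst = fcst.to_local()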

iamyihwa commented 8 months ago

Thanks @jmoralez for a great job!!

I was just wondering: is it possible to make the model distributed again, or not?

l_fcst = fcst.to_local()
# Save model 
import cloudpickle

with open('mlf.pkl', 'wb') as f:
    cloudpickle.dump(l_fcst, f)

# on a separate session
with open('mlf.pkl', 'rb') as f:
    mlf_2 = cloudpickle.load(f)

mlf_2.fit(
    spark_series_train,
    static_features = []
  #  static_features=['embedding_x', 'embedding_y'],
)

When I do this, I get this error: ValueError: `df` must be either pandas or polars dataframe, got <class 'pyspark.sql.dataframe.DataFrame'>. I wonder if there is any way to convert the model back to a distributed one. Thanks again!

jmoralez commented 8 months ago

Hey. If you want to save the distributed version you should use the save and load methods. There's an example for Spark here.

jmoralez commented 8 months ago

One question, why are you saving the model to then call fit again? Do you want to keep training the same model?

iamyihwa commented 8 months ago

@jmoralez Thanks! After using the code in the documentation you suggested, it works fine!

# define unique name for CI
import sys
def build_unique_name(engine):
    pyver = f'{sys.version_info.major}_{sys.version_info.minor}'
    #repo = git.Repo(search_parent_directories=True)
    #sha = repo.head.object.hexsha
    return f'{sys.platform}-model'

save_dir = build_unique_name('spark')
save_path = f'/FileStore/shared_uploads/yihwa.kim@nielseniq.com/forecast/models/mlforecast/{save_dir}'
fcst.save(save_path)
fcst2 = DistributedMLForecast.load(save_path, engine=spark)
fcst2.fit(
    spark_series_train,
    static_features = []
)

However, now I am getting this error: AttributeError: 'XGBRegressor' object has no attribute '_pre_fit'

Detailed error

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<command-1308285184843908> in <module>
----> 1 fcst2.fit(
      2     spark_series_train,
      3     static_features = []
      4   #  static_features=['embedding_x', 'embedding_y'],
      5 )

/local_disk0/.ephemeral_nfs/envs/pythonEnv-262ec40e-eff3-4800-b58e-31e7f8dd0d3d/lib/python3.8/site-packages/mlforecast/distributed/forecast.py in fit(self, df, id_col, time_col, target_col, static_features, dropna, keep_last_n)
    418             Forecast object with series values and trained models.
    419         """
--> 420         return self._fit(
    421             df,
    422             id_col=id_col,

/local_disk0/.ephemeral_nfs/envs/pythonEnv-262ec40e-eff3-4800-b58e-31e7f8dd0d3d/lib/python3.8/site-packages/mlforecast/distributed/forecast.py in _fit(self, data, id_col, time_col, target_col, static_features, dropna, keep_last_n, window_info)
    363             train_data = featurizer.transform(prep)[target_col, "features"]
    364             for name, model in self.models.items():
--> 365                 trained_model = model._pre_fit(target_col).fit(train_data)
    366                 self.models_[name] = model.extract_local_model(trained_model)
    367         elif DASK_INSTALLED and isinstance(data, dd.DataFrame):

AttributeError: 'XGBRegressor' object has no attribute '_pre_fit'
iamyihwa commented 8 months ago

"One question, why are you saving the model to then call fit again? Do you want to keep training the same model?"

@jmoralez That is a good question. The idea is to see whether training with a larger dataset would improve model performance, so I'd like to continue training the same model. Also, when there is a data refresh, I want to continue training the same model to see if this improves performance over learning from scratch.
(It also started with an issue that I couldn't train a huge dataset at once: I could fit() the model, but doing forecast() and getting the output takes forever and eventually gets stuck.)

If you have any insights or ideas on this topic (the size of the dataset used to train the model versus model performance), feel free to mention them! I saw that in the M5 forecasting competition many winning solutions trained multiple models. What is your take on this?

jmoralez commented 8 months ago

"However, now I am getting this error: AttributeError: 'XGBRegressor' object has no attribute '_pre_fit'"

That's why I asked about re-running fit. After training, we keep a local version of the trained models (so XGBRegressor instead of SparkXGBRegressor), and when saving we save these models so they can be loaded back for predicting (not training again).

When you call fit it starts from scratch, so the previous models are lost. If you want to train on a different dataset you can just initialize a new DistributedMLForecast instance and call fit on that. If you want to continue training the same model you can do something like this:

models = [SparkXGBForecast()]
fcst = DistributedMLForecast(
    models,
    freq='D',
    lags=[1],
    lag_transforms={
        1: [expanding_mean]
    },
    date_features=['dayofweek'],
)
fcst.fit(
    spark_series,
    static_features=['static_0', 'static_1'],
)

# extract the trained model
# this can be extracted after saving and loading the forecast object as well
xgb_model = fcst.models_['SparkXGBForecast'].get_booster()
# define new model that will continue training the previous model
new_models = [SparkXGBForecast(xgb_model=xgb_model)]
# define new forecast object and fit again
fcst2 = DistributedMLForecast(
    new_models,
    freq='D',
    lags=[1],
    lag_transforms={
        1: [expanding_mean]
    },
    date_features=['dayofweek'],
)
fcst2.fit(
    spark_series,
    static_features=['static_0', 'static_1'],
)
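After the second fit, predictions can be produced from the new object in the usual way (the horizon here is just the one used elsewhere in this thread):

preds = fcst2.predict(h=13)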
iamyihwa commented 8 months ago

@jmoralez Sorry for the late response to this topic! I have now tried to save and load again, but I am not getting any forecasts.

save_dir = build_unique_name('spark')
save_path = f'/FileStore/path/{save_dir}'
fcst.save(save_path)
fcst2 = DistributedMLForecast.load(save_path, engine=spark)

fcst2.predict(h=13) doesn't work the same way fcst.predict(h=13) does. I have followed the API as defined here.

jmoralez commented 8 months ago

Hey. What do you mean by it doesn't work the same way? It produces different results?

iamyihwa commented 8 months ago

Sorry for the ambiguity! Yes, it doesn't predict any result once I save it and load it!

jmoralez commented 8 months ago

Are you getting any errors? We test that they produce the same results here.