Nixtla / mlforecast

Scalable machine 🤖 learning for time series forecasting.
https://nixtlaverse.nixtla.io/mlforecast
Apache License 2.0
911 stars · 90 forks

Do not re-calculate lags/transforms if the df already has them #260

Closed huwf closed 12 months ago

huwf commented 1 year ago

Description

Hi, here are two things that would be useful for me if at all possible, and which don't seem to be in the code as far as I can tell.

  1. When calling .fit or .preprocess, I'd like it so that if the column already exists for one of the lags/dates/lag_transforms it does not try to compute it. Then I can store those features at the same time as any others, and pass this data frame to .fit or .fit_models later.
  2. More flexibility in naming generated or transformed columns, as opposed to f"lag{lag}" or the output of `_build_transform_name`, maybe by adding a `Callable` somewhere? For example, the convention I have at my job is to call a column something like fieldname_lag_24, which differs from mlforecast's. This is relatively easy for me to work around, but would be nice to have if it's easy to do.
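In the meantime, the second request can be approximated by renaming columns after preprocessing. A minimal pandas sketch, assuming the default `lag{n}` names and a hypothetical `fieldname_lag_N` target convention (the helper name is made up for illustration):

```python
import pandas as pd

def rename_lag_columns(df: pd.DataFrame, field: str) -> pd.DataFrame:
    """Rename mlforecast's default lag columns (lag1, lag2, ...) to the
    fieldname_lag_N convention described above. Purely illustrative."""
    mapping = {
        col: f"{field}_lag_{col[len('lag'):]}"
        for col in df.columns
        if col.startswith("lag") and col[len("lag"):].isdigit()
    }
    return df.rename(columns=mapping)

df = pd.DataFrame({"unique_id": ["a"], "lag1": [1.0], "lag24": [2.0]})
out = rename_lag_columns(df, "quantity")
# columns become ['unique_id', 'quantity_lag_1', 'quantity_lag_24']
```

Renaming after the fact only works for offline feature storage; at predict time mlforecast would still generate its own names, which is why a built-in naming hook is the cleaner solution.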

Use case

I'm working within an existing internal pipeline which has its own conventions.

We have different methods for collecting data from different sources; each returns a dataframe, and data for lags and windows is added or cut off in a different place than where the model is trained. This makes pickling and reusing a preprocessed model difficult.

jmoralez commented 1 year ago

Hey @huwf, thanks for using mlforecast.

About the first point, you specify them in the constructor so that they're computed when predicting, but you don't want to compute them in the feature engineering step, is that it?

For the second point, would providing a function with the same signature as _build_transform_name https://github.com/Nixtla/mlforecast/blob/af2173537420545570a94f95570de5c779be5614/mlforecast/core.py#L83 to the MLForecast constructor work for you?
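A user-supplied namer with roughly that shape could look like the following sketch. The signature is an assumption based on the linked `_build_transform_name` (a lag, the transform callable, and its extra args); `rolling_mean` here is just a stand-in, not mlforecast's implementation:

```python
def my_transform_namer(lag, tfm, *args):
    """Hypothetical naming function with roughly the shape of
    mlforecast.core._build_transform_name: given the lag, the transform
    callable and its extra arguments, return the output column name.
    Produces e.g. 'rolling_mean_24_w7' instead of the library default
    'rolling_mean_lag24_window_size7'."""
    base = getattr(tfm, "__name__", type(tfm).__name__.lower())
    suffix = "_".join(f"w{a}" for a in args)
    name = f"{base}_{lag}"
    return f"{name}_{suffix}" if suffix else name

def rolling_mean(x, window_size):
    # stand-in transform so the namer has something to name
    return x
```

Calling `my_transform_namer(24, rolling_mean, 7)` would then yield `'rolling_mean_24_w7'`, and a plain lag (no extra args) would get `'rolling_mean_24'`-style names without the `window_size` suffix.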

huwf commented 1 year ago

Hi, thanks for the quick response

For the first, yes that's correct.

For the second, yes, that's exactly what I had in mind.

Possibly the same for the straight lags too, instead of L190. Although, I just thought: I guess I could use a no-op function in lag_transforms to get the same effect, rather than needing two naming functions?

jmoralez commented 1 year ago

First point makes sense, I think we can implement it.

For the second one, as you can see, a lag is actually just the mlforecast.core._identity function, so you could definitely do something like lag_transforms={1: [_identity]}, and if you've provided a function to rename them it'd work. I think we can implement that as well.
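The identity trick works because a plain lag is just the shifted series with no further transform applied. A pandas sketch of the equivalence (illustrative, not mlforecast's internal code):

```python
import pandas as pd

def _identity(x):
    """No-op transform: applying it to a shifted series gives exactly
    the plain lag, which is why lag_transforms={1: [_identity]} can
    stand in for lags=[1] and pick up the custom naming function."""
    return x

s = pd.Series([1.0, 2.0, 3.0, 4.0])
plain_lag = s.shift(1)                 # what lags=[1] computes
via_transform = _identity(s.shift(1))  # what lag_transforms={1: [_identity]} computes
assert plain_lag.equals(via_transform)
```

Routing plain lags through lag_transforms this way means a single naming function covers every generated column.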

expertise-team commented 1 month ago

Does this also work for the distributed version of the library? We would like to save the calculated features and reuse them later for training in Spark (Databricks Feature Store).

jmoralez commented 1 month ago

Hey. It should work, but all it does is look at the input df before computing a feature and check whether a column with that feature's name already exists, so you have to make sure you use the same names as the ones produced by mlforecast. You can also provide a lag_transforms_namer to the constructor if you'd like to use a different naming convention.
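The skip-if-present behavior described here can be sketched as a guard around feature computation. This is a hypothetical helper mirroring the idea, not mlforecast's actual code; the function and column names are made up:

```python
import pandas as pd

def add_feature_if_missing(df, name, compute):
    """Only compute the feature when a column with that exact name is
    absent, mirroring the skip-if-present check described above.
    `compute` is a callable df -> Series."""
    if name not in df.columns:
        df = df.assign(**{name: compute(df)})
    return df

df = pd.DataFrame({"y": [1.0, 2.0, 3.0], "lag1": [None, 1.0, 2.0]})
calls = []

def lag1(d):
    calls.append(1)  # track whether the feature was recomputed
    return d["y"].shift(1)

df = add_feature_if_missing(df, "lag1", lag1)                     # skipped: column exists
df = add_feature_if_missing(df, "lag2", lambda d: d["y"].shift(2))  # computed
```

Because the check is a plain name comparison, a mismatch between the stored column names and mlforecast's generated names silently causes recomputation rather than reuse.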

expertise-team commented 1 month ago

Thank you for your quick answer. We really enjoy your library, it is very useful, great work.

I have tested it, but unfortunately it doesn't work in a distributed environment (Spark 3.5.0).

Steps:

Prepared a DistributedMLForecast object with some lags and lag_transforms:

fcst = DistributedMLForecast(
    models,
    freq='D',
    lags=[1, 2, 3, 4, 5, 6, 7],
    lag_transforms={
        1: [RollingMean(7), RollingMean(30), ExpandingMean()],
        335: [RollingMean(30)],
    },
    date_features=['dayofweek'],
)

Prepared a very simple training dataset and ran the preprocessing:

training_df_featured = fcst.preprocess(
    training_df,
    id_col='unique_id',
    time_col='timestamp',
    target_col='quantity',
    static_features=[],
    dropna=False,
)

This step worked fine, I received the lag features. I saved the dataframe for later usage.

If I then use it for training, it raises an error:

fcst.fit(
    training_df_featured,
    id_col='unique_id',
    time_col='timestamp',
    target_col='quantity',
    static_features=[],
    dropna=False,
)

Error message: 'Duplicate key: lag1'

Detailed traceback:

_State.RUNNING -> _State.FAILED

KeyError: 'Duplicate key: lag1'

  File .../triad/utils/pyarrow.py:150, in expression_to_schema
    obj = loads_no_dup(json_str)
  File .../triad/utils/json.py:12, in loads_no_dup
    return json.loads(json_str, object_pairs_hook=check_for_duplicate_keys)
  File .../triad/utils/json.py:30, in check_for_duplicate_keys
    raise KeyError(f"Duplicate key: {key}")

During handling of the above exception, another exception occurred:

SyntaxError: Invalid syntax: 'timestamp:datetime,quantity:double,unique_id:str,lag1:double,lag2:double,lag3:double,lag4:double,lag5:double,lag6:double,lag7:double,rolling_mean_lag1_window_size7:double,rolling_mean_lag1_window_size30:double,expanding_mean_lag1:double,rolling_mean_lag335_window_size30:double,dayofweek:double,lag1:double,lag2:double,lag3:double,lag4:double,lag5:double,lag6:double,lag7:double,rolling_mean_lag1_window_size7:double,rolling_mean_lag1_window_size30:double,expanding_mean_lag1:double,rolling_mean_lag335_window_size30:double,dayofweek:double' 'Duplicate key: lag1'

During handling of the above exception, another exception occurred:

  File <command>, line 1
    fcst.fit(training_df_featured, id_col='unique_id', time_col='timestamp', target_col='quantity', static_features=[], dropna=False)
  File .../mlforecast/distributed/forecast.py:436, in DistributedMLForecast.fit
    return self._fit(df, id_col=id_col, time_col=time_col, target_col=target_col, static_features=static_features, dropna=dropna, keep_last_n=keep_last_n)
  File .../mlforecast/distributed/forecast.py:361, in DistributedMLForecast._fit
    prep = self._preprocess(data, id_col=id_col, time_col=time_col, target_col=target_col, static_features=static_features, dropna=dropna, keep_last_n=keep_last_n, window_info=window_info)
  File .../mlforecast/distributed/forecast.py:298, in DistributedMLForecast._preprocess
    res = fa.transform(self._partition_results, DistributedMLForecast._retrieve_df, schema=f"{base_schema},{features_schema}", engine=self.engine)
  File .../fugue/workflow/api.py:174, in transform
    dag.run(make_execution_engine(engine, conf=engine_conf, infer_by=[df]))
  File .../triad/collections/schema.py:709, in Schema.transform
    raise SchemaError(e)
SchemaError: Invalid syntax: (same schema string as above) 'Duplicate key: lag1'
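The schema string in the error shows the failure mode: the saved dataframe already contains the feature columns, the preprocessing step appends the feature schema again, and triad's duplicate-key check rejects the result. A minimal stdlib reproduction of that check (triad's actual implementation may differ in details):

```python
import json

def check_for_duplicate_keys(pairs):
    """Mimics triad.utils.json.check_for_duplicate_keys: raise KeyError
    on the first repeated key instead of silently keeping the last one."""
    out = {}
    for key, value in pairs:
        if key in out:
            raise KeyError(f"Duplicate key: {key}")
        out[key] = value
    return out

# base schema already has lag1, and the appended feature schema repeats it
schema_json = '{"timestamp": "datetime", "lag1": "double", "lag1": "double"}'
try:
    json.loads(schema_json, object_pairs_hook=check_for_duplicate_keys)
except KeyError as e:
    caught = str(e)
```

Plain `json.loads` would silently keep the last duplicate, which is why triad installs the `object_pairs_hook` to surface the conflict as an error instead.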

jmoralez commented 1 month ago

Hey @expertise-team, thanks for the example. I've fixed that in the main branch and it'll be in the next release, or you can install from github in the meantime.

expertise-team commented 1 month ago

Hi @jmoralez, thank you for the quick fix. I will test it.