cross_validation of DistributedMLForecast not working when n_windows > 2 #251

Status: Closed (wregter closed this issue 10 months ago)

wregter commented 10 months ago

What happened + What you expected to happen

Thank you for developing this amazing python library!

My issue:

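Using cross_validation with DistributedMLForecast fails for me whenever n_windows > 2: it raises a schema error. I suspect the problem is in how the per-window result dataframes are merged when there are more than two of them. The traceback below supports this: the failing call is fa.union(results[0], results[1], results[2:]), which passes the remaining results as a single list instead of unpacking them, so Fugue tries to wrap that list as an array dataframe with no schema. The dataset in the reproduction script has more than enough observations for the cross-validation, so the amount of data is not the cause.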

The error I get is the following:

SchemaError: Schema can't be empty

SchemaError                               Traceback (most recent call last)
File <command>, line 10
      6 series_spark = spark.createDataFrame(series).repartitionByRange(20, "unique_id")
      8 fcst = DistributedMLForecast(models=[SparkLGBMForecast()], freq="D", lags=[1])
---> 10 cv_results = fcst.cross_validation(series_spark, n_windows=3, h=5)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/mlforecast/, in old_kw_to_pos.<locals>.decorator.<locals>.inner(*args, **kwargs)
    162 new_args.append(kwargs.pop(arg_names[i]))
    163 new_args.append(kwargs.pop(old_name))
--> 164 return f(*new_args, **kwargs)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/mlforecast/distributed/, in DistributedMLForecast.cross_validation(self, df, n_windows, h, id_col, time_col, target_col, step_size, static_features, dropna, keep_last_n, refit, before_predict_callback, after_predict_callback, input_size, data, window_size)
    651 if len(results) == 2:
    652     return fa.union(results[0], results[1])
--> 653 return fa.union(results[0], results[1], results[2:])

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/fugue/execution/, in union(df1, df2, distinct, engine, engine_conf, as_fugue, as_local, dfs)
    866     res = e.union(res, as_fugue_engine_df(e, odf), distinct=distinct)
    867 return res
--> 869 return run_engine_function(
    870     _union,
    871     engine=engine,
    872     engine_conf=engine_conf,
    873     as_fugue=as_fugue,
    874     as_local=as_local,
    875     infer_by=[df1, df2, dfs],
    876 )

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/fugue/execution/, in run_engine_function(func, engine, engine_conf, as_fugue, as_local, infer_by)
    153 """Run a lambda function based on the engine provided
    154
    155 :param engine: an engine like object, defaults to None
   (...)
    169 This function is for deveopment use. Users should not need it.
    170 """
    171 with engine_context(engine, engine_conf=engine_conf, infer_by=infer_by) as e:
--> 172     res = func(e)
    174     if isinstance(res, DataFrame):
    175         res = e.convert_yield_dataframe(res, as_local=as_local)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/fugue/execution/, in union.<locals>._union(e)
    864 res = e.union(edf1, edf2, distinct=distinct)
    865 for odf in dfs:
--> 866     res = e.union(res, as_fugue_engine_df(e, odf), distinct=distinct)
    867 return res

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/triad/utils/, in ConditionalDispatcher.__call__(self, *args, **kwds)
    110 def __call__(self, *args: Any, **kwds: Any) -> Any:
--> 111     return self.run_top(*args, **kwds)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/triad/utils/, in ConditionalDispatcher.run_top(self, *args, **kwargs)
    263 def run_top(self, *args: Any, **kwargs: Any) -> Any:
    264     """Execute the first matching child function
    265
    266     :return: the return of the child function
    267     """
--> 268     return list(itertools.islice(self.run(*args, **kwargs), 1))[0]

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/triad/utils/, in ConditionalDispatcher.run(self, *args, **kwargs)
    259         has_return = True
    260 if not has_return:
--> 261     yield self._func(*args, **kwargs)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/fugue/execution/, in as_fugue_engine_df(engine, df, schema)
    128 """Convert a dataframe to a Fugue engine dependent DataFrame.
    129 This function is used internally by Fugue. It is not recommended
    130 to use
   (...)
    136 :return: the engine dependent DataFrame
    137 """
    138 if schema is None:
--> 139     fdf = as_fugue_df(df)
    140 else:
    141     fdf = as_fugue_df(df, schema=schema)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/fugue/dataframe/, in as_fugue_df(df, **kwargs)
    459 def as_fugue_df(df: AnyDataFrame, **kwargs: Any) -> DataFrame:
    460     """Wrap the object as a Fugue DataFrame.
    461
    462     :param df: the object to wrap
    463     """
--> 464     ds = as_fugue_dataset(df, **kwargs)
    465     if isinstance(ds, DataFrame):
    466         return ds

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/triad/utils/, in ConditionalDispatcher.__call__(self, *args, **kwds)
    110 def __call__(self, *args: Any, **kwds: Any) -> Any:
--> 111     return self.run_top(*args, **kwds)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/triad/utils/, in ConditionalDispatcher.run_top(self, *args, **kwargs)
    263 def run_top(self, *args: Any, **kwargs: Any) -> Any:
    264     """Execute the first matching child function
    265
    266     :return: the return of the child function
    267     """
--> 268     return list(itertools.islice(self.run(*args, **kwargs), 1))[0]

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/triad/utils/, in ConditionalDispatcher.run(self, *args, **kwargs)
    256 for f in self._funcs:
    257     if self._match(f[2], *args, **kwargs):
--> 258         yield f[3](*args, **kwargs)
    259         has_return = True
    260 if not has_return:

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/fugue/dataframe/, in _arr_to_fugue(df, **kwargs)
    126 @as_fugue_dataset.candidate(lambda df, **kwargs: isinstance(df, list), priority=0.9)
    127 def _arr_to_fugue(df: List[Any], **kwargs: Any) -> ArrayDataFrame:
--> 128     return ArrayDataFrame(df, **kwargs)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/fugue/dataframe/, in ArrayDataFrame.__init__(self, df, schema)
     39     self._native = df.as_array(schema.names, type_safe=False)
     40 elif isinstance(df, Iterable):
---> 41     super().__init__(schema)
     42     self._native = df if isinstance(df, List) else list(df)
     43 else:

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/fugue/dataframe/, in DataFrame.__init__(self, schema)
     42 super().__init__()
     43 if not callable(schema):
---> 44     schema = _input_schema(schema).assert_not_empty()
     45     schema.set_readonly()
     46     self._schema: Union[Schema, Callable[[], Schema]] = schema

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c61b7c9a-05b7-4802-a72b-408338297e70/lib/python3.10/site-packages/triad/collections/, in Schema.assert_not_empty(self)
    150 if len(self) > 0:
    151     return self
--> 152 raise SchemaError("Schema can't be empty")

SchemaError: Schema can't be empty

Versions / Dependencies

Package Version

Reproduction script

from mlforecast.utils import generate_daily_series
from mlforecast.distributed import DistributedMLForecast
from mlforecast.distributed.models.spark.lgb import SparkLGBMForecast

series = generate_daily_series(10, equal_ends=True)
series_spark = spark.createDataFrame(series).repartitionByRange(20, "unique_id")

fcst = DistributedMLForecast(models=[SparkLGBMForecast()], freq="D", lags=[1])

cv_results = fcst.cross_validation(series_spark, n_windows=3, h=5)

Issue Severity

Medium: It is a significant difficulty but I can work around it.