Nixtla / mlforecast

Scalable machine 🤖 learning for time series forecasting.
https://nixtlaverse.nixtla.io/mlforecast
Apache License 2.0

[Model] Distributed version of the model giving Arrow Capacity error #295

Closed iamyihwa closed 7 months ago

iamyihwa commented 9 months ago

### What happened + What you expected to happen

  1. I am training on a large dataset with the Spark distributed version of the model and am getting a pyarrow error that says 'PythonException: An exception was thrown from a UDF: 'pyarrow.lib.ArrowCapacityError: array cannot contain more than 2147483646 bytes, have 2745777856'.'

I wonder whether it is caused by the target transformation routine, which uses pandas, or whether distributed mlforecast uses pyarrow underneath and there is a size limitation.

  2. Additionally, the log says it is training with only 1 worker, when in theory 8 workers are available.
    2024-01-09 07:27:30,244 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 1 workers with
    booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'nthread': 1}
    train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
    dmatrix_kwargs: {'nthread': 1, 'missing': nan}
  3. Here is the error message.
    
    ---------------------------------------------------------------------------
    PythonException                           Traceback (most recent call last)
    <command-820427589813878> in <module>
     11  #   date_features=[week_of_month],
     12 )
    ---> 13 fcst.fit(
     14     spark_series,
     15   #  static_features=['embedding_x', 'embedding_y'],

    /local_disk0/.ephemeral_nfs/envs/pythonEnv-a70a3c46-5e29-436c-8d76-bed107dbdff0/lib/python3.8/site-packages/mlforecast/distributed/forecast.py in fit(self, df, id_col, time_col, target_col, static_features, dropna, keep_last_n)
        412             Forecast object with series values and trained models.
        413         """
    --> 414         return self._fit(
        415             df,
        416             id_col=id_col,

    /local_disk0/.ephemeral_nfs/envs/pythonEnv-a70a3c46-5e29-436c-8d76-bed107dbdff0/lib/python3.8/site-packages/mlforecast/distributed/forecast.py in _fit(self, data, id_col, time_col, target_col, static_features, dropna, keep_last_n, window_info)
        357             train_data = featurizer.transform(prep)[target_col, "features"]
        358             for name, model in self.models.items():
    --> 359                 trained_model = model._pre_fit(target_col).fit(train_data)
        360                 self.models[name] = model.extract_local_model(trained_model)
        361         elif DASK_INSTALLED and isinstance(data, dd.DataFrame):

    /databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py in patched_method(self, *args, **kwargs)
         28             call_succeeded = False
         29             try:
    ---> 30                 result = original_method(self, *args, **kwargs)
         31                 call_succeeded = True
         32                 return result

    /databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
        159                 return self.copy(params)._fit(dataset)
        160             else:
    --> 161                 return self._fit(dataset)
        162         else:
        163             raise TypeError("Params must be either a param map or a list/tuple of param maps, "

    /local_disk0/.ephemeral_nfs/envs/pythonEnv-a70a3c46-5e29-436c-8d76-bed107dbdff0/lib/python3.8/site-packages/xgboost/spark/core.py in _fit(self, dataset)
       1134             dmatrix_kwargs,
       1135         )
    -> 1136         (config, booster) = _run_job()
       1137         get_logger("XGBoost-PySpark").info("Finished xgboost training!")
       1138

    /local_disk0/.ephemeral_nfs/envs/pythonEnv-a70a3c46-5e29-436c-8d76-bed107dbdff0/lib/python3.8/site-packages/xgboost/spark/core.py in _run_job()
       1112         def _run_job() -> Tuple[str, str]:
       1113             rdd = (
    -> 1114                 dataset.mapInPandas(
       1115                     _train_booster,  # type: ignore
       1116                     schema="config string, booster string",

    /databricks/spark/python/pyspark/sql/dataframe.py in rdd(self)
         88         """
         89         if self._lazy_rdd is None:
    ---> 90             jrdd = self._jdf.javaToPython()
         91             self._lazy_rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
         92         return self._lazy_rdd

    /databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
       1302
       1303         answer = self.gateway_client.send_command(command)
    -> 1304         return_value = get_return_value(
       1305             answer, self.gateway_client, self.target_id, self.name)
       1306

    /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
        121                 # Hide where the exception came from that shows a non-Pythonic
        122                 # JVM exception message.
    --> 123                 raise converted from None
        124             else:
        125                 raise

    PythonException: An exception was thrown from a UDF: 'pyarrow.lib.ArrowCapacityError: array cannot contain more than 2147483646 bytes, have 2745777856'. Full traceback below:
    Traceback (most recent call last):
      File "pyarrow/array.pxi", line 1044, in pyarrow.lib.Array.from_pandas
      File "pyarrow/array.pxi", line 316, in pyarrow.lib.array
      File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
      File "pyarrow/error.pxi", line 125, in pyarrow.lib.check_status
    pyarrow.lib.ArrowCapacityError: array cannot contain more than 2147483646 bytes, have 2745777856


Thank you in advance!!! 

### Versions / Dependencies

0.11.5 

### Reproduction script
```python
from pyspark.sql import SparkSession

# Alternative SynapseML versions noted by the author: 0.10.2, 0.11.3
spark = (
    SparkSession
    .builder
    .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:1.0.1")
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
    .getOrCreate()
)

# result_df is the input Spark DataFrame (its construction is not shown in this report).
num_partitions = 8
spark_series = result_df.repartitionByRange(num_partitions, 'unique_id')

import pandas as pd
from mlforecast.distributed import DistributedMLForecast
from mlforecast.distributed.models.spark.xgb import SparkXGBForecast
from mlforecast.target_transforms import BaseTargetTransform, Differences
from window_ops.expanding import expanding_mean
from window_ops.rolling import rolling_mean

class Mean_Scaler(BaseTargetTransform):
    """Scales each series by dividing it by its mean + 1."""
    # Target transform following the DeepAR approach: https://arxiv.org/pdf/1704.04110.pdf
    # Target transforms guide: https://nixtla.github.io/mlforecast/docs/how-to-guides/target_transforms_guide.html
    def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        # Per-series mean of the target, kept for the inverse transform.
        self.stats_ = df.groupby(self.id_col)[self.target_col].agg(['mean'])
        df = df.merge(self.stats_, on=self.id_col)
        df[self.target_col] = df[self.target_col] / (df['mean'] + 1)
        df = df.drop(columns=['mean'])
        return df

    def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        df = df.merge(self.stats_, on=self.id_col)
        # Rescale every non-id, non-time column back to the original scale.
        for col in df.columns.drop([self.id_col, self.time_col, 'mean']):
            df[col] = df[col] * (df['mean'] + 1)
        df = df.drop(columns=['mean'])
        return df

models = [SparkXGBForecast()]
fcst = DistributedMLForecast(
    models,
    freq='W-SAT',
    lags=[1, 52],
    lag_transforms={
        1: [expanding_mean],
        4: [(rolling_mean, 4)],
    },
    target_transforms=[Differences([1]), Mean_Scaler()],
)
fcst.fit(
    spark_series,
)
```

### Issue Severity

None

iamyihwa commented 9 months ago

Regarding the number of workers: setting the `num_workers` parameter to the number of workers I have worked! (See the xgboost documentation.) `models = [SparkXGBForecast(num_workers=8)]`

jmoralez commented 9 months ago

Hey @iamyihwa, thanks for the great report. Do you have very long ids? This answer suggests that it may be an issue with large strings. This seems to be coming from the fit step, so I don't think it's related to your transformation. Increasing the number of partitions may help.
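The byte counts in the error message give a rough idea of how far the partition count needs to grow. Below is a back-of-the-envelope sketch, assuming the data is spread evenly across partitions (skewed ids would need more). The helper name `suggested_partitions` and the `headroom` parameter are made up for illustration and are not part of mlforecast, Spark, or pyarrow.

```python
import math

# Limit and observed size copied from the ArrowCapacityError message above.
ARROW_MAX_ARRAY_BYTES = 2_147_483_646
OBSERVED_ARRAY_BYTES = 2_745_777_856


def suggested_partitions(current_partitions, observed_bytes,
                         limit=ARROW_MAX_ARRAY_BYTES, headroom=0.5):
    """Hypothetical helper: scale the partition count so the largest array
    would use at most `headroom` of the Arrow limit.

    Assumes the data is evenly distributed across partitions.
    """
    if not 0 < headroom <= 1:
        raise ValueError("headroom must be in (0, 1]")
    scale = math.ceil(observed_bytes / (limit * headroom))
    return current_partitions * max(scale, 1)


print(suggested_partitions(8, OBSERVED_ARRAY_BYTES))                # 24
print(suggested_partitions(8, OBSERVED_ARRAY_BYTES, headroom=1.0))  # 16
```

With the 8 partitions from the reproduction script, this suggests passing something like 24 instead of 8 to `repartitionByRange`. The exact number is a judgment call, since the estimate ignores skew between series.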

iamyihwa commented 9 months ago

Thanks @jmoralez ! It worked!!

As you suggested, I increased the number of partitions, and the error went away!

Training now runs smoothly and very fast, but retrieving the forecasts takes a very long time. (Training took 10 minutes with 8 workers, whereas just peeking at the forecasts with .take() has been running for more than 10 minutes and is still going.)

    horizon = 52
    preds_sf = fcst.predict(h=horizon)
    preds_sf.take(5)

What would be the best way to train on a very large dataset with mlforecast / neuralforecast, @jmoralez? Sorry, this is a bit off-topic from the title, but I would love to hear your input!

jmoralez commented 9 months ago

The lag transformations can take a long time if you have very long series. Can you try using the built-in ones? They should be significantly faster and also support multithreading, so try also setting num_threads to the number of cores in your machine in the forecast constructor.
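A sketch of what that swap might look like, assuming the installed mlforecast version exposes the built-in transforms as `ExpandingMean` and `RollingMean` in `mlforecast.lag_transforms`. The two helper functions are hypothetical names used only for this illustration. The mlforecast import is deferred so the snippet loads even where the library is not installed.

```python
import os


def default_num_threads():
    """Number of cores visible to this process (at least 1)."""
    return os.cpu_count() or 1


def builtin_lag_transforms():
    """Built-in counterparts of the window_ops transforms in the report.

    Assumes `mlforecast.lag_transforms` is available; imported lazily.
    """
    from mlforecast.lag_transforms import ExpandingMean, RollingMean

    return {
        1: [ExpandingMean()],             # replaces expanding_mean
        4: [RollingMean(window_size=4)],  # replaces (rolling_mean, 4)
    }


# Intended use with the objects from the reproduction script (not run here):
# fcst = DistributedMLForecast(
#     models,
#     freq='W-SAT',
#     lags=[1, 52],
#     lag_transforms=builtin_lag_transforms(),
#     num_threads=default_num_threads(),
#     target_transforms=[Differences([1]), Mean_Scaler()],
# )
```

Note that `os.cpu_count()` reports the cores of the machine where it runs, so on a cluster the driver's count may differ from the workers'. Setting the value by hand may be more appropriate there.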

Also, I don't think Spark can tell that it could take the first five rows from a single partition, so otherwise the whole computation will run only to return 5 rows. You can try saving the result first and then getting a subset.
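One way to do the "save first, then subset" step is sketched below. It assumes `spark` is the active SparkSession and `preds` is the Spark DataFrame returned by `fcst.predict`. The function name and the `path` argument are placeholders for this illustration, and Parquet is just one possible storage format.

```python
def save_then_peek(spark, preds, path, n=5):
    """Materialize the full forecast once, then read back only n rows.

    `spark` is a SparkSession and `preds` a Spark DataFrame; `path` is a
    placeholder location writable by the cluster.
    """
    # Runs the whole prediction job a single time and persists the result.
    preds.write.mode("overwrite").parquet(path)
    # Reading from the saved files is cheap compared to recomputing.
    return spark.read.parquet(path).limit(n).collect()


# Intended use (not run here):
# rows = save_then_peek(spark, fcst.predict(h=52), "/tmp/preds.parquet")
```

Later inspections can read from the saved files again instead of re-triggering the prediction job.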

jmoralez commented 8 months ago

#301 should also make the predict step faster for distributed.

github-actions[bot] commented 7 months ago

This issue has been automatically closed because it has been awaiting a response for too long. When you have time to work with the maintainers to resolve this issue, please post a new comment and it will be re-opened. If the issue has been locked for editing by the time you return to it, please open a new issue and reference this one.