awslabs / gluonts

Probabilistic time series modeling in Python
https://ts.gluon.ai
Apache License 2.0

Exception: Reached maximum number of idle transformation calls #2694

Closed tjb-tech closed 1 year ago

tjb-tech commented 1 year ago

Description

Hi, I am using your library to model a probabilistic time series forecasting problem. Specifically, I loaded the "electricity_nips" dataset with the get_dataset function. However, an exception occurred when I passed dataset_train to estimator.train. Details are as follows:

To Reproduce

dataset = get_dataset("electricity_nips", regenerate=False)

train_grouper = MultivariateGrouper(max_target_dim=min(2000, int(dataset.metadata.feat_static_cat[0].cardinality)))

test_grouper = MultivariateGrouper(num_test_dates=int(len(dataset.test) / len(dataset.train)),
                                   max_target_dim=min(2000, int(dataset.metadata.feat_static_cat[0].cardinality)))

dataset_train = train_grouper(dataset.train)
dataset_test = test_grouper(dataset.test)

estimator = TimeGradEstimator(
    target_dim=int(dataset.metadata.feat_static_cat[0].cardinality),
    prediction_length=dataset.metadata.prediction_length,
    context_length=dataset.metadata.prediction_length,
    cell_type='GRU',
    input_size=1484,
    freq=dataset.metadata.freq,
    loss_type='l2',
    scaling=True,
    diff_steps=100,
    beta_end=0.1,
    beta_schedule="linear",
    trainer=Trainer(device=device,
                    epochs=20,
                    learning_rate=1e-3,
                    num_batches_per_epoch=100,
                    batch_size=64, )
)

predictor = estimator.train(dataset_train, num_workers=8)

Error message or code output

The error occurred in predictor = estimator.train(dataset_train, num_workers=8), as shown below:

Traceback (most recent call last):
  File "/root/miniconda3/envs/crime/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/root/miniconda3/envs/crime/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/root/.vscode-server/extensions/ms-python.python-2023.2.0/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher/../../debugpy/__main__.py", line 39, in <module>
    cli.main()
  File "/root/.vscode-server/extensions/ms-python.python-2023.2.0/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher/../../debugpy/../debugpy/server/cli.py", line 430, in main
    run()
  File "/root/.vscode-server/extensions/ms-python.python-2023.2.0/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher/../../debugpy/../debugpy/server/cli.py", line 284, in run_file
    runpy.run_path(target, run_name="__main__")
  File "/root/.vscode-server/extensions/ms-python.python-2023.2.0/pythonFiles/lib/python/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py", line 321, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/root/.vscode-server/extensions/ms-python.python-2023.2.0/pythonFiles/lib/python/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py", line 135, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/root/.vscode-server/extensions/ms-python.python-2023.2.0/pythonFiles/lib/python/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py", line 124, in _run_code
    exec(code, run_globals)
  File "/root/autodl-nas/TimeGrad/main.py", line 110, in <module>
    predictor = estimator.train(dataset_train)
  File "/root/autodl-nas/TimeGrad/pts/model/estimator.py", line 179, in train
    return self.train_model(
  File "/root/autodl-nas/TimeGrad/pts/model/estimator.py", line 151, in train_model
    self.trainer(
  File "/root/autodl-nas/TimeGrad/pts/trainer.py", line 63, in __call__
    for batch_no, data_entry in enumerate(it, start=1):
  File "/root/miniconda3/envs/crime/lib/python3.8/site-packages/tqdm/std.py", line 1185, in __iter__
    for obj in iterable:
  File "/root/miniconda3/envs/crime/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 521, in __next__
    data = self._next_data()
  File "/root/miniconda3/envs/crime/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 561, in _next_data
    data = self._dataset_fetcher.fetch(index)  # may raise StopIteration
  File "/root/miniconda3/envs/crime/lib/python3.8/site-packages/torch/utils/data/_utils/fetch.py", line 28, in fetch
    data.append(next(self.dataset_iter))
  File "/root/miniconda3/envs/crime/lib/python3.8/site-packages/gluonts/transform/_base.py", line 103, in __iter__
    yield from self.transformation(
  File "/root/miniconda3/envs/crime/lib/python3.8/site-packages/gluonts/transform/_base.py", line 124, in __call__
    for data_entry in data_it:
  File "/root/miniconda3/envs/crime/lib/python3.8/site-packages/gluonts/transform/_base.py", line 124, in __call__
    for data_entry in data_it:
  File "/root/miniconda3/envs/crime/lib/python3.8/site-packages/gluonts/transform/_base.py", line 189, in __call__
    raise Exception(
Exception: Reached maximum number of idle transformation calls.
This means the transformation looped over 1 inputs without returning any output.
This occurred in the following transformation:
gluonts.transform.split.InstanceSplitter(dummy_value=0.0, forecast_start_field='forecast_start', future_length=24, instance_sampler=ExpectedNumInstanceSampler(axis=-1, min_past=192, min_future=24, num_instances=1.0, total_length=54352, n=8), is_pad_field='is_pad', lead_time=0, output_NTC=True, past_length=192, start_field='start', target_field='target', time_series_fields=['time_feat', 'observed_values'])

jaheba commented 1 year ago

Hi, it looks like you are using a model from pts, and not GluonTS.

I'm not sure how things interplay here.

But the basic issue is that we have a safety check on some transformations to prevent an idling infinite loop. It uses a counter and breaks once that many iterations have passed without a result. In your case that value appears to be 1, which is very low :).

You can try this as a workaround:

from gluonts.env import env

env._set("max_idle_transforms", 100)

tjb-tech commented 1 year ago

I added your code to mine, but the same error occurs. I am actually using a custom model built on GluonTS, and the model lives in pts. Where might the error come from? Thanks a lot! My Estimator is shown below:

class TimeGradEstimator(PyTorchEstimator):
    def __init__(
        self,
        input_size: int,
        freq: str,
        prediction_length: int,
        target_dim: int,
        trainer: Trainer = Trainer(),
        context_length: Optional[int] = None,
        num_layers: int = 2,
        num_cells: int = 40,
        cell_type: str = "LSTM",
        num_parallel_samples: int = 100,
        dropout_rate: float = 0.1,
        cardinality: List[int] = [1],
        embedding_dimension: int = 5,
        conditioning_length: int = 100,
        diff_steps: int = 100,
        loss_type: str = "l2",
        beta_end=0.1,
        beta_schedule="linear",
        residual_layers=8,
        residual_channels=8,
        dilation_cycle_length=2,
        scaling: bool = True,
        pick_incomplete: bool = False,
        lags_seq: Optional[List[int]] = None,
        time_features: Optional[List[TimeFeature]] = None,
        **kwargs,
    ) -> None:
        super().__init__(trainer=trainer, **kwargs)

        self.freq = freq
        self.context_length = (
            context_length if context_length is not None else prediction_length
        )

        self.input_size = input_size
        self.prediction_length = prediction_length
        self.target_dim = target_dim
        self.num_layers = num_layers
        self.num_cells = num_cells
        self.cell_type = cell_type
        self.num_parallel_samples = num_parallel_samples
        self.dropout_rate = dropout_rate
        self.cardinality = cardinality
        self.embedding_dimension = embedding_dimension

        self.conditioning_length = conditioning_length
        self.diff_steps = diff_steps
        self.loss_type = loss_type
        self.beta_end = beta_end
        self.beta_schedule = beta_schedule
        self.residual_layers = residual_layers
        self.residual_channels = residual_channels
        self.dilation_cycle_length = dilation_cycle_length

        self.lags_seq = (
            lags_seq
            if lags_seq is not None
            else lags_for_fourier_time_features_from_frequency(freq_str=freq)
        )

        self.time_features = (
            time_features
            if time_features is not None
            else fourier_time_features_from_frequency(self.freq)
        )

        self.history_length = self.context_length + max(self.lags_seq)
        self.pick_incomplete = pick_incomplete
        self.scaling = scaling

        self.train_sampler = ExpectedNumInstanceSampler(
            num_instances=1.0,
            min_past=0 if pick_incomplete else self.history_length,
            min_future=prediction_length,
        )

        self.validation_sampler = ValidationSplitSampler(
            min_past=0 if pick_incomplete else self.history_length,
            min_future=prediction_length,
        )

    def create_transformation(self) -> Transformation:
        return Chain(
            [
                AsNumpyArray(
                    field=FieldName.TARGET,
                    expected_ndim=2,
                ),
                # maps the target to (1, T)
                # if the target data is uni dimensional
                ExpandDimArray(
                    field=FieldName.TARGET,
                    axis=None,
                ),
                AddObservedValuesIndicator(
                    target_field=FieldName.TARGET,
                    output_field=FieldName.OBSERVED_VALUES,
                ),
                AddTimeFeatures(
                    start_field=FieldName.START,
                    target_field=FieldName.TARGET,
                    output_field=FieldName.FEAT_TIME,
                    time_features=self.time_features,
                    pred_length=self.prediction_length,
                ),
                VstackFeatures(
                    output_field=FieldName.FEAT_TIME,
                    input_fields=[FieldName.FEAT_TIME],
                ),
                SetFieldIfNotPresent(field=FieldName.FEAT_STATIC_CAT, value=[0]),
                TargetDimIndicator(
                    field_name="target_dimension_indicator",
                    target_field=FieldName.TARGET,
                ),
                AsNumpyArray(field=FieldName.FEAT_STATIC_CAT, expected_ndim=1),
            ]
        )

    def create_instance_splitter(self, mode: str):
        assert mode in ["training", "validation", "test"]

        instance_sampler = {
            "training": self.train_sampler,
            "validation": self.validation_sampler,
            "test": TestSplitSampler(),
        }[mode]

        return InstanceSplitter(
            target_field=FieldName.TARGET,
            is_pad_field=FieldName.IS_PAD,
            start_field=FieldName.START,
            forecast_start_field=FieldName.FORECAST_START,
            instance_sampler=instance_sampler,
            past_length=self.history_length,
            future_length=self.prediction_length,
            time_series_fields=[
                FieldName.FEAT_TIME,
                FieldName.OBSERVED_VALUES,
            ],
        ) + (
            RenameFields(
                {
                    f"past_{FieldName.TARGET}": f"past_{FieldName.TARGET}_cdf",
                    f"future_{FieldName.TARGET}": f"future_{FieldName.TARGET}_cdf",
                }
            )
        )

    def create_training_network(self, device: torch.device) -> TimeGradTrainingNetwork:
        return TimeGradTrainingNetwork(
            input_size=self.input_size,
            target_dim=self.target_dim,
            num_layers=self.num_layers,
            num_cells=self.num_cells,
            cell_type=self.cell_type,
            history_length=self.history_length,
            context_length=self.context_length,
            prediction_length=self.prediction_length,
            dropout_rate=self.dropout_rate,
            cardinality=self.cardinality,
            embedding_dimension=self.embedding_dimension,
            diff_steps=self.diff_steps,
            loss_type=self.loss_type,
            beta_end=self.beta_end,
            beta_schedule=self.beta_schedule,
            residual_layers=self.residual_layers,
            residual_channels=self.residual_channels,
            dilation_cycle_length=self.dilation_cycle_length,
            lags_seq=self.lags_seq,
            scaling=self.scaling,
            conditioning_length=self.conditioning_length,
        ).to(device)

    def create_predictor(
        self,
        transformation: Transformation,
        trained_network: TimeGradTrainingNetwork,
        device: torch.device,
    ) -> Predictor:
        prediction_network = TimeGradPredictionNetwork(
            input_size=self.input_size,
            target_dim=self.target_dim,
            num_layers=self.num_layers,
            num_cells=self.num_cells,
            cell_type=self.cell_type,
            history_length=self.history_length,
            context_length=self.context_length,
            prediction_length=self.prediction_length,
            dropout_rate=self.dropout_rate,
            cardinality=self.cardinality,
            embedding_dimension=self.embedding_dimension,
            diff_steps=self.diff_steps,
            loss_type=self.loss_type,
            beta_end=self.beta_end,
            beta_schedule=self.beta_schedule,
            residual_layers=self.residual_layers,
            residual_channels=self.residual_channels,
            dilation_cycle_length=self.dilation_cycle_length,
            lags_seq=self.lags_seq,
            scaling=self.scaling,
            conditioning_length=self.conditioning_length,
            num_parallel_samples=self.num_parallel_samples,
        ).to(device)

        copy_parameters(trained_network, prediction_network)
        input_names = get_module_forward_input_names(prediction_network)
        prediction_splitter = self.create_instance_splitter("test")

        return PyTorchPredictor(
            input_transform=transformation + prediction_splitter,
            input_names=input_names,
            prediction_net=prediction_network,
            batch_size=self.trainer.batch_size,
            prediction_length=self.prediction_length,
            device=device,
        )

jaheba commented 1 year ago

The offending class is InstanceSplitter, as you can see from the last line of your logs:

gluonts.transform.split.InstanceSplitter(dummy_value=0.0, forecast_start_field='forecast_start', future_length=24, instance_sampler=ExpectedNumInstanceSampler(axis=-1, min_past=192, min_future=24, num_instances=1.0, total_length=54352, n=8), is_pad_field='is_pad', lead_time=0, output_NTC=True, past_length=192, start_field='start', target_field='target', time_series_fields=['time_feat', 'observed_values'])

The problem appears to be here:

https://github.com/zalandoresearch/pytorch-ts/blob/81be06bcc128729ad8901fcf1c722834f176ac34/pts/model/estimator.py#L107

        with env._let(max_idle_transforms=maybe_len(training_data) or 0):

In your case, this sets max_idle_transforms to 1, the length of your training dataset.
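A likely reason the earlier env._set workaround had no effect is that a scoped override applied inside training takes precedence over a value set beforehand. The sketch below illustrates that precedence with a hypothetical Settings class; it is a stand-in written for this example, not the actual gluonts.env implementation, and dataset_len is an assumed value.

```python
from contextlib import contextmanager


class Settings:
    """Hypothetical layered settings object (not the GluonTS class)."""

    def __init__(self, **defaults):
        self._layers = [dict(defaults)]

    def _set(self, key, value):
        # Write into whichever layer is currently on top.
        self._layers[-1][key] = value

    def get(self, key):
        # The most recently pushed layer wins.
        for layer in reversed(self._layers):
            if key in layer:
                return layer[key]
        raise KeyError(key)

    @contextmanager
    def _let(self, **overrides):
        # Push a temporary layer for the duration of the with-block.
        self._layers.append(dict(overrides))
        try:
            yield self
        finally:
            self._layers.pop()


env = Settings()
env._set("max_idle_transforms", 100)  # the workaround, applied up front

dataset_len = 1  # assumed length of the grouped training dataset
with env._let(max_idle_transforms=dataset_len or 0):
    inside = env.get("max_idle_transforms")  # value seen during training
outside = env.get("max_idle_transforms")  # value seen everywhere else
```

Under these assumptions the training code sees the scoped value, so the value set up front never reaches the transformation.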

There is not much we can do, since that is not our library.

However, you could artificially make your input dataset bigger by doing:

dataset_train = list(train_grouper(dataset.train))
dataset_train *= 100

tjb-tech commented 1 year ago

Amazing! After adding your code, the error disappears. But I still can't understand why the error occurred. Is there any difference in the training data after adding your code? Could you please explain in detail or point me to some materials? Please forgive me, I am new to GluonTS.

jaheba commented 1 year ago

The base problem is that for training we construct an endless stream of time series by iterating over the input data again and again. More or less this:

def training_iter(dataset):
    while True:
        for entry in dataset:
            yield entry

Then we have transformations that we apply to that stream of entries. However, some of these transformations (e.g. InstanceSplitter) don't map input to output 1 to 1, but can yield more or fewer time series.

For example, you could have a transformation step that filters out time series that are too short, based on a threshold. But if the threshold is too high, it filters out all time series and we are stuck in an infinite loop. The same can happen for instance splitting when the input time series is shorter than the prediction length.
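A minimal sketch of that failure mode, in plain Python rather than GluonTS transformations. The names drop_short and count_outputs are made up for this illustration, and the stream is capped with islice so the example terminates instead of hanging.

```python
from itertools import islice


def training_iter(dataset):
    # Endless stream: cycle over the dataset again and again.
    while True:
        for entry in dataset:
            yield entry


def drop_short(entries, min_length):
    # Hypothetical filter step: forward only entries that are long enough.
    for entry in entries:
        if len(entry) >= min_length:
            yield entry


def count_outputs(dataset, min_length, cap):
    # Feed at most `cap` inputs from the endless stream, count the outputs.
    stream = islice(training_iter(dataset), cap)
    return sum(1 for _ in drop_short(stream, min_length))


dataset = [[1, 2, 3], [4, 5]]

# A reasonable threshold lets some entries through.
kept = list(islice(drop_short(training_iter(dataset), min_length=3), 4))

# A threshold above every series length yields nothing, however many
# inputs are consumed; without the cap, next() would never return.
outputs = count_outputs(dataset, min_length=10, cap=1000)
```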

To mitigate this, there is max_idle_transforms, which limits how many invocations of a transformation step in a row may fail to yield a result. If that limit is exceeded, you get the exception above.
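The counter can be sketched roughly as follows. This is an illustration of the idea, not the GluonTS source: the function names are invented, and the exact comparison against the limit (>= here) is an assumption.

```python
def guarded(entries, flat_map, max_idle_transforms):
    # Count consecutive inputs for which flat_map produced nothing.
    idle = 0
    for entry in entries:
        idle += 1
        for result in flat_map(entry):
            idle = 0  # any output resets the counter
            yield result
        if idle >= max_idle_transforms:
            raise RuntimeError(f"no output after {idle} consecutive inputs")


def split_if_long(entry, min_length=3):
    # Hypothetical step that emits nothing for short entries.
    if len(entry) >= min_length:
        yield entry


def run(entries, limit):
    # Return (results, None) on success or (None, message) on failure.
    try:
        return list(guarded(entries, split_if_long, limit)), None
    except RuntimeError as err:
        return None, str(err)


entries = [[1, 2, 3], [4], [5, 6, 7]]
ok, _ = run(entries, limit=2)      # one skipped entry is tolerated
_, error = run(entries, limit=1)   # one skipped entry is already fatal
```

With a limit of 1, a single skipped entry is enough to abort, which matches the behaviour reported in this issue.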

In GluonTS we set that value to the length of the dataset, but not lower than 100. In pts it is essentially just the length of the dataset, in your case 1. So as soon as the instance splitter skips one time series, things break.
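To get a feeling for how often the splitter skips a series, here is a rough estimate. It assumes that ExpectedNumInstanceSampler picks each admissible index independently with probability num_instances / average window length; that behaviour is an assumption of this sketch, not something stated in this thread. The numbers total_length=54352, n=8 and num_instances=1.0 are taken from the error message above.

```python
import math

# Values copied from the sampler repr in the error message.
total_length = 54352
n = 8
num_instances = 1.0

# Assumed: average number of admissible split points per call.
window = total_length // n

# Assumed: each split point is chosen independently with probability p.
p = num_instances / window

# Probability that a single pass over the series yields no instance.
p_empty = (1.0 - p) ** window

# For large windows this approaches exp(-num_instances).
approx = math.exp(-num_instances)
```

Under these assumptions, roughly one pass in three produces no training instance, so a limit of 1 is hit almost immediately.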

The trick here is to just make the input dataset artificially larger, to force the value to be 100.
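The effect of repeating the dataset on the limit can be sketched like this. The two helper functions are hypothetical and only mirror the descriptions in this thread ("length of the dataset, but not lower than 100" for GluonTS, and maybe_len(training_data) or 0 for pts); they are not library functions, and the dataset entry is a placeholder.

```python
from typing import Optional


def gluonts_style_limit(dataset_len: Optional[int]) -> int:
    # Length of the dataset, but never lower than 100.
    return max(dataset_len or 0, 100)


def pts_style_limit(dataset_len: Optional[int]) -> int:
    # Mirrors `maybe_len(training_data) or 0`: just the length.
    return dataset_len or 0


# Placeholder for the single multivariate entry produced by the grouper.
grouped = [{"target": "one multivariate series"}]
repeated = grouped * 100  # the workaround: repeat the entry 100 times

limit_before = pts_style_limit(len(grouped))
limit_after = pts_style_limit(len(repeated))
limit_gluonts = gluonts_style_limit(len(grouped))
```

Multiplying a list repeats references to the same entry, so the training data itself is unchanged; the training stream already cycles over the dataset endlessly, and only the reported length differs.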

If your model was using the base estimator defined in GluonTS, you wouldn't have the issue.

tjb-tech commented 1 year ago

OK, thank you for the patient explanation! Although I still don't fully understand, I gather that the dataset is extended by simply repeating it, right? I will carefully check how this affects my code and will read more of the official GluonTS docs.

jaheba commented 1 year ago

Yes, repeating in your case is a good workaround.

But the root of this issue is pts, so I'm closing this one.