unit8co / darts

A python library for user-friendly forecasting and anomaly detection on time series.
https://unit8co.github.io/darts/
Apache License 2.0
7.56k stars 829 forks source link

[BUG] TypeError then predicting in multi-gpu scenario #2265

Open nejox opened 4 months ago

nejox commented 4 months ago

Describe the bug After training a TFT with ddp_spawnstrategy on multiple gpus in Amazon SageMaker the returned prediction of the trainer is None, leading to an TypeError: 'NoneType' object is not iterable in torch_forecasting_model.

To Reproduce Code to reproduce the problem. I tried it locally with "accelerator":"cpu"also (don't know if this is even a valid approach) and end up with the same error.

import torch
from darts.models import TFTModel
from darts.datasets import AirPassengersDataset

if __name__ == "__main__":
    torch.multiprocessing.freeze_support()
    series = AirPassengersDataset().load()
    series = [series] * 10

    model = TFTModel(
        input_chunk_length=12,
        output_chunk_length=6,
        add_relative_index=True,
        pl_trainer_kwargs={
            "accelerator": "gpu",
            "strategy": "ddp_spawn",
            "devices": "2"
           }
    )
    model.fit(series, epochs=10)
    preds = model.predict(n=6, series=series, num_samples=100)
    print("len(preds)", len(preds), "len(series)", len(series))

=> leads to:

Traceback (most recent call last):
  File ".../src/test_multigpu.py", line 21, in <module>
    preds = model.predict(n=6, series=series, num_samples=100)
  File ".../darts/models/forecasting/torch_forecasting_model.py", line 2784, in predict
    return super().predict(
  File ".../darts/utils/torch.py", line 112, in decorator
    return decorated(self, *args, **kwargs)
  File ".../darts/models/forecasting/torch_forecasting_model.py", line 1371, in predict
    predictions = self.predict_from_dataset(
  File ".../darts/utils/torch.py", line 112, in decorator
    return decorated(self, *args, **kwargs)
  File ".../darts/models/forecasting/torch_forecasting_model.py", line 1519, in predict_from_dataset
    return [ts for batch in predictions for ts in batch]
TypeError: 'NoneType' object is not iterable

Expected behavior Successfully predict values, does not need to be multi gpu prediction as I only need the speed up for training.

System (please complete the following information):

Additional context Lightning documentation recommends ddp and not to use ddp_spawn but darts only supports ddp_spawn, right? I couldn't get ddp running as i had problems with multiple executions of my script due to 1 process per gpu and thus multiple checkpoints created.

BohdanBilonoh commented 4 months ago

Faced the same issue. Found that distributed Lighting inference requires BasePredictionWriter https://lightning.ai/docs/pytorch/stable/deploy/production_basic.html#enable-distributed-inference

dennisbader commented 4 months ago

@BohdanBilonoh, would you mind sharing your use of the BasePredictionWriter to help other users that face the same issue? Would also be interesting to see, in case it is something that we can add to Darts :)

BohdanBilonoh commented 4 months ago

@dennisbader sure. As a hot fix I changed:

TorchForecastingModel.predict

Original

return predictions[0] if called_with_single_series else predictions

New code

if predictions:
    return predictions[0] if called_with_single_series else predictions
else:
    return None

TorchForecastingModel.predict_from_dataset

Original

return [ts for batch in predictions for ts in batch]

New code

if predictions:
    return [ts for batch in predictions for ts in batch]
else:
    return None

After that something like this can be added to trainer callbacks

# or you can set `write_interval="batch"` and override `write_on_batch_end` to save
# predictions at batch level
class CustomWriter(BasePredictionWriter):
    def __init__(self, output_dir, write_interval):
        super().__init__(write_interval)
        self.output_dir = output_dir

    def write_on_epoch_end(self, trainer, pl_module, predictions, batch_indices):
        # this will create N (num processes) files in `output_dir` each containing
        # the predictions of it's respective rank
        torch.save(predictions, os.path.join(self.output_dir, f"predictions_{trainer.global_rank}.pt"))

        # optionally, you can also save `batch_indices` to get the information about the data index
        # from your prediction data
        torch.save(batch_indices, os.path.join(self.output_dir, f"batch_indices_{trainer.global_rank}.pt"))

Again this is very quick hot fix. As a feature of darts it cloud be add with following logic

Also following code shows how to infer a model that was trained with ddp on a single gpu

trained_model.trainer_params["accelerator"] = "gpu"
trained_model.trainer_params["devices"] = 0  # or any other single index (not list)
model.trainer_params["strategy"] = "auto"

After that predictions from the predict method will return as usual

stompsjo commented 2 months ago

I wanted to chime in to add a +1 on this issue. I was having similar issues and have recreated the error and fix with my scripts as well. I'll put a caveat here that I am also running darts as part of a kedro pipeline, where training and testing occur in two separate nodes. There is always the possibility that kedro is interfering with the handling for multiprocessing (e.g. torch.multiprocessing.freeze_support()) but I am fairly confident I have isolated this to darts.

My error occurs when running TFTModel.historical_forecast and is the exact TypeError described above. I have verified that

However, the above hotfix does not work when using TFTModel.historical_forecast. I still end up with a TypeErroron darts.utils.historical_forecast._optimized_historical_forecasts line 131. I think that again the main process is trying to run the method with predictions=None because the results from the discributed processes is not returned/processed. I tried adding to _optimized_historical_forecasts similar logic to above (if predictions: ... else: return None), but then I ended up with a PyTorch assertion error: assert self.num_samples >= 1 or self.total_size == 0. Is anyone else able to verify if the above fix works with historical_forecasts?

Also, I'm confused why the predictions need to be written to file with the BasePredictionWriter (maybe I should read the Lightning docs more closely). Is it possible for them to just be unified in memory?