unit8co / darts

A python library for user-friendly forecasting and anomaly detection on time series.
https://unit8co.github.io/darts/
Apache License 2.0

[BUG] Prediction on multiple gpus return results from only one gpu. #1945

Open heury opened 1 year ago

heury commented 1 year ago

Describe the bug

My server has four GPUs. When I run prediction on all four, darts seems to return results from only one GPU.

To Reproduce

I trained a TFT model with the following trainer options and saved the model to a file.

pl_trainer_kwargs={ "accelerator": "gpu", "devices": "auto", }

I loaded the model from the file with the "load_weights" method.

model.load_weights("tft_model.pt")

Then I ran prediction on the test dataset, but the number of results does not match the number of input series. For example, I get only two results when I pass 10 time series for prediction. It seems that darts returns the results from only one GPU and the rest are missing. This does not happen on a single GPU with the following options.

pl_trainer_kwargs={ "accelerator": "gpu", "devices": [0], }
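Since the mismatch is reported to disappear on a single device, one possible workaround is to keep multi-GPU training but re-create the model pinned to one GPU for prediction and load the saved weights into it. The sketch below is duck-typed so that it does not import darts: `model_cls` stands for a class such as `TFTModel`, and `predict_on_single_device` is a hypothetical helper name. It assumes that `load_weights` accepts weights saved by a multi-GPU run, which has not been verified here.

```python
from typing import Any, Callable, Dict

# Trainer options that pin prediction to one GPU (taken from the report above).
SINGLE_GPU_KWARGS: Dict[str, Any] = {"accelerator": "gpu", "devices": [0]}


def predict_on_single_device(
    model_cls: Callable[..., Any],
    model_kwargs: Dict[str, Any],
    weights_path: str,
    series: Any,
    n: int,
) -> Any:
    """Re-create the model on a single device, load weights, and predict.

    `model_cls` is assumed to expose `load_weights(path)` and
    `predict(n=..., series=...)`, as the darts model in this thread does.
    """
    # Override only the trainer options. All other hyperparameters should match
    # the ones used for training so the saved weights fit the architecture.
    kwargs = {**model_kwargs, "pl_trainer_kwargs": dict(SINGLE_GPU_KWARGS)}
    model = model_cls(**kwargs)
    model.load_weights(weights_path)
    return model.predict(n=n, series=series)
```

A call might look like `predict_on_single_device(TFTModel, model_kwargs, "tft_model.pt", series, n=6)`, where `model_kwargs` holds the same constructor arguments that were used for training.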

Expected behavior

The number of predictions must match the number of input series.

System (please complete the following information):


dennisbader commented 1 year ago

Hi @heury , and thanks for writing. Unfortunately we don't have the infrastructure to test on multiple GPUs. So we would need a bit of help for debugging.

To make sure that everything is set up as expected:

heury commented 1 year ago

You can reproduce it with the TFT example code in darts. The following code raises an AssertionError when I set "devices": "auto", but the error disappears when I set "devices": [0]. This AssertionError is related to multiple GPUs, and I guess my problem is related to it as well.

 File "/home/jovyan/miniconda3/envs/ts/lib/python3.10/site-packages/pytorch_lightning/overrides/distributed.py", line 213, in __init__
    assert self.num_samples >= 1 or self.total_size == 0
AssertionError

import numpy as np
import pandas as pd
from tqdm import tqdm_notebook as tqdm
import torch
import matplotlib.pyplot as plt

from darts import TimeSeries, concatenate
from darts.dataprocessing.transformers import Scaler
from darts.models import TFTModel
from darts.metrics import mape
from darts.utils.statistics import check_seasonality, plot_acf
from darts.datasets import AirPassengersDataset, IceCreamHeaterDataset
from darts.utils.timeseries_generation import datetime_attribute_timeseries
from darts.utils.likelihood_models import QuantileRegression

import warnings

warnings.filterwarnings("ignore")
import logging

logging.disable(logging.CRITICAL)

def get_model():
    quantiles = [
        0.01,
        0.05,
        0.1,
        0.15,
        0.2,
        0.25,
        0.3,
        0.4,
        0.5,
        0.6,
        0.7,
        0.75,
        0.8,
        0.85,
        0.9,
        0.95,
        0.99,
    ]
    input_chunk_length = 24
    forecast_horizon = 12
    my_model = TFTModel(
        input_chunk_length=input_chunk_length,
        output_chunk_length=forecast_horizon,
        hidden_size=64,
        lstm_layers=1,
        num_attention_heads=4,
        dropout=0.1,
        batch_size=16,
        n_epochs=300,
        add_relative_index=False,
        add_encoders=None,
        likelihood=QuantileRegression(
            quantiles=quantiles
        ),  # QuantileRegression is set per default
        # loss_fn=MSELoss(),
        pl_trainer_kwargs={
            "accelerator": "gpu",
            # "gpus": 1,
            "devices": "auto",
            #"devices": [0],
            # "devices": 1,
            # "strategy":'ddp_notebook',
            # "logger": wandb_logger,
        },
        categorical_embedding_sizes={},
        random_state=42,
    )
    return my_model

def eval_model(model, n, val_series, num_samples):
    pred_series = model.predict(n=n, num_samples=num_samples)

    # plot actual series

    # plot prediction with quantile ranges

    print("MAPE: {:.2f}%".format(mape(val_series, pred_series)))

def main(cmd):
    # before starting, we define some constants
    num_samples = 200

    figsize = (9, 6)
    lowest_q, low_q, high_q, highest_q = 0.01, 0.1, 0.9, 0.99
    label_q_outer = f"{int(lowest_q * 100)}-{int(highest_q * 100)}th percentiles"
    label_q_inner = f"{int(low_q * 100)}-{int(high_q * 100)}th percentiles"
    # Read data
    series = AirPassengersDataset().load()

    # we convert monthly number of passengers to average daily number of passengers per month
    series = series / TimeSeries.from_series(series.time_index.days_in_month)
    series = series.astype(np.float32)

    # Create training and validation sets:
    training_cutoff = pd.Timestamp("19571201")
    train, val = series.split_after(training_cutoff)

    # Normalize the time series (note: we avoid fitting the transformer on the validation set)
    transformer = Scaler()
    train_transformed = transformer.fit_transform(train)
    val_transformed = transformer.transform(val)
    series_transformed = transformer.transform(series)

    # create year, month and integer index covariate series
    covariates = datetime_attribute_timeseries(series, attribute="year", one_hot=False)
    covariates = covariates.stack(
        datetime_attribute_timeseries(series, attribute="month", one_hot=False)
    )
    covariates = covariates.stack(
        TimeSeries.from_times_and_values(
            times=series.time_index,
            values=np.arange(len(series)),
            columns=["linear_increase"],
        )
    )
    covariates = covariates.astype(np.float32)

    # transform covariates (note: we fit the transformer on train split and can then transform the entire covariates series)
    scaler_covs = Scaler()
    cov_train, cov_val = covariates.split_after(training_cutoff)
    scaler_covs.fit(cov_train)
    covariates_transformed = scaler_covs.transform(covariates)

    if cmd == "train":
        # default quantiles for QuantileRegression
        my_model = get_model()
        my_model.fit(train_transformed, future_covariates=covariates_transformed, verbose=True) 
        my_model.save("tft_model.pt")
        pred_series = my_model.predict(n=24, num_samples=num_samples)
        print("MAPE: {:.2f}%".format(mape(val_transformed, pred_series)))

    elif cmd == "load":
        my_model = get_model()
        my_model.load_weights(path="tft_model.pt")
        pred_series = my_model.predict(n=12, series=train_transformed, future_covariates=covariates_transformed)
        print("MAPE: {:.2f}%".format(mape(val_transformed, pred_series)))

if __name__ == "__main__": 
    torch.multiprocessing.freeze_support()
    main("train")    
    # main("load")    
dennisbader commented 1 year ago

Can you try creating the model with pl_trainer_kwargs={"accelerator": "gpu", "devices": -1, "auto_select_gpus": True} as described in the multi GPU section?

Also this example should work as it's only fitting/predicting a single series, right?

heury commented 1 year ago

I use lightning==2.0.6, and "auto_select_gpus" does not exist in that version. So I changed it to "devices": "auto", and it works.
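The version difference can be captured in a small helper. This is only a sketch: `multi_gpu_trainer_kwargs` is a hypothetical name, and it assumes that `auto_select_gpus` is unavailable from Lightning 2.0 onward, as reported here for 2.0.6.

```python
from typing import Any, Dict


def multi_gpu_trainer_kwargs(lightning_version: str) -> Dict[str, Any]:
    """Return `pl_trainer_kwargs` that request all available GPUs.

    Assumption: `auto_select_gpus` is only accepted by Lightning 1.x.
    """
    major = int(lightning_version.split(".")[0])
    if major >= 2:
        # Option that worked with lightning==2.0.6 in this thread.
        return {"accelerator": "gpu", "devices": "auto"}
    # Option suggested by the maintainer for older versions.
    return {"accelerator": "gpu", "devices": -1, "auto_select_gpus": True}
```

The installed version string could be passed in, for example `multi_gpu_trainer_kwargs(pytorch_lightning.__version__)`.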

I use multiple time series with the TFT model. I am trying to predict stock prices, so the data covers 9:00 to 15:30 each day, and I create one time series per day.
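The per-day split described here could look roughly like the following. It is a minimal sketch using only the standard library: `split_by_trading_day` and the `(timestamp, price)` row layout are assumptions for illustration, not the reporter's actual code. Each resulting group could then be converted into its own `TimeSeries`.

```python
from collections import defaultdict
from datetime import date, datetime, time
from typing import Dict, Iterable, List, Tuple

Row = Tuple[datetime, float]  # hypothetical layout: (timestamp, price)


def split_by_trading_day(
    rows: Iterable[Row],
    session_open: time = time(9, 0),
    session_close: time = time(15, 30),
) -> Dict[date, List[Row]]:
    """Group intraday rows into one chronologically sorted list per day."""
    days: Dict[date, List[Row]] = defaultdict(list)
    for ts, price in rows:
        # Keep only rows inside the trading session.
        if session_open <= ts.time() <= session_close:
            days[ts.date()].append((ts, price))
    return {d: sorted(v, key=lambda r: r[0]) for d, v in sorted(days.items())}
```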

dennisbader commented 1 year ago

Alright, can you reduce your example to a minimal one that reproduces the issue? That makes it easier for others to debug. As far as I understand, the example that you shared works fine because it's only a single series.

You can use one of our smaller datasets for this as a series (and then just make a list of it).

import torch
from darts.models import TFTModel
from darts.datasets import AirPassengersDataset

if __name__ == "__main__": 
    torch.multiprocessing.freeze_support()
    series = AirPassengersDataset().load()
    series = [series] * 10

    model = TFTModel(
        input_chunk_length=12,
        output_chunk_length=6,
        add_relative_index=True,
        pl_trainer_kwargs={"accelerator": "gpu", "devices": "auto"}
    )
    model.fit(series, epochs=10)
    preds = model.predict(n=6, series=series, num_samples=100)

Does the above work for you or not?

heury commented 1 year ago

I think my description was not clear enough. I ran your code with a single time series and with multiple time series, and the results differ.

  1. Single time series: it raises an AssertionError.

result

  File "/home/jovyan/miniconda3/envs/ts/lib/python3.10/site-packages/pytorch_lightning/overrides/distributed.py", line 213, in __init__
    assert self.num_samples >= 1 or self.total_size == 0

code

import torch
from darts.models import TFTModel
from darts.datasets import AirPassengersDataset

if __name__ == "__main__": 
    torch.multiprocessing.freeze_support()
    series = AirPassengersDataset().load()
    # series = [series] * 10

    model = TFTModel(
        input_chunk_length=12,
        output_chunk_length=6,
        add_relative_index=True,
        pl_trainer_kwargs={"accelerator": "gpu", "devices": "auto"}
    )
    model.fit(series, epochs=10)
    preds = model.predict(n=6, series=series, num_samples=100)
    print("len(preds)",len(preds), "len(series)", len(series))

  2. Multiple time series: it returns multiple results. I think it runs in parallel and each process returns its own result.

result

len(preds) 2 len(series) 10
len(preds) 3 len(series) 10
len(preds) 3 len(series) 10
len(preds) 2 len(series) 10

code

import torch
from darts.models import TFTModel
from darts.datasets import AirPassengersDataset

if __name__ == "__main__": 
    torch.multiprocessing.freeze_support()
    series = AirPassengersDataset().load()
    series = [series] * 10

    model = TFTModel(
        input_chunk_length=12,
        output_chunk_length=6,
        add_relative_index=True,
        pl_trainer_kwargs={"accelerator": "gpu", "devices": "auto"}
    )
    model.fit(series, epochs=10)
    preds = model.predict(n=6, series=series, num_samples=100)
    print("len(preds)",len(preds), "len(series)", len(series))
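
One reading of these two results, not confirmed by the maintainers in this thread, is that during prediction the input series are sharded across the GPU processes by striding over ranks, without padding. The sketch below simulates that assumption in plain Python; `shard_indices`, `shard_sizes` and `check_every_rank_has_data` are hypothetical names, and the check only mirrors the condition shown in the traceback.

```python
from typing import List


def shard_indices(n_items: int, world_size: int, rank: int) -> List[int]:
    """Indices one process would see under strided, non-padded sharding."""
    return list(range(rank, n_items, world_size))


def shard_sizes(n_items: int, world_size: int) -> List[int]:
    """Number of items each of the `world_size` processes would receive."""
    return [len(shard_indices(n_items, world_size, r)) for r in range(world_size)]


def check_every_rank_has_data(n_items: int, world_size: int) -> None:
    """Mirror the traceback's condition: every rank needs at least one sample."""
    for rank, num_samples in enumerate(shard_sizes(n_items, world_size)):
        assert num_samples >= 1 or n_items == 0, f"rank {rank} received no samples"


if __name__ == "__main__":
    print(shard_sizes(10, 4))  # 10 series on 4 GPUs
    print(shard_sizes(1, 4))   # a single series on 4 GPUs
```

Under this assumption, 10 series on 4 GPUs split into shards of 3, 3, 2 and 2, which matches the reported lengths, while a single series leaves three processes with nothing and fails the check.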
SamuMazzi commented 1 year ago

Any news about this? I'm trying to do the same thing with an RNNModel and it gives me the exact same result.

KoustavDS commented 1 year ago

@dennisbader I am also facing a similar issue. I ran the code below. During prediction the work is distributed across all the GPUs, so I am not able to use "preds" for any further calculation. Please let me know if there is a solution.

import torch
from darts.models import TFTModel
from darts.datasets import AirPassengersDataset

if __name__ == "__main__":
    torch.multiprocessing.freeze_support()
    series = AirPassengersDataset().load()
    series = [series] * 10

    model = TFTModel(
        input_chunk_length=12,
        output_chunk_length=6,
        add_relative_index=True,
        pl_trainer_kwargs={"accelerator": "gpu", "devices": "auto"},
    )
    model.fit(series, epochs=10)
    preds = model.predict(n=6, series=series, num_samples=100)

    # Each GPU process runs this script, so the lengths are printed once per process.
    print("Print the length----->")
    print(len(series))
    print(len(preds))

----- Output -----
Print the length----->
10
2
Print the length----->
10
2
Predicting DataLoader 0: 100%|██████████| 1/1 [00:00<00:00, 3.47it/s]
Print the length----->
10
3
Print the length----->
10
3
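
If predictions come back split per process as in the output above, they would have to be collected and put back in input order before any further calculation. The sketch below shows only the reordering step, under the unverified assumption that rank `r` handled inputs `r`, `r + world_size`, `r + 2 * world_size`, and so on; `merge_strided_shards` is a hypothetical helper. Collecting the per-rank lists into one process could be done with something like `torch.distributed.all_gather_object`, which is not shown and has not been tested with darts here.

```python
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def merge_strided_shards(shards: Sequence[Sequence[T]]) -> List[T]:
    """Interleave per-rank results back into the original input order.

    Assumes rank r processed inputs r, r + world_size, r + 2 * world_size, ...
    """
    world_size = len(shards)
    total = sum(len(shard) for shard in shards)
    merged: List[Optional[T]] = [None] * total
    for rank, shard in enumerate(shards):
        for k, item in enumerate(shard):
            # The k-th result of this rank belongs at global position rank + k * world_size.
            merged[rank + k * world_size] = item
    return merged  # type: ignore[return-value]
```

With four shards of sizes 3, 3, 2 and 2, the merged list has 10 entries, one per input series.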

pengfei-chen commented 1 year ago

I have almost exactly the same problem as the one described above. If anyone knows how to solve it, please reply. Thank you!

pengfei-chen commented 1 year ago

Hello, I am facing almost exactly the same problem as this one. When can it be fixed, or can you suggest a workaround?

KoustavDS commented 1 year ago

Hi, is there any update on this specific issue?

madtoinou commented 8 months ago

Hi, a solution seems to have been suggested in #2265. Can you check if it solves the problem?