unit8co / darts

A python library for user-friendly forecasting and anomaly detection on time series.
https://unit8co.github.io/darts/
Apache License 2.0

Training GPU and Memory Usage #2254

Open murathaciibrahimoglu opened 9 months ago

murathaciibrahimoglu commented 9 months ago

I am trying to build a time series forecasting model with Darts, but I am not sure whether I am fully utilizing my GPU. My Python environment runs on Linux in Kubernetes. My current setup has a Tesla T4, and I set my accelerator to gpu. My GPU memory utilization is just 0.84 GB despite having 8 workers. My data size is 299521 rows by 3 columns. I am running a TFTModel and fitting with 8 loader workers. All the details are in the pictures below. image image

dennisbader commented 9 months ago

Hi @murathaciibrahimoglu. The 0.892 MB is just the model size, not necessarily the GPU usage during training.

You can either monitor the GPU usage from your device directly, or use the DeviceStatsMonitor callback from PyTorch Lightning.
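For the first option on an NVIDIA GPU, here is a minimal sketch, assuming the `nvidia-smi` CLI is available on the PATH of the machine or pod that runs the training. It queries utilization and memory with `nvidia-smi --query-gpu` and parses the CSV output. The helper names `parse_gpu_stats` and `read_gpu_stats` are illustrative only and are not part of Darts or PyTorch Lightning.

```python
import shutil
import subprocess

# Fields requested from nvidia-smi; with "nounits", memory values are in MiB.
QUERY_FIELDS = ("utilization.gpu", "memory.used", "memory.total")


def parse_gpu_stats(csv_text):
    """Parse `--format=csv,noheader,nounits` output, one dict per GPU.

    Assumes every field is numeric; a value such as "[N/A]" would raise ValueError.
    """
    stats = []
    for line in csv_text.strip().splitlines():
        util, used, total = (float(value) for value in line.split(","))
        stats.append(
            {"util_percent": util, "mem_used_mib": used, "mem_total_mib": total}
        )
    return stats


def read_gpu_stats():
    """Return current per-GPU stats, or an empty list if nvidia-smi is unusable."""
    if shutil.which("nvidia-smi") is None:
        return []
    cmd = [
        "nvidia-smi",
        "--query-gpu=" + ",".join(QUERY_FIELDS),
        "--format=csv,noheader,nounits",
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except (subprocess.CalledProcessError, OSError):
        return []
    return parse_gpu_stats(result.stdout)
```

Calling `read_gpu_stats()` from a second shell or a background thread while `model.fit(...)` is running gives a rough picture of how busy the card is during training, which is a different quantity from the model size reported in the summary.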

For example, running the script below on my M1 device:

import numpy as np
from pytorch_lightning.callbacks import DeviceStatsMonitor

from darts.datasets import AirPassengersDataset
from darts.models import TiDEModel

series = AirPassengersDataset().load().astype(np.float32)

# monitor your device
device_stats = DeviceStatsMonitor()
model = TiDEModel(
    input_chunk_length=12,
    output_chunk_length=12,
    pl_trainer_kwargs={"callbacks": [device_stats]},
    model_name="my_model",
    force_reset=True,
    log_tensorboard=True,
)
model.fit(series)

This will log TensorBoard files to workdir/darts_logs/my_model/logs (where workdir is the current working directory). You can then open them, for example from bash:

tensorboard --logdir=/workdir/darts_logs/my_model/logs

It will give you a URL that you can open in your browser. This then shows something like below:

image

murathaciibrahimoglu commented 8 months ago

Hello again. I changed the graphics card and am now using a Tesla L4. One epoch dropped from 22 minutes to 11 minutes. However, I can't see these utilization values in my own TensorBoard the way you do. Why do you think that is? There is a metric called GPU Duty Cycle in the Google Kubernetes Engine dashboard, where I see 100/100, so I think that reflects the GPU utilization. image

image

I also have the option of multi-GPU training. I trained with two Tesla T4s, but unfortunately I could not run the model I saved. When I load the saved model from the folder in another notebook (on either CPU or GPU), I get this error:

"PytorchStreamReader failed reading zip archive: invalid header or archive is corrupted"

Training with a single GPU works without any problems. In the code, I only set devices to 0 instead of "auto", so I don't actually make any other changes. How do you think I can properly save a model trained with multiple GPUs? I think it runs the main script once for each graphics card.

# -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE_MAGIC_CELL
# Automatically replaced inline charts by "no-op" charts
# %pylab inline
import matplotlib
matplotlib.use("Agg")

# -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
import dataiku
from dataiku import pandasutils as pdu
import pandas as pd

# -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
import warnings
warnings.filterwarnings("ignore")
import darts
from sklearn.preprocessing import StandardScaler
from darts import TimeSeries
from darts.models import RNNModel, ExponentialSmoothing, BlockRNNModel, TFTModel
from darts.metrics import mape
from darts.utils.statistics import check_seasonality, plot_acf
from darts import concatenate
import optuna
from optuna.integration import PyTorchLightningPruningCallback
import torch
from pytorch_lightning.callbacks import Callback
import dataikuapi
import urllib3
import json
from pytorch_lightning.callbacks import DeviceStatsMonitor
import os

#print(torch.cuda.is_available())
#print(torch.cuda.device_count())
#print(torch.cuda.get_device_name(0))

if __name__ == "__main__":
    torch.multiprocessing.freeze_support()
    minute_data = dataiku.Dataset("minute_data")
    df = minute_data.get_dataframe()

    df=df.sort_values(by="the_timestamp_ltz")
    df['the_timestamp'] = pd.to_datetime(df['the_timestamp']).dt.tz_localize(None)
    df['the_timestamp_ltz'] = pd.to_datetime(df['the_timestamp_ltz']).dt.tz_localize(None)

    train_y_df = df[df["the_timestamp_ltz"]<="2023-12-31 23:59:00"][["the_timestamp_ltz","y"]]
    train_x1_df = df[df["the_timestamp_ltz"]<="2023-12-31 23:59:00"][["the_timestamp_ltz","x1"]]
    train_x2_df = df[df["the_timestamp_ltz"]<="2023-12-31 23:59:00"][["the_timestamp_ltz","x2"]]
    train_x3_df = df[df["the_timestamp_ltz"]<="2023-12-31 23:59:00"][["the_timestamp_ltz","x3"]]

    test_y_df = df[(df["the_timestamp_ltz"]>"2023-12-31 23:59:00") & (df["the_timestamp_ltz"]<="2024-01-31 23:59:00")][["the_timestamp_ltz","y"]]
    test_x1_df = df[(df["the_timestamp_ltz"]>"2023-12-31 23:59:00") & (df["the_timestamp_ltz"]<="2024-01-31 23:59:00")][["the_timestamp_ltz","x1"]]
    test_x2_df = df[(df["the_timestamp_ltz"]>"2023-12-31 23:59:00") & (df["the_timestamp_ltz"]<="2024-01-31 23:59:00")][["the_timestamp_ltz","x2"]]
    test_x3_df = df[(df["the_timestamp_ltz"]>"2023-12-31 23:59:00") & (df["the_timestamp_ltz"]<="2024-01-31 23:59:00")][["the_timestamp_ltz","x3"]]

    val_y_df = df[(df["the_timestamp_ltz"]>"2024-01-31 23:59:00") & (df["the_timestamp_ltz"]<="2024-02-25 00:00:00")][["the_timestamp_ltz","y"]]
    val_x1_df = df[(df["the_timestamp_ltz"]>"2024-01-31 23:59:00") & (df["the_timestamp_ltz"]<="2024-02-25 00:00:00")][["the_timestamp_ltz","x1"]]
    val_x2_df = df[(df["the_timestamp_ltz"]>"2024-01-31 23:59:00") & (df["the_timestamp_ltz"]<="2024-02-25 00:00:00")][["the_timestamp_ltz","x2"]]
    val_x3_df = df[(df["the_timestamp_ltz"]>"2024-01-31 23:59:00") & (df["the_timestamp_ltz"]<="2024-02-25 00:00:00")][["the_timestamp_ltz","x3"]]

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    scaler_y = StandardScaler().fit(train_y_df[["y"]])
    scaler_x1 = StandardScaler().fit(train_x1_df[["x1"]])
    scaler_x2 = StandardScaler().fit(train_x2_df[["x2"]])
    scaler_x3 = StandardScaler().fit(train_x3_df[["x3"]])

    train_y_df[["y"]] = scaler_y.transform(train_y_df[["y"]])
    train_x1_df[["x1"]] = scaler_x1.transform(train_x1_df[["x1"]])
    train_x2_df[["x2"]] = scaler_x2.transform(train_x2_df[["x2"]])
    train_x3_df[["x3"]] = scaler_x3.transform(train_x3_df[["x3"]])

    test_y_df[["y"]] = scaler_y.transform(test_y_df[["y"]])
    test_x1_df[["x1"]] = scaler_x1.transform(test_x1_df[["x1"]])
    test_x2_df[["x2"]] = scaler_x2.transform(test_x2_df[["x2"]])
    test_x3_df[["x3"]] = scaler_x3.transform(test_x3_df[["x3"]])

    val_y_df[["y"]] = scaler_y.transform(val_y_df[["y"]])
    val_x1_df[["x1"]] = scaler_x1.transform(val_x1_df[["x1"]])
    val_x2_df[["x2"]] = scaler_x2.transform(val_x2_df[["x2"]])
    val_x3_df[["x3"]] = scaler_x3.transform(val_x3_df[["x3"]])

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    train_y = TimeSeries.from_dataframe(train_y_df, time_col="the_timestamp_ltz", value_cols=["y"])
    train_x1 = TimeSeries.from_dataframe(train_x1_df, time_col="the_timestamp_ltz", value_cols=["x1"])
    train_x2 = TimeSeries.from_dataframe(train_x2_df, time_col="the_timestamp_ltz", value_cols=["x2"])
    train_x3 = TimeSeries.from_dataframe(train_x3_df, time_col="the_timestamp_ltz", value_cols=["x3"])

    test_y = TimeSeries.from_dataframe(test_y_df, time_col="the_timestamp_ltz", value_cols=["y"])
    test_x1 = TimeSeries.from_dataframe(test_x1_df, time_col="the_timestamp_ltz", value_cols=["x1"])
    test_x2 = TimeSeries.from_dataframe(test_x2_df, time_col="the_timestamp_ltz", value_cols=["x2"])
    test_x3 = TimeSeries.from_dataframe(test_x3_df, time_col="the_timestamp_ltz", value_cols=["x3"])

    val_y = TimeSeries.from_dataframe(val_y_df, time_col="the_timestamp_ltz", value_cols=["y"])
    val_x1 = TimeSeries.from_dataframe(val_x1_df, time_col="the_timestamp_ltz", value_cols=["x1"])
    val_x2 = TimeSeries.from_dataframe(val_x2_df, time_col="the_timestamp_ltz", value_cols=["x2"])
    val_x3 = TimeSeries.from_dataframe(val_x3_df, time_col="the_timestamp_ltz", value_cols=["x3"])

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    future_covariates_train = train_x1
    future_covariates_test = test_x1
    future_covariates_val = val_x1

    past_covariates_train = concatenate([train_x2, train_x3], axis=1)
    past_covariates_test = concatenate([test_x2, test_x3], axis=1)
    past_covariates_val = concatenate([val_x2,val_x3], axis=1)

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE

    class LossLogger(Callback):
        def __init__(self):
            self.train_loss = []
            self.val_loss = []

        def on_train_epoch_end(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule") -> None:
            self.train_loss.append(float(trainer.callback_metrics["train_loss"]))

        def on_validation_epoch_end(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule") -> None:
            self.val_loss.append(float(trainer.callback_metrics["val_loss"]))

    loss_logger = LossLogger()

    device_stats = DeviceStatsMonitor()

    my_model = TFTModel(
        input_chunk_length=180,
        output_chunk_length=120,
        hidden_size=20,
        lstm_layers=1,
        num_attention_heads=1,
        dropout=0.2,
        batch_size=64,
        n_epochs=10,
        model_name="MGT_TFT_13032024",
        log_tensorboard= True,
        add_relative_index=False,
        add_encoders=None,
        random_state=42,
        force_reset=True,
        save_checkpoints=True,
        optimizer_cls = torch.optim.Adam,
        optimizer_kwargs={'lr': 1e-4},
        lr_scheduler_cls = torch.optim.lr_scheduler.ReduceLROnPlateau,
        pl_trainer_kwargs={
        "accelerator": "gpu",
        "devices": "auto",
        "callbacks": [loss_logger,device_stats]
        })

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    my_model.fit(
        train_y,
        future_covariates=future_covariates_train,
        past_covariates=past_covariates_train,
        val_series=test_y,
        val_future_covariates=future_covariates_test,
        val_past_covariates=past_covariates_test,
        verbose=True,
        num_loader_workers=1
    )

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    #pred = my_model.predict(n=180,series=val_transformed[:300],future_covariates=val_covariates)

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    #from matplotlib.pyplot import figure
    #figure(figsize=(15, 5), dpi=200)
    #pred.plot(color="r")
    #val_transformed[:870].plot()

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
model_name="tft_10e_20dim_1l_model_test_13032024.pt"
model_name_ckpt="tft_10e_20dim_1l_model_test_13032024.pt.ckpt"
plot_name="tft_10e_20dim_1l_model_test_13032024.png"

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE

my_model.save(model_name)

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE

host = ""
apiKey = ""

client = dataikuapi.DSSClient(host, apiKey)
client._session.verify = True

project = client.get_project("")

managedfolder = project.get_managed_folder("")

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
with open(model_name, "rb") as file:
    managedfolder.put_file(model_name, file)

with open(model_name_ckpt, "rb") as file:
    managedfolder.put_file(model_name_ckpt, file)

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
managedfolder.upload_folder("darts_logs", "darts_logs")

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
epoch_metrics=pd.DataFrame()
epoch_metrics["epoch"]=range(1,len(loss_logger.train_loss)+1)
epoch_metrics["val_loss"]=loss_logger.val_loss[1:]
epoch_metrics["train_loss"]=loss_logger.train_loss

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
plot = epoch_metrics.set_index("epoch").plot()
fig = plot.get_figure()
fig.savefig(plot_name)

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
with open(plot_name, "rb") as file:
    managedfolder.put_file(plot_name, file)

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE

# -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
# Write recipe outputs
job_result = dataiku.Dataset("job_result")
job_result.write_with_schema(epoch_metrics)

murathaciibrahimoglu commented 8 months ago

@dennisbader @madtoinou

dennisbader commented 8 months ago

Hi @murathaciibrahimoglu, about your first question: I was using a MacBook Pro with M1 (different GPU compared to yours), that's why you cannot see the same graphs.

Regarding the second question, it's hard to answer since we don't have the infrastructure to test multi-GPU scenarios. You can find some ongoing community discussions about it in #2265. Maybe other people there know the solution / can help.

Also, since our neural network framework is built on PyTorch Lightning, it could be useful to search the web for similar cases reported there.

Let us know if you find a solution, so we can help others with the same issue.
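
For the "failed reading zip archive" error discussed above, here is a small diagnostic sketch that uses only the Python standard library. It relies on the fact that `torch.save` writes a zip-based archive by default (since PyTorch 1.6), so the saved `.pt` and `.pt.ckpt` files can be checked for integrity before they are uploaded. The function name `check_torch_archive` is hypothetical, and the sketch only detects a damaged file; it does not by itself explain or fix the multi-GPU behavior.

```python
import zipfile


def check_torch_archive(path):
    """Return (ok, detail) for a file expected to be a zip-format torch save."""
    # is_zipfile returns False for missing, truncated, or non-zip files.
    if not zipfile.is_zipfile(path):
        return False, "not a readable zip archive"
    try:
        with zipfile.ZipFile(path) as archive:
            # testzip reads every member and returns the first bad name, if any.
            bad_member = archive.testzip()
    except zipfile.BadZipFile as exc:
        return False, f"bad zip file: {exc}"
    if bad_member is not None:
        return False, f"corrupted member: {bad_member}"
    return True, "ok"
```

Running this on both saved files right after `my_model.save(...)`, and again after downloading them in the other notebook, would show whether the file is already damaged when it is written (for example, if more than one training process wrote to the same path, as suspected above) or whether it is damaged later during transfer.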