AugustComte opened 1 month ago
I have this working for EnbPI, which involved changes to a custom sklearn wrapper so it could deal with sequences in the tensors. It does not work with gaps (the partial_fit example currently in your docs).
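For context, this is roughly the partial_fit pattern from your EnbPI tutorial that I could not get working with my wrapper (a sketch from memory, reusing the mapie_enbpi estimator fitted further down; treat the exact calls as approximate):

gap = 1
y_pred_pfit = np.zeros(len(y_test))
y_pis_pfit = np.zeros((len(y_test), 2, 1))
y_pred_pfit[:gap], y_pis_pfit[:gap] = mapie_enbpi.predict(
    X_test.iloc[:gap], alpha=alpha, ensemble=True, optimize_beta=True
)
for step in range(gap, len(y_test), gap):
    # Feed back the newly observed residuals, then predict the next block
    mapie_enbpi.partial_fit(X_test.iloc[step - gap:step], y_test.iloc[step - gap:step])
    y_pred_pfit[step:step + gap], y_pis_pfit[step:step + gap] = mapie_enbpi.predict(
        X_test.iloc[step:step + gap], alpha=alpha, ensemble=True, optimize_beta=True
    )

The problem, as far as I can tell, is that each one-row X_test.iloc[step:step + gap] slice is shorter than my sequence_length, so the window-building wrapper has nothing to predict on.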
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import lightning as L
from lightning import Trainer
from lightning.pytorch.callbacks import EarlyStopping
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.metrics import mean_absolute_error
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils import check_random_state
from sklearn.model_selection._split import _BaseKFold
from typing import Generator, Tuple, Optional, Union
from numpy.typing import NDArray
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# Data load
data = pd.read_csv('data/pseudo_sales.csv')
data['date'] = pd.to_datetime(data['date'], format='%d/%m/%Y')
data = data.sort_values('date')
# Plot series
plt.figure(figsize=(12, 6))
plt.plot(data['date'], data['sales'], label='sales')
plt.xlabel('date')
plt.ylabel('sales')
plt.title('Sales over time')
plt.legend()
plt.grid(True)
plt.show()
# Date features
def add_datetime_features(df, datetime_column):
df[datetime_column] = pd.to_datetime(df[datetime_column])
# Drop Sunday (value is 6)
#df = df[df[datetime_column].dt.weekday != 6].copy()
# Features
df['year'] = df[datetime_column].dt.year
df['month'] = df[datetime_column].dt.month
    df['week'] = df[datetime_column].dt.isocalendar().week.astype(int)  # isocalendar().week is nullable UInt32; cast so sklearn/torch get plain ints
df['day'] = df[datetime_column].dt.day
return df.copy()
df = add_datetime_features(data, "date")
date_col = df['date'].copy() # useful later
# Data splitting
test_size = 7 * 13
df_train = df.iloc[:-test_size, :].copy()
df_test = df.iloc[-test_size:, :].copy()
X_train = df_train.loc[:, ["day_of_week", "week_num", "promotion", "price", "year", "month", "week", "day"]]
y_train = df_train["sales"]
X_test = df_test.loc[:, ["day_of_week", "week_num", "promotion", "price", "year", "month", "week", "day"]]
y_test = df_test["sales"]
class TimeSeriesDataset(Dataset):
def __init__(self, inputs, targets):
self.inputs = torch.tensor(inputs, dtype=torch.float32)
self.targets = torch.tensor(targets, dtype=torch.float32).reshape(-1, 1)
def __len__(self):
return len(self.inputs)
def __getitem__(self, idx):
return self.inputs[idx], self.targets[idx]
class FFNetwork(L.LightningModule):
def __init__(self, input_dim, sequence_length, hidden_dim, num_layers=1, output_dim=1, learning_rate=0.0001, dropout_rate=0.5):
super(FFNetwork, self).__init__()
self.layers = nn.ModuleList([nn.Linear(input_dim * sequence_length, hidden_dim)])
for _ in range(num_layers - 1):
self.layers.append(nn.Linear(hidden_dim, hidden_dim))
self.layers.append(nn.Linear(hidden_dim, output_dim))
self.dropout = nn.Dropout(dropout_rate)
self.activation = nn.ReLU()
self.learning_rate = learning_rate
def forward(self, x):
batch_size, sequence_length, input_dim = x.size()
x = x.view(batch_size, sequence_length * input_dim)
for i in range(len(self.layers) - 1):
x = self.activation(self.layers[i](x))
x = self.dropout(x)
x = self.layers[-1](x)
return x
def training_step(self, batch, batch_idx):
x, y = batch
y_hat = self.forward(x)
loss = nn.functional.mse_loss(y_hat, y)
self.log('train_loss', loss)
return loss
def configure_optimizers(self):
return torch.optim.Adam(self.parameters(), lr=self.learning_rate)
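As a quick sanity check on the shape handling (toy dimensions, not the real data): the forward pass flattens (batch, sequence_length, input_dim) and should return (batch, output_dim).

# Toy shape check: flattening (batch, seq, features) -> (batch, 1)
_net = FFNetwork(input_dim=8, sequence_length=32, hidden_dim=64, num_layers=2)
_dummy = torch.randn(4, 32, 8)  # (batch, sequence_length, input_dim)
assert _net(_dummy).shape == (4, 1)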
class SKLearnFFNetwork(BaseEstimator, RegressorMixin):
def __init__(self, input_dim, sequence_length, hidden_dim, num_layers=1, output_dim=1,
learning_rate=0.0001, dropout_rate=0.5, batch_size=32, max_epochs=300):
self.input_dim = input_dim
self.sequence_length = sequence_length
self.hidden_dim = hidden_dim
self.num_layers = num_layers
self.output_dim = output_dim
self.learning_rate = learning_rate
self.dropout_rate = dropout_rate
self.batch_size = batch_size
self.max_epochs = max_epochs
self.model = None
self.feature_scaler = MinMaxScaler()
self.target_scaler = MinMaxScaler()
def create_time_windows(self, features, target=None):
inputs = []
targets = [] if target is not None else None
for i in range(len(features) - self.sequence_length + 1):
inputs.append(features[i:i+self.sequence_length])
if target is not None:
targets.append(target[i+self.sequence_length-1])
return np.array(inputs), np.array(targets) if targets is not None else None
def fit(self, X, y):
if isinstance(X, pd.DataFrame):
X = X.values
if isinstance(y, (pd.Series, pd.DataFrame)):
y = y.values.reshape(-1, 1)
elif isinstance(y, np.ndarray):
y = y.reshape(-1, 1)
X_scaled = self.feature_scaler.fit_transform(X)
y_scaled = self.target_scaler.fit_transform(y)
inputs, targets = self.create_time_windows(X_scaled, y_scaled)
train_dataset = TimeSeriesDataset(inputs, targets)
train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
self.model = FFNetwork(
input_dim=self.input_dim,
sequence_length=self.sequence_length,
hidden_dim=self.hidden_dim,
num_layers=self.num_layers,
output_dim=self.output_dim,
learning_rate=self.learning_rate,
dropout_rate=self.dropout_rate
)
early_stopping = EarlyStopping(monitor='train_loss', patience=10, verbose=True, mode='min')
trainer = Trainer(max_epochs=self.max_epochs, callbacks=[early_stopping], logger=True)
trainer.fit(self.model, train_loader)
return self
def predict(self, X):
self.model.eval()
with torch.no_grad():
if isinstance(X, pd.DataFrame):
X = X.values
X_scaled = self.feature_scaler.transform(X)
inputs, _ = self.create_time_windows(X_scaled)
inputs = torch.tensor(inputs, dtype=torch.float32)
predictions = self.model(inputs).cpu().numpy()
predictions = self.target_scaler.inverse_transform(predictions)
# Pad predictions to match original input length
padded_predictions = np.full(X.shape[0], np.nan)
padded_predictions[self.sequence_length-1:] = predictions.flatten()
return padded_predictions
def get_params(self, deep=True):
return {
"input_dim": self.input_dim,
"sequence_length": self.sequence_length,
"hidden_dim": self.hidden_dim,
"num_layers": self.num_layers,
"output_dim": self.output_dim,
"learning_rate": self.learning_rate,
"dropout_rate": self.dropout_rate,
"batch_size": self.batch_size,
"max_epochs": self.max_epochs
}
def set_params(self, **parameters):
for parameter, value in parameters.items():
setattr(self, parameter, value)
return self
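To make the windowing explicit (toy numbers, just for illustration): with n rows and window length w, the wrapper yields n - w + 1 windows, each paired with the target at the window's last step.

# Toy check of the windowing logic
_wrap = SKLearnFFNetwork(input_dim=3, sequence_length=4, hidden_dim=8)
_X = np.arange(30, dtype=float).reshape(10, 3)  # 10 rows, 3 exogenous features
_y = np.arange(10, dtype=float).reshape(-1, 1)
_inputs, _targets = _wrap.create_time_windows(_X, _y)
print(_inputs.shape, _targets.shape)  # (7, 4, 3) (7, 1)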
window_size = 32 # same as sequence_length
input_dim = X_train.shape[1]
hidden_dim = 63
num_layers = 4
output_dim = 1
learning_rate = 0.000689
dropout_rate = 0.170971
model = SKLearnFFNetwork(
input_dim=input_dim,
sequence_length=window_size,
hidden_dim=hidden_dim,
num_layers=num_layers,
output_dim=output_dim,
learning_rate=learning_rate,
dropout_rate=dropout_rate,
batch_size=32,
max_epochs=300
)
# Fit model
model.fit(X_train, y_train)
X_test_with_history = pd.concat([X_train.iloc[-window_size:], X_test])
predictions = model.predict(X_test_with_history)
# Trim predictions to match original test set length
predictions = predictions[-len(X_test):]
# Plot point predictions
plt.figure(figsize=(16, 5))
plt.plot(y_train)
plt.plot(y_test)
plt.plot(y_test.index, predictions)
plt.ylabel("Sales")
plt.legend(["Training data", "Test data", "Predictions"])
plt.show()
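A quick point-forecast check (my addition, using the mean_absolute_error imported above):

# Point-forecast error on the test set (the NaN padding was already trimmed off)
print(f"Test MAE: {mean_absolute_error(y_test, predictions):.3f}")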
#### MAPIE process and code updates
class BlockBootstrap(_BaseKFold):
def __init__(
self,
n_resamplings: int = 30,
n_blocks: int = 10,
window_size: int = 32,
test_size: int = 91,
random_state: Optional[Union[int, np.random.RandomState]] = None
) -> None:
self.n_resamplings = n_resamplings
self.n_blocks = n_blocks
self.window_size = window_size
self.test_size = test_size
self.random_state = random_state
def split(
self, X: NDArray, y: Optional[NDArray] = None, groups: Optional[NDArray] = None
) -> Generator[Tuple[NDArray, NDArray], None, None]:
n = len(X)
train_size = n - self.test_size
block_size = train_size // self.n_blocks
random_state = check_random_state(self.random_state)
for _ in range(self.n_resamplings):
# Create blocks
block_starts = random_state.choice(range(train_size - block_size + 1), size=self.n_blocks, replace=True)
train_indices = np.concatenate([np.arange(start, start + block_size) for start in block_starts])
train_indices = np.unique(train_indices)
# Ensure last window_size elements of training set are included
train_indices = np.unique(np.concatenate([train_indices, np.arange(train_size - self.window_size, train_size)]))
# Create test indices
test_indices = np.arange(train_size, n)
yield train_indices, test_indices
def get_n_splits(self, X: Optional[NDArray] = None, y: Optional[NDArray] = None, groups: Optional[NDArray] = None) -> int:
return self.n_resamplings
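A throwaway check I used to convince myself the splitter behaves: every resampling keeps the test block fixed at the tail, train indices stay inside the training range, and the last window_size training rows are always present.

# Sanity check of the custom splitter (toy sizes)
_cv = BlockBootstrap(n_resamplings=3, n_blocks=5, window_size=8, test_size=20, random_state=0)
_Xc = np.zeros((100, 2))
for _tr, _te in _cv.split(_Xc):
    assert _tr.max() < 80 and _te.min() == 80 and len(_te) == 20
    assert np.all(np.isin(np.arange(72, 80), _tr))  # last window_size train rows included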
from mapie.regression import MapieTimeSeriesRegressor
from mapie.metrics import regression_coverage_score, regression_mean_width_score, coverage_width_based
alpha = 0.05
gap = 1
window_size = 32
test_size = 91
cv_mapiets = BlockBootstrap(
n_resamplings=10,
n_blocks=10,
#overlapping=False,
window_size=window_size,
test_size=test_size,
random_state=59
)
mapie_enbpi = MapieTimeSeriesRegressor(
model, method="enbpi", cv=cv_mapiets, agg_function="mean", n_jobs=-1
)
# For EnbPI
mapie_enbpi = mapie_enbpi.fit(X_train, y_train)
X_test_with_history = pd.concat([X_train.iloc[-window_size:], X_test])
y_pred_raw, y_pis_raw = mapie_enbpi.predict(
    X_test_with_history, alpha=alpha, ensemble=True, optimize_beta=True,
    allow_infinite_bounds=True
)
# Trim to match y_test
y_pred = y_pred_raw[-test_size:]
y_pis = y_pis_raw[-test_size:]
# Drop NaN-padded positions and keep y_test aligned with the intervals
valid_pred_indices = ~np.isnan(y_pred)
y_pred = y_pred[valid_pred_indices]
y_pis = y_pis[valid_pred_indices]
y_test_valid = y_test[valid_pred_indices]
coverage_enbpi = regression_coverage_score(
    y_test_valid, y_pis[:, 0, 0], y_pis[:, 1, 0]
)
width_enbpi = regression_mean_width_score(
    y_pis[:, 0, 0], y_pis[:, 1, 0]
)
cwc_enbpi = coverage_width_based(
    y_test_valid, y_pis[:, 0, 0],
    y_pis[:, 1, 0],
    eta=10,
    alpha=alpha
)
# Print results
print(f"Coverage: {coverage_enbpi}")
print(f"Mean Width: {width_enbpi}")
print(f"CWC: {cwc_enbpi}")
# Extract dates for test set
train_dates = date_col[:len(y_train)]
test_dates = date_col[-len(y_test):]
plt.figure(figsize=(14, 7))
#plt.plot(train_dates, y_train, label='Actual', color='black', linewidth=1.5)  # uncomment to see the whole series length
plt.plot(test_dates, y_test, label='Actual', color='blue', linewidth=1.5)
plt.plot(test_dates, predictions, label='Predictions', color='purple', linestyle='--', linewidth=1.5)
plt.fill_between(test_dates[valid_pred_indices], y_pis[:, 0, 0], y_pis[:, 1, 0], color='purple', alpha=0.3, label='Prediction Interval')
plt.xlabel('Time')
plt.ylabel('Sales')
plt.title('Actual vs Predictions with Conformal Prediction Intervals')
plt.legend()
plt.grid(True)
plt.show()
Hello there,
Is your documentation request related to a problem? Please describe. I've been having difficulty wrapping my PyTorch forecasting models for use with your library. I have read that you can "Easily wrap any model (scikit-learn, tensorflow, pytorch, …) with, if needed, a scikit-learn-compatible wrapper for the purposes just mentioned.", but I personally find this non-trivial, as I have never done it. More importantly, most documentation covers regressors and classifiers, and there is little on time-series forecasting models, especially for PyTorch. The example in the time series section of the MAPIE docs uses a random forest, I believe, which is fine for models that fit easily into sklearn, but PyTorch models seem harder to set up. I have attempted to use skorch to help with this, but their documentation also lacks examples where time series models are wrapped. Secondly, most examples use univariate data with no exogenous variables, while univariate with exogenous variables is perhaps more common in a business setting outside of finance; even there, many financial models use exogenous variables to inform the model.
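For reference, this is the kind of skorch wrapper I was experimenting with (a sketch of my attempt, not code from the skorch docs; the window construction still has to happen outside, and the parameters are just my guesses at the pattern):

# Hypothetical skorch attempt; skorch wants float32 X and a 2-D float32 y for regression
from skorch import NeuralNetRegressor

net = NeuralNetRegressor(
    FFNetwork,                      # a LightningModule is still an nn.Module, though a plain nn.Module may be safer
    module__input_dim=input_dim,
    module__sequence_length=window_size,
    module__hidden_dim=hidden_dim,
    max_epochs=50,
    lr=1e-3,
    batch_size=32,
)
# net.fit(X_windows.astype(np.float32), y_windows.astype(np.float32))
#   with X_windows shaped (n_windows, window, n_features) and y_windows (n_windows, 1)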
Describe the solution you'd like I would like to see a clear example of applying MAPIE conformal intervals (EnbPI and the improved version): taking a PyTorch model, wrapping it, and applying it to time series data with and without exogenous variables. If the exogenous variables were previously created as tensors, how do we reshape them? Or should we not do that at all, let something like skorch handle it, and just set a batching parameter? The example should also show how to validate the performance of the approaches for time series, similar to this: https://github.com/mtorabirad/MLBoost/blob/main/Episode15/Episode15Main.ipynb.
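On the reshaping question specifically, this is the kind of thing I mean (my own sketch using NumPy's sliding_window_view; the names are hypothetical): exogenous features laid out as (n_samples, n_features) need to become (n_windows, window, n_features) before they are turned into tensors.

# Hypothetical reshaping of exogenous features into model-ready windows
from numpy.lib.stride_tricks import sliding_window_view

exog = np.random.rand(100, 5)  # (n_samples, n_features)
window = 32
# sliding_window_view gives (n_windows, n_features, window); move axes to (n_windows, window, n_features)
windows = sliding_window_view(exog, window, axis=0).transpose(0, 2, 1)
tensors = torch.tensor(windows, dtype=torch.float32)
print(tensors.shape)  # torch.Size([69, 32, 5])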
Here's my rather patchy code, minus my attempts to turn it into an sklearn-wrapped model; if it helps, you are welcome to use it. pseudo_sales.csv
Thank you for reading.