fzi-forschungszentrum-informatik / TSInterpret

An Open-Source Library for the interpretability of time series classifiers
BSD 3-Clause "New" or "Revised" License

[Q] Trouble generating LEFTIST explanations with PYTORCH #70

Closed marreapato closed 3 months ago

marreapato commented 3 months ago

Dear authors and community, thank you for the effort you put into building this library; it has been really useful to me. However, I am having trouble generating explanations with the LEFTIST method for PyTorch models.

Basically, I have this structure:

Shape of X_train: (181177, 30, 1)
Shape of X_test: (45295, 30, 1)
Shape of y_train: (181177,)
Shape of y_test: (45295,)

And the code is the following:


from sklearn.model_selection import train_test_split

# Perform train-test split with stratification
X_train, X_test, y_train, y_test = train_test_split(x_values, y_venda,
                                                    test_size=0.2,
                                                    stratify=y_venda,
                                                    random_state=42)

# Now, X_train, X_test, y_train, y_test contain the split datasets
# X_train and y_train are the training features and labels
# X_test and y_test are the test features and labels

# Print shapes of the resulting datasets
print("Shape of X_train:", X_train.shape)
print("Shape of X_test:", X_test.shape)
print("Shape of y_train:", y_train.shape)
print("Shape of y_test:", y_test.shape)
# Initialize the scalers
from sklearn.preprocessing import MinMaxScaler

scaler_X = MinMaxScaler()
scaler_y = MinMaxScaler()

# Reshape the data to 2D for scaling
X_train_reshaped = X_train.reshape(-1, X_train.shape[1])
X_test_reshaped = X_test.reshape(-1, X_test.shape[1])

# Fit and transform the training data
X_train_scaled = scaler_X.fit_transform(X_train_reshaped)

# Transform the test data
X_test_scaled = scaler_X.transform(X_test_reshaped)

# Reshape back to original shape
X_train_scaled = X_train_scaled.reshape(X_train.shape)
X_test_scaled = X_test_scaled.reshape(X_test.shape)

# Now X_train_scaled and X_test_scaled are ready to use
import torch
from torch.utils.data import DataLoader, TensorDataset

# Convert arrays to PyTorch tensors
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

X_train_tensor.shape

import numpy as np
import random

# Set the seed for reproducibility (NumPy, Python, and PyTorch RNGs)
seed = 2
np.random.seed(seed)
random.seed(seed)
torch.manual_seed(seed)

# Create DataLoader for training and validation sets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

val_dataset = TensorDataset(X_test_tensor, y_test_tensor)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Define the simple DNN model
class SimpleDNN(nn.Module):
    def __init__(self, input_shape):
        super(SimpleDNN, self).__init__()
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(input_shape, 16)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(16, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.flatten(x)
        x = self.relu(self.fc1(x))
        x = self.sigmoid(self.fc2(x))
        return x

# Select the device and define the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_shape = X_train.shape[1] * X_train.shape[2]
model = SimpleDNN(input_shape).to(device)

# Define the optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.BCELoss()

# Define the training loop
num_epochs = 500
patience = 50
best_loss = float('inf')
counter = 0

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct_train = 0
    total_train = 0

    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)  # Move data to GPU
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs.squeeze(), y_batch)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * X_batch.size(0)
        predicted = torch.round(outputs)  # Use round instead of > 0.5
        correct_train += (predicted.squeeze() == y_batch).sum().item()
        total_train += y_batch.size(0)

    # Validation phase
    model.eval()
    val_loss = 0.0
    correct_val = 0
    total_val = 0
    ##y_pred = []

    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)  # Move data to GPU
            outputs = model(X_batch)
            loss = criterion(outputs.squeeze(), y_batch)
            val_loss += loss.item() * X_batch.size(0)
            #y_pred.extend(outputs.squeeze().cpu().numpy())
            predicted = torch.round(outputs)  # Use round instead of > 0.5
            correct_val += (predicted.squeeze() == y_batch).sum().item()
            total_val += y_batch.size(0)

    val_loss /= len(val_loader.dataset)

    # Calculate accuracies
    train_accuracy = correct_train / total_train
    val_accuracy = correct_val / total_val

    # Print accuracy
    #y_pred_binary = [round(y) for y in y_pred]
    #val_accuracy = accuracy_score(y_test_tensor.cpu(), y_pred_binary)

    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {running_loss / len(train_loader.dataset)}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")

    # Early stopping and checkpoint
    if val_loss < best_loss:
        best_loss = val_loss
        best_epoch = epoch+1
        torch.save(model.state_dict(), "/home/CIN/lram2/BITCOIN_MASTER_DISSERTATION/30-minutes-bitcoin/classification_task_deep-pytorch/fully_conected_mlp/best_DCN_model_epoch.pth")
        print("Model saved!")
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered")
            break
print(f"best epoch: {best_epoch}")
# Re-create the model (SimpleDNN is defined above) on the appropriate device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_shape = X_train.shape[1] * X_train.shape[2]
model = SimpleDNN(input_shape).to(device)

# Load the best checkpoint saved during training
model.load_state_dict(torch.load("/home/CIN/lram2/BITCOIN_MASTER_DISSERTATION/30-minutes-bitcoin/classification_task_deep-pytorch/fully_conected_mlp/best_DCN_model_epoch.pth", map_location=device))

# Set the model to evaluation mode
model.eval()

# Initialize variables for loss and accuracy
test_loss = 0.0
correct = 0
total = 0

# Create DataLoader for the test set
test_dataset = TensorDataset(torch.tensor(X_test_scaled, dtype=torch.float32), torch.tensor(y_test, dtype=torch.float32))
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

criterion = nn.BCELoss()

with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), targets)
        test_loss += loss.item()

        predicted = (outputs > 0.5).float()
        correct += (predicted.squeeze() == targets).sum().item()
        total += targets.size(0)

# Calculate average loss and accuracy
average_test_loss = test_loss / len(test_loader)
test_accuracy = correct / total

print(f"Test loss: {average_test_loss:.4f}")
print(f"Test accuracy: {test_accuracy:.4f}")

# Set the model to evaluation mode
model.eval()

# Initialize list to store predictions
predictions = []

with torch.no_grad():
    for inputs, _ in test_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)
        predictions.append(outputs.cpu().numpy())

# Concatenate all predictions into a single array
pred_y = np.concatenate(predictions, axis=0)
pred_y_classes = (pred_y > 0.5).astype(int)  # Convert probabilities to binary predictions

print("Predictions on the test set (probs):", pred_y)
print("Predictions on the test set:", pred_y_classes)

from sklearn.metrics import classification_report, confusion_matrix
# Compute classification report
report = classification_report(y_test, pred_y_classes)
print("Classification Report:\n", report)

# Compute confusion matrix
cm = confusion_matrix(y_test, pred_y_classes)
print("Confusion Matrix:\n", cm)
X_test_scaled[30932].shape
import torch

# Let's assume your new instance is a numpy array with shape (30, 1)
new_instance = X_test_scaled[30945]  # This is your input data

# Convert the new instance to a PyTorch tensor and move it to the appropriate device (CPU/GPU)
new_instance_tensor = torch.tensor(new_instance, dtype=torch.float32).to(device)

# Add a batch dimension since the model expects input in the shape (batch_size, features)
new_instance_tensor = new_instance_tensor.unsqueeze(0)  # Shape changes from (30, 1) to (1, 30, 1)

# Set the model to evaluation mode
model.eval()

# Make the prediction
with torch.no_grad():  # Disable gradient calculation
    output = model(new_instance_tensor)

# The output is the sigmoid probability of the positive class (not yet rounded)
prediction = output.item()

# Print the prediction
print(f"Prediction for the new instance: {prediction}")

explained_instance = x_values[c_indexes[27939]].reshape(1,-1,1)  # indices of the array with electronic devices
learning_process_name = 'Lime'
transform_name = 'uniform'

from TSInterpret.InterpretabilityModels.leftist.leftist import LEFTIST

model.eval()

leftist = LEFTIST(model.cpu(),(X_train,y_train),mode = "time", backend='PYT',
                  learning_process_name='Lime',transform_name=transform_name, explanation_size=10,nb_neighbors=1000)

label='0'

explanations2 = leftist.explain(explained_instance,0)

leftist.plot(np.array([explained_instance.reshape(-1,1)]),np.array([explanations2[0]]))

marreapato commented 3 months ago

All of my explanations look like this image, with no feature importance assigned to any time step; with TensorFlow, however, I get different results:

image

JHoelli commented 3 months ago

Hi @marreapato ,

Would you mind printing the explanation (explanations2)? That would help determine whether the issue lies in the algorithm implementation or in the plot function.

Which version of TSInterpret are you using?

marreapato commented 3 months ago

Thanks for the reply @JHoelli

image

I am using version 0.4.5 and the array of explanations appears as [array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])]
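
The printed array suggests that the plot is drawing what it was given: the attribution itself contains no non-zero weight. A minimal sketch of that check, assuming explanations2 has the structure printed above (a list holding one array of 30 values):

```python
import numpy as np

# Values copied from the printed output above: one array of 30 zeros
explanations2 = [np.zeros(30)]

weights = np.asarray(explanations2[0])
print("shape:", weights.shape)
print("non-zero weights:", np.count_nonzero(weights))
print("all zero:", bool(np.allclose(weights, 0.0)))
```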

JHoelli commented 3 months ago

Hi @marreapato,

I think the issue is the output layer of your model. Most approaches in this library expect a softmax output, i.e. two neurons for a binary classification problem.
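
To illustrate what a two-neuron output means for a binary problem: a single sigmoid probability p for class 1 can be expanded into the pair (1 - p, p). The sketch below does this with a wrapper module. The class name TwoColumnOutput and the stand-in model are hypothetical, and this approach has not been verified against TSInterpret's PyTorch wrapper, which may apply its own post-processing to the model output.

```python
import torch
import torch.nn as nn


class TwoColumnOutput(nn.Module):
    """Hypothetical wrapper: turns a (batch, 1) sigmoid output into (batch, 2)."""

    def __init__(self, base_model):
        super().__init__()
        self.base_model = base_model

    def forward(self, x):
        p = self.base_model(x)                  # (batch, 1), probability of class 1
        return torch.cat([1.0 - p, p], dim=1)   # (batch, 2), each row sums to 1


# Stand-in for the single-output model from the question (untrained)
base = nn.Sequential(
    nn.Flatten(), nn.Linear(30, 16), nn.ReLU(), nn.Linear(16, 1), nn.Sigmoid()
)
wrapped = TwoColumnOutput(base).eval()

with torch.no_grad():
    out = wrapped(torch.rand(4, 30, 1))         # four random series of length 30

print(out.shape)
print(out.sum(dim=1))
```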

I made some changes to the PyTorchWrapper of TSInterpret. Please try them by installing from the main branch and let me know if it works:

 pip install https://github.com/fzi-forschungszentrum-informatik/TSInterpret/archive/refs/heads/main.zip

If that does not work, retraining your model with a softmax output, two neurons, and nn.CrossEntropyLoss might also be a solution as a fallback.
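
The fallback could look roughly like the sketch below: the SimpleDNN from the question with two output neurons, trained with nn.CrossEntropyLoss. The class name SimpleDNN2 and the random batch are illustrative assumptions, not the author's code. In PyTorch, nn.CrossEntropyLoss applies log-softmax internally and expects integer class indices as targets, so this sketch returns raw logits during training and applies softmax only when probabilities are needed. Whether TSInterpret expects logits or probabilities from the model is not stated in this thread.

```python
import torch
import torch.nn as nn


class SimpleDNN2(nn.Module):
    """Illustrative variant of SimpleDNN with one output neuron per class."""

    def __init__(self, input_size, n_classes=2):
        super().__init__()
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(input_size, 16)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(16, n_classes)

    def forward(self, x):
        x = self.flatten(x)
        x = self.relu(self.fc1(x))
        return self.fc2(x)                      # raw logits, shape (batch, 2)


torch.manual_seed(0)
model = SimpleDNN2(input_size=30)
criterion = nn.CrossEntropyLoss()               # applies log-softmax internally
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# Random stand-in batch with the shapes from the question: (batch, 30, 1)
X_batch = torch.rand(32, 30, 1)
y_batch = torch.randint(0, 2, (32,))            # int64 class indices, not floats

# One training step
optimizer.zero_grad()
logits = model(X_batch)
loss = criterion(logits, y_batch)
loss.backward()
optimizer.step()

# Class probabilities for inspection or for an explainer
probs = torch.softmax(logits.detach(), dim=1)
predicted = probs.argmax(dim=1)
```

Compared with the original training loop, the labels become a long tensor of class indices instead of float32 values, and predictions come from argmax over the two columns instead of rounding a single probability.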

marreapato commented 3 months ago

@JHoelli Thanks for your reply. I haven't tried the first solution you suggested yet (I can still do so in case you need feedback). I switched the activation function of the output layer and the loss function, and I finally got the feature importance of the time steps!

image

By the way, I am using the library for my master's research and will certainly cite the paper in related publications. Thank you for the effort in finding a solution to my problem.

It is applied research on financial time-series data, so if you are interested in having your names attached to it or in any sort of collaboration, let me know. My institutional email (Federal University of Pernambuco, a public university in Brazil) is lram2@cin.ufpe.br

Off-topic:

Would you also happen to know of any online events on explainability for time series?