vsubbian / WindowSHAP

A model-agnostic framework for explaining time-series classifiers using Shapley values

for Regression #3

Open GabrielWeisser96 opened 3 weeks ago

GabrielWeisser96 commented 3 weeks ago

Is it possible to change it to a regression problem with X of shape [samples, time steps, features] and Y of shape [samples, outputs]? Thank you!

aminnayebi commented 3 weeks ago

I do not understand your question. Could you clarify? If you are asking about the applicability of our method: you can apply WindowSHAP to any time-series classifier as long as its inputs are 3-dimensional.
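For concreteness, a minimal sketch of the expected input layout (the array sizes here are arbitrary):

import numpy as np

# WindowSHAP expects 3-D time-series input: [samples, time_steps, features]
background_ts = np.random.rand(100, 48, 6)   # background set used by KernelSHAP
test_ts = np.random.rand(5, 48, 6)           # instances to explain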

GabrielWeisser96 commented 3 weeks ago

Thank you for your response. To clarify my question:

Is it possible to apply WindowSHAP to a regression problem where the output Y is not of shape [samples,] but rather [samples, outputs]?

I want to ensure that I can use WindowSHAP for a model that produces multiple output values per sample.

Thank you in advance!

aminnayebi commented 2 weeks ago

Now I understand what you mean. First of all, Dynamic WindowSHAP cannot be used for multi-output models, because it uses the SHAP values themselves to determine the window lengths, and with multiple outputs there is no single set of values to drive that windowing. The other two methods can, in principle, generate SHAP values for all outputs; however, the current implementation only takes the first output into consideration and generates the SHAP values for that. You just need to make some minor changes to the code to run it on multi-output problems. Let me know if you need help.
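To illustrate the kind of minor change meant here, below is a rough sketch (not the repository's code). It post-processes a multi-output KernelExplainer result into windowed attributions, assuming no demographic features and that the wrapped predict function and index-based background/test data are prepared the same way as in StationaryWindowSHAP. The function name multi_output_window_shap is made up, and how shap returns multi-output values depends on the installed version.

import numpy as np
import shap

def multi_output_window_shap(wrapped_predict, background_data, test_data,
                             num_test, num_window, num_ts_ftr,
                             window_len, num_ts_step):
    # Sketch: windowed SHAP values for every model output, not just the first.
    explainer = shap.KernelExplainer(wrapped_predict, background_data)
    sv = explainer.shap_values(test_data)

    # Normalize shap's return value to shape (num_test, num_features, num_outputs):
    # older shap versions return a list of per-output arrays, newer ones a 3-D array,
    # and single-output models a plain 2-D array.
    if isinstance(sv, list):
        sv = np.stack(sv, axis=-1)
    elif sv.ndim == 2:
        sv = sv[..., np.newaxis]

    # Split the flat feature axis into (window, time-series feature) and spread each
    # window's attribution evenly over the time steps it covers.
    ts_sv = sv.reshape((num_test, num_window, num_ts_ftr, -1))
    ts_phi = np.repeat(ts_sv / window_len, window_len, axis=1)[:, :num_ts_step, :, :]
    return ts_phi   # shape: (num_test, num_ts_step, num_ts_ftr, num_outputs)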

GabrielWeisser96 commented 1 week ago

I have tried my best, but it doesn't really work. One solution would also be to simply handle each output separately. Here is my current attempt:

import numpy as np
import shap

class StationaryWindowSHAP():

def __init__(self, model, window_len, B_ts, test_ts, B_mask=None, B_dem=None, test_mask=None, test_dem=None, model_type='lstm'):
    self.model = model
    self.window_len = window_len
    self.num_window = np.ceil(B_ts.shape[1] / self.window_len).astype('int')
    self.num_background = len(B_ts)
    self.num_test = len(test_ts)
    self.background_ts = B_ts
    self.background_mask = B_mask
    self.background_dem = B_dem
    self.test_ts = test_ts
    self.test_mask = test_mask
    self.test_dem = test_dem
    self.model_type = model_type
    self.ts_phi = None
    self.dem_phi = None
    self.explainer = None

    self.num_ts_ftr = B_ts.shape[2]
    self.num_ts_step = B_ts.shape[1]
    self.num_dem_ftr = 0 if B_dem is None else B_dem.shape[1]

    self.all_ts = np.concatenate((self.background_ts, self.test_ts), axis=0)
    self.all_mask = None if test_mask is None else np.concatenate((self.background_mask, self.test_mask), axis=0)
    self.all_dem = None if test_dem is None else np.concatenate((self.background_dem, self.test_dem), axis=0)

    self.background_data = self.data_prepare(ts_x=self.background_ts, dem_x=self.background_dem, start_idx=0)
    self.test_data = self.data_prepare(ts_x=self.test_ts, dem_x=self.test_dem, start_idx=self.num_background)

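# Build the index-based tabular data that KernelExplainer operates on: each row is
# filled with that sample's global index in the concatenated background+test arrays,
# and KernelSHAP later mixes these index columns between test and background rows.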
def data_prepare(self, ts_x, dem_x=None, start_idx=0):
    assert len(ts_x.shape) == 3
    assert dem_x is None or len(dem_x.shape) == 2
    dem_len = 0 if dem_x is None else dem_x.shape[1]

    total_num_features = self.num_dem_ftr + self.num_ts_ftr * self.num_window

    x_ = [[i] * total_num_features for i in range(start_idx, start_idx + ts_x.shape[0])]

    return np.array(x_)

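# Map KernelSHAP's index-valued tabular samples back to full time series: each column
# selects which instance in all_ts / all_dem supplies the values for that window (or
# demographic feature), and the reconstructed inputs are then passed to the model.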
def wraper_predict(self, x):
    assert len(x.shape) == 2

    dem_x, ts_x = x[:, :self.num_dem_ftr].copy(), x[:, self.num_dem_ftr:].copy()

    ts_x_ = np.zeros((x.shape[0], self.all_ts.shape[1], self.all_ts.shape[2]))
    mask_x_ = np.zeros_like(ts_x_)
    dem_x_ = np.zeros_like(dem_x, dtype=float)
    tstep = np.ones((x.shape[0], self.all_ts.shape[1], 1)) * \
                np.reshape(np.arange(0, self.all_ts.shape[1]), (1, self.all_ts.shape[1], 1))

    ts_x = ts_x.reshape((ts_x.shape[0], self.num_window, self.num_ts_ftr))

    for i in range(x.shape[0]):
        for t in range(self.num_ts_step):
            for j in range(self.num_ts_ftr):
                wind_t = np.ceil((t + 1) / self.window_len).astype('int') - 1
                ind = ts_x[i, wind_t, j]
                ts_x_[i, t, j] = self.all_ts[ind, t, j]
                mask_x_[i, t, j] = None if self.all_mask is None else self.all_mask[ind, t, j]
        for j in range(dem_x.shape[1]):
            ind = dem_x[i, j]
            dem_x_[i, j] = None if self.all_dem is None else self.all_dem[ind, j]

    if self.model_type == 'lstm_dem':
        model_input = [ts_x_, dem_x_]
    elif self.model_type == 'grud':
        model_input = [ts_x_, mask_x_, tstep]
    elif self.model_type == 'lstm':
        model_input = ts_x_

    return self.model.predict(model_input)

def shap_values(self, num_output=1):
    self.explainer = shap.KernelExplainer(self.wraper_predict, self.background_data)
    shap_values = self.explainer.shap_values(self.test_data)
    shap_values = np.array(shap_values)

    self.dem_phi = shap_values[:, :, :self.num_dem_ftr]
    ts_shap_values = shap_values[:, :, self.num_dem_ftr:]

    # Check the dimensions of ts_shap_values
    print(f"ts_shap_values shape: {ts_shap_values.shape}")

    num_samples, num_segments, num_targets = ts_shap_values.shape

    # Correct the expected shape
    expected_shape = (self.num_test, self.num_window, self.num_ts_ftr, num_targets)

    if num_segments != self.num_window * self.num_ts_ftr:
        raise ValueError(f"Unexpected shape for ts_shap_values: {ts_shap_values.shape}. Expected {expected_shape}")

    # Adjust the reshaping of the dimensions
    self.ts_phi = ts_shap_values.reshape((num_samples, self.num_window, self.num_ts_ftr, num_targets))

    # Compute the final Shapley values
    self.ts_phi = np.repeat(self.ts_phi / self.window_len, self.window_len, axis=1)[:, :self.num_ts_step, :, :]

    self.dem_phi = self.dem_phi[0]

    return self.ts_phi if self.num_dem_ftr == 0 else (self.dem_phi, self.ts_phi)
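For what it's worth, here is a minimal way to exercise the class above with a toy multi-output regressor (the toy model, array shapes, and names are made up for illustration; whether the final reshape succeeds as written depends on how your installed shap version lays out multi-output results):

import numpy as np

class ToyMultiOutputRegressor:
    # Hypothetical stand-in for a real model: predict maps (n, T, F) -> (n, 2).
    def predict(self, x):
        return np.stack([x.mean(axis=(1, 2)), x.std(axis=(1, 2))], axis=1)

background_ts = np.random.rand(30, 20, 4)   # [samples, time_steps, features]
test_ts = np.random.rand(2, 20, 4)

explainer = StationaryWindowSHAP(ToyMultiOutputRegressor(), window_len=5,
                                 B_ts=background_ts, test_ts=test_ts,
                                 model_type='lstm')
ts_phi = explainer.shap_values()
print(ts_phi.shape)   # aiming for (n_test, time_steps, ts_features, n_outputs)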