deepcharles / ruptures

ruptures: change point detection in Python

Accelerate ruptures with numba #230

Closed · Illviljan closed this 2 years ago

Illviljan commented 2 years ago

ruptures uses a lot of (nested) for loops to detect change points, and for loops in Python are slow.

The usual way of handling that is to vectorize as much as possible using numpy. But the algorithms in ruptures are quite complex, with many dependencies on previous loop iterations, so it isn't easy (for me at least) to vectorize them more than they already are.

Now enter numba, which compiles Python code to fast machine code with (almost) a single decorator. In Pelt it would simply look like this:

```python
from numba import njit

@njit
def _seg(...):
    for bkp in ind:
        ...
        for t in admissible:
            ...
```

Now, as always, it isn't quite that simple. The decorator works best if the functions are pretty much pure math/numpy-only code: no dicts, classes, or other advanced Python constructs. In practice this just means that the heavy-lifting functions need to take all their variables as arguments instead of reading them from self.
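
For example, a minimal sketch of that pattern (hypothetical `Detector` class and cost, not actual ruptures code):

```python
import numpy as np
from numba import njit


@njit
def _seg_impl(signal, penalty):
    # Hypothetical hot loop: plain scalars and numpy arrays only,
    # which is what numba's nopython mode compiles well.
    best = np.inf
    for t in range(1, signal.size + 1):
        segment = signal[:t]
        cost = np.sum((segment - segment.mean()) ** 2)
        best = min(best, cost + penalty)
    return best


class Detector:
    """Hypothetical estimator class; not ruptures' actual Pelt."""

    def __init__(self, penalty=1.0):
        self.penalty = penalty

    def _seg(self, signal):
        # Thin wrapper: unpack attributes into plain arguments so the
        # compiled function never sees `self`.
        return _seg_impl(np.asarray(signal, dtype=np.float64), self.penalty)
```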

Nice intro on numba: https://youtu.be/ewaY9CcjLt0?t=59

Here's some WIP testing code where I've gotten significantly faster results with numba:

```python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from numba import njit, float32, float64, int32, int64

import ruptures


def pelt(data, **kwargs):
    # Pre-processing
    df = pd.DataFrame(data)
    df['squared'] = np.square(df[0])
    df['cumsum'] = np.cumsum(df[0], axis=0)
    df['cumsumsquared'] = np.cumsum(df['squared'], axis=0)
    df['diviseur'] = [x for x in range(1, len(df) + 1)]
    df['mean'] = df['cumsum'] / df['diviseur']
    df['meansquared'] = np.square(df['mean'])
    df = df.append(
        {0: 0, 'cumsum': 0, 'cumsumsquared': 0, 'diviseur': 0,
         'mean': 0, 'meansquared': 0, 'squared': 0},
        ignore_index=True)

    # Penalty
    if 'penalty' in kwargs:
        B = kwargs['penalty']
    else:
        B = 2 * np.log(len(data))

    # Initialization
    Q = [-B]  # Actual cost
    CP = [-1]  # Last segment position
    T = [x for x in range(0, len(data))]  # Authorized positions

    # Parse the data
    for pos in range(0, len(data)):
        costs = []
        min_cost_val_temp = float("inf")
        min_cost_pos_temp = -1

        # Parse all the Yi:pos that are still available
        for i in T:
            if i > pos:
                break
            # Square sum minus N times the square mean
            sos = df['cumsumsquared'].iloc[pos] - df['cumsumsquared'].iloc[i - 1]
            n = pos - i + 1
            ms = (data[i:pos + 1].mean()) ** 2
            C = sos - (n * ms)

            # Cost test
            temp_cost = Q[i] + C + B
            if min_cost_val_temp > temp_cost:
                min_cost_val_temp = temp_cost
                min_cost_val_pos = i

        # Push the smallest cost
        Q.append(min_cost_val_temp)
        # Push the position
        CP.append(min_cost_val_pos)

        # Pruning
        for i in T:
            if i >= pos:
                break
            iplusone = i + 1
            # Square sum minus N times the square mean
            sos = df['cumsumsquared'].iloc[pos] - df['cumsumsquared'].iloc[iplusone - 1]
            n = pos - iplusone + 1
            ms = (data[iplusone:pos + 1].mean()) ** 2
            C = sos - (n * ms)
            if (Q[i] + C > Q[pos]):
                T.remove(i)

    return CP


def backtracking(CP):
    """Apply backtracking to a CP vector from the OP algorithm.

    Returns a "segments" vector.

    Args:
        CP: array-like, 1 dimension.
    """
    # Data length
    n = len(CP) - 1
    # Initialization
    segments = []
    changepoint = CP[n]
    # While the changepoint doesn't return the first point
    while changepoint > 0:
        segments.append(changepoint - 1)
        changepoint = CP[changepoint]
    # The new vector was built with .append(), but since we parse from the
    # end to the beginning, we need to reverse it.
    segments.reverse()
    return segments


def plot_segments(data, segments, ylim=False):
    """Plot segments generated by the OP & backtracking algorithms.

    Args:
        data: the data used to fit the model.
        segments: the segments returned by backtracking().
    """
    fig, ax = plt.subplots()
    start = 0
    index = np.arange(len(data))
    plt.plot(index, data, ".", color="k")
    for i, v in enumerate([0] + segments[:-1]):
        start = v
        end = segments[i + 1] + 1
        print(start, end)
        mean = np.mean(data[start:end])
        plt.plot((start, end), (mean, mean))
    if ylim != False:
        plt.ylim(ylim)
    plt.show()


@njit
def calc(data, cumsumsquared, end, start):
    # Square sum minus N times the square mean
    sos = cumsumsquared[end] - cumsumsquared[start - 1]
    n = end - start + 1
    data_mean = np.mean(data[start:end + 1])
    data_mean_squared = data_mean * data_mean
    C = sos - n * data_mean_squared
    return C


@njit()
def pelt2(_data, penalty=None):
    # Initialize:
    n_samples = _data.size + 1
    n_samplesplusone = n_samples + 1
    data = np.empty(n_samples)
    data[:-1] = _data
    data[-1] = 0
    cumsumsquared = np.cumsum(data * data)
    cumsumsquared[-1] = 0

    # Penalty
    B = penalty if penalty is not None else 2 * np.log(n_samples)

    Q = np.empty(n_samples, dtype=np.float_)
    Q[0] = -B

    # Last segment position:
    group_idx = np.empty(n_samples, dtype=np.int_)
    group_idx[0] = -1

    admissible = [x for x in range(0, n_samples)]  # Authorized positions
    # mask = np.ones(le, dtype=np.bool_)

    ind = np.arange(0, n_samples - 1)
    # ind = [k for k in range(0, self.n_samples, self.jump) if k >= self.min_size]
    # ind += [self.n_samples]

    # Parse the data
    for bkp in ind:
        min_cost_val_temp = np.inf

        # Parse all the Yi:bkp that are still available
        for t in admissible:
            if t > bkp:
                break
            C = calc(data, cumsumsquared, bkp, t)

            # Cost test
            temp_cost = Q[t] + C + B
            if min_cost_val_temp > temp_cost:
                min_cost_val_temp = temp_cost
                min_cost_val_pos = t

        # Push the smallest cost
        Q[bkp + 1] = min_cost_val_temp
        # Push the position
        group_idx[bkp + 1] = min_cost_val_pos

        # Pruning
        for t in admissible:
            if t >= bkp:
                break
            C = calc(data, cumsumsquared, bkp, t + 1)
            if Q[t] + C > Q[bkp]:
                # mask[t] = False
                admissible.remove(t)

    return group_idx[1:]


def plot_group_idx(data, group_idx):
    elements = np.arange(0, data.size)
    fig, ax = plt.subplots(1, 1)
    for g in np.unique(group_idx):
        mask = group_idx == g
        x = elements[mask]
        y = data[mask]
        ax.plot(x, y, ".", color="k")
        y_mean = y * 0 + y.mean()
        ax.plot(x, y_mean)


r"""Pelt"""
from ruptures.costs import cost_factory
from ruptures.base import BaseCost, BaseEstimator
from ruptures.exceptions import BadSegmentationParameters
from ruptures.utils import sanity_check


class Pelt(BaseEstimator):
    """Penalized change point detection.

    For a given model and penalty level, computes the segmentation which
    minimizes the constrained sum of approximation errors.
    """

    def __init__(self, model="l2", custom_cost=None, min_size=2, jump=5, params=None):
        """Initialize a Pelt instance.

        Args:
            model (str, optional): segment model, ["l1", "l2", "rbf"]. Not used if ``'custom_cost'`` is not None.
            custom_cost (BaseCost, optional): custom cost function. Defaults to None.
            min_size (int, optional): minimum segment length.
            jump (int, optional): subsample (one every *jump* points).
            params (dict, optional): a dictionary of parameters for the cost instance.
        """
        if custom_cost is not None and isinstance(custom_cost, BaseCost):
            self.cost = custom_cost
        else:
            if params is None:
                self.cost = cost_factory(model=model)
            else:
                self.cost = cost_factory(model=model, **params)
        self.min_size = max(min_size, self.cost.min_size)
        self.jump = jump
        self.n_samples = None

    def _seg(self, pen):
        """Computes the segmentation for a given penalty using PELT (or a list
        of penalties).

        Args:
            pen (float): penalty value

        Returns:
            dict: partition dict {(start, end): cost value, ...}
        """
        # initialization
        # partitions[t] contains the optimal partition of signal[0:t]
        partitions = dict()  # this dict will be recursively filled
        partitions[0] = {(0, 0): 0}
        group_idx = np.zeros_like(self.signal)

        # Recursion
        ind = [k for k in range(0, self.n_samples, self.jump) if k >= self.min_size]
        ind += [self.n_samples]
        admissible = []
        for bkp in ind:
            # adding a point to the admissible set from the previous loop.
            new_adm_pt = (bkp - self.min_size) // self.jump
            new_adm_pt *= self.jump
            admissible.append(new_adm_pt)

            C = np.inf
            subproblems = list()
            for t in admissible:
                if t > bkp:
                    break
                # # Cost test
                # temp_cost = Q[i] + C + B
                # if min_cost_val_temp > temp_cost:
                #     min_cost_val_temp = temp_cost
                #     min_cost_val_pos = i

                # left partition
                try:
                    tmp_partition = partitions[t].copy()
                except KeyError:  # no partition of 0:t exists
                    continue
                # we update with the right partition
                C_c = self.cost.error(t, bkp) + pen
                if C_c < C:
                    C = C_c
                    min_cost_val_pos = t
                # tmp_partition.update({(t, bkp): self.cost.error(t, bkp) + pen})
                # subproblems.append(tmp_partition)
                group_idx[bkp + 1] = t

            # finding the optimal partition
            partitions[bkp] = min(subproblems, key=lambda d: sum(d.values()))
            # trimming the admissible set
            admissible = [
                t
                for t, partition in zip(admissible, subproblems)
                if sum(partition.values()) <= sum(partitions[bkp].values()) + pen
            ]
        best_partition = partitions[self.n_samples]
        del best_partition[(0, 0)]
        return best_partition

    def fit(self, signal) -> "Pelt":
        """Set params.

        Args:
            signal (array): signal to segment. Shape (n_samples, n_features) or (n_samples,).

        Returns:
            self
        """
        # update params
        self.cost.fit(signal)
        if signal.ndim == 1:
            (n_samples,) = signal.shape
        else:
            n_samples, _ = signal.shape
        self.n_samples = n_samples
        return self

    def predict(self, pen):
        """Return the optimal breakpoints.

        Must be called after the fit method. The breakpoints are associated
        with the signal passed to [`fit()`][ruptures.detection.pelt.Pelt.fit].

        Args:
            pen (float): penalty value (>0)

        Raises:
            BadSegmentationParameters: in case of impossible segmentation
                configuration

        Returns:
            list: sorted list of breakpoints
        """
        # raise an exception in case of impossible segmentation configuration
        if not sanity_check(
            n_samples=self.cost.signal.shape[0],
            n_bkps=0,
            jump=self.jump,
            min_size=self.min_size,
        ):
            raise BadSegmentationParameters
        partition = self._seg(pen)
        bkps = sorted(e for s, e in partition.keys())
        return bkps

    def fit_predict(self, signal, pen):
        """Fit to the signal and return the optimal breakpoints.

        Helper method to call fit and predict once.

        Args:
            signal (array): signal. Shape (n_samples, n_features) or (n_samples,).
            pen (float): penalty value (>0)

        Returns:
            list: sorted list of breakpoints
        """
        self.fit(signal)
        return self.predict(pen)


# %%
reps = 1
repeats = 10
y = np.tile(np.repeat(np.array([72, 72, 100, 100, 300, 300, 500, 500]), repeats), reps)
z = np.tile(np.array([1, 2, 1, 2, 1, 2, 1, 2]), reps)
noise = lambda y: 1 + 0.1 * (np.random.rand(*y.shape) - 0.5)
y = y * noise(y)
penalty = 600.0

# cpexpected = pelt(y, penalty=penalty)
cpactual = pelt2(y, penalty=penalty)
cprupt = Pelt("l2", min_size=0, jump=1).fit_predict(y, pen=penalty)

# print(cpexpected)
print(cpactual)
# print(cprupt)
# assert all(cpexpected == cpactual)

# segments = backtracking(cpexpected)
# print(segments)
# plot_segments(y, segments)
group_idx = plot_group_idx(y, cpactual)
```
oboulant commented 2 years ago

Hi @Illviljan ,

Thanks for your interest in ruptures!

Please feel free to propose a pull request implementing this, either with numba or Cython. In that case, it would be great to also have a performance comparison with the native Python algorithms implemented in ruptures.
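
Such a comparison could be as simple as this sketch (reusing `pelt2`, `y`, and `penalty` from the snippet above; the first `pelt2` call is made outside the timing since it triggers JIT compilation):

```python
import timeit

import ruptures as rpt

pelt2(y, penalty=penalty)  # warm-up: the first call compiles the numba function

t_numba = timeit.timeit(lambda: pelt2(y, penalty=penalty), number=100)
t_native = timeit.timeit(
    lambda: rpt.Pelt(model="l2", jump=1).fit_predict(y, pen=penalty), number=100
)
print(f"numba pelt2: {t_numba:.3f} s, native Pelt: {t_native:.3f} s (100 runs)")
```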

As for an accelerated version of Pelt: if you are using a cost function that can be mapped to a kernel function (see here for available kernels in ruptures and related cost functions), you can use the already implemented kernel change point detection methods:
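
For example, the l2 cost maps to the linear kernel, so a minimal sketch would be:

```python
import numpy as np
import ruptures as rpt

# Toy signal with one mean shift.
signal = np.concatenate([np.zeros(100), 5 + np.zeros(100)])
signal += np.random.normal(scale=0.5, size=signal.size)

# KernelCPD with the linear kernel is the C-accelerated counterpart
# of PELT with the l2 cost.
algo = rpt.KernelCPD(kernel="linear", min_size=2).fit(signal)
print(algo.predict(pen=10))
```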

oboulant commented 2 years ago

Hi @Illviljan Since this is more of a discussion than an issue, I am closing it. Please feel free to open a thread on this topic in the Discussions section of the repo. Thx!

deepcharles commented 2 years ago

Thanks for this remark @Illviljan.

Numba acceleration is not particularly suited to change-point detection procedures, as the underlying algorithm (dynamic programming) is basically two nested for loops, which cannot be vectorized. For a few cost functions (in your code you tried l2), the inner for loop can be vectorized, but that cannot be generalized. That is why we chose to implement the most used methods in C (where for loops are fast). Maybe you can compare your implementation to the KernelCPD one.
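
For reference, here is a sketch of that l2-specific vectorization (a hypothetical helper, not ruptures code): with cumulative sums, the costs of all candidate segments `signal[t:bkp]` come out of a single vectorized expression.

```python
import numpy as np


def l2_best_split(signal, Q, starts, bkp, pen):
    """Hypothetical helper: vectorized inner loop of PELT for the l2 cost.

    cost(t, bkp) = sum(x**2) - n * mean(x)**2 over x = signal[t:bkp],
    evaluated for every candidate start t at once via cumulative sums.
    `starts` is an integer array of admissible indices, all strictly
    smaller than `bkp`; `Q` holds the optimal costs of the prefixes.
    """
    csum = np.concatenate(([0.0], np.cumsum(signal)))
    csum2 = np.concatenate(([0.0], np.cumsum(signal ** 2)))
    n = bkp - starts
    seg_sum = csum[bkp] - csum[starts]
    seg_sum2 = csum2[bkp] - csum2[starts]
    costs = seg_sum2 - seg_sum ** 2 / n
    # Best previous cost + segment cost + penalty, minimized over t.
    totals = Q[starts] + costs + pen
    k = np.argmin(totals)
    return starts[k], totals[k]
```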