deepcharles / ruptures

ruptures: change point detection in Python
BSD 2-Clause "Simplified" License
1.59k stars 162 forks source link

How can this method be used for data that is not piecewise stable? #189

Closed lupupu closed 3 years ago

lupupu commented 3 years ago

微信截图_20210723163335 As shown in the figure, this method fails.

lupupu commented 3 years ago

微信截图_20210723163743 The first piece is log function, and the second is constant.

oboulant commented 3 years ago

Hi,

Thanks for your interest in ruptures !

Could you share you code sample that deals with this ?

At first sight, what you are asking is a change point detection on mean shifts. In that regard, the graphs show that the method is working fine :

To be confirmed with your sample code.

Hope it helps !

deepcharles commented 3 years ago

For the first plot, you can use the model clinear (https://centre-borelli.github.io/ruptures-docs/user-guide/costs/costclinear/). It should work perfectly.

For the second case, you need a custom cost function, which fits on each segment either a constant or a log function.

Something like this (no tested)

from ruptures.base import BaseCost
import numpy as np
from numpy.polynomial.polynomial import Polynomial

class MyCost(BaseCost):

    """Custom cost"""

    # The 2 following attributes must be specified for compatibility.
    model = ""
    min_size = 2

    def fit(self, signal):
        """Set the internal parameter."""
        assert (signal.squeeze().ndim == 1), "Signal must be univariate."
        self.signal = signal.squeeze().reshape(-1, 1)
        return self

    def error(self, start, end):
        """Return the approximation cost on the segment [start:end].

        Args:
            start (int): start of the segment
            end (int): end of the segment

        Returns:
            float: segment cost
        """
        sub = self.signal[start:end].flatten()
        # constant fit
        constant_fit = (end - start) * sub.var()
        # log fit 
        _, (residual, *_) = Polynomial.fit(x=np.arange(start, end), y=np.exp(sub), deg=1, full=True)
        log_fit = residual[0]
        return min(constant_fit, log_fit)
lupupu commented 3 years ago

Hi,

Thanks for your interest in ruptures !

Could you share you code sample that deals with this ?

At first sight, what you are asking is a change point detection on mean shifts. In that regard, the graphs show that the method is working fine :

  • It does not separate the constant part of the signal because there is no mean shifts within the constant signal.
  • Depending on the shape of the non-constant section of the signal, the detection method distributes the change points so that signal points are the closest possible to the detected mean within computed segment.

To be confirmed with your sample code.

Hope it helps !

import matplotlib.pyplot as plt
import ruptures as rpt
import numpy as np

signal_log = 1 * np.log(np.arange(100)+1).reshape((-1, 1))
signal_slope = 0.07 * np.arange(100).reshape((-1, 1)) + 150
signal_constant_log = np.ones_like(signal_log)*np.max(signal_log)
signal_log_constant = np.r_[signal_log, np.ones_like(signal_log) * np.max(signal_log)]
signal_slope_constant = np.r_[signal_slope, np.ones_like(signal_slope) * np.max(signal_slope)]
bkps = [100, signal_log_constant.size]

# detection

kernel = "rbf"
params = {"gamma": 1e-1}
min_size = 10
pen = 0.1

result_log_constant = rpt.KernelCPD(kernel=kernel, params=params, min_size=min_size).fit_predict(
    signal_log_constant,
    pen=pen
)
result_slope_constant = rpt.KernelCPD(kernel=kernel, params=params, min_size=min_size).fit_predict(
    signal_slope_constant,
    pen=pen
)

# display
rpt.display(signal_log_constant, bkps, result_log_constant)
rpt.display(signal_slope_constant, bkps, result_slope_constant)
plt.show()

Thank you!

lupupu commented 3 years ago

For the first plot, you can use the model clinear (https://centre-borelli.github.io/ruptures-docs/user-guide/costs/costclinear/). It should work perfectly.

For the second case, you need a custom cost function, which fits on each segment either a constant or a log function.

Something like this (no tested)

from ruptures.base import BaseCost
import numpy as np
from numpy.polynomial.polynomial import Polynomial

class MyCost(BaseCost):

    """Custom cost"""

    # The 2 following attributes must be specified for compatibility.
    model = ""
    min_size = 2

    def fit(self, signal):
        """Set the internal parameter."""
        assert (signal.squeeze().ndim == 1), "Signal must be univariate."
        self.signal = signal.squeeze().reshape(-1, 1)
        return self

    def error(self, start, end):
        """Return the approximation cost on the segment [start:end].

        Args:
            start (int): start of the segment
            end (int): end of the segment

        Returns:
            float: segment cost
        """
        sub = self.signal[start:end].flatten()
        # constant fit
        constant_fit = (end - start) * sub.var()
        # log fit 
        _, (residual, *_) = Polynomial.fit(x=np.arange(start, end), y=np.exp(sub), deg=1, full=True)
        log_fit = residual[0]
        return min(constant_fit, log_fit)

Thank you very much for your reply, I will test the effect.

deepcharles commented 3 years ago

Closing now. Feel free to reopen.

lupupu commented 2 years ago

For the first plot, you can use the model clinear (https://centre-borelli.github.io/ruptures-docs/user-guide/costs/costclinear/). It should work perfectly.

For the second case, you need a custom cost function, which fits on each segment either a constant or a log function.

Something like this (no tested)

from ruptures.base import BaseCost
import numpy as np
from numpy.polynomial.polynomial import Polynomial

class MyCost(BaseCost):

    """Custom cost"""

    # The 2 following attributes must be specified for compatibility.
    model = ""
    min_size = 2

    def fit(self, signal):
        """Set the internal parameter."""
        assert (signal.squeeze().ndim == 1), "Signal must be univariate."
        self.signal = signal.squeeze().reshape(-1, 1)
        return self

    def error(self, start, end):
        """Return the approximation cost on the segment [start:end].

        Args:
            start (int): start of the segment
            end (int): end of the segment

        Returns:
            float: segment cost
        """
        sub = self.signal[start:end].flatten()
        # constant fit
        constant_fit = (end - start) * sub.var()
        # log fit 
        _, (residual, *_) = Polynomial.fit(x=np.arange(start, end), y=np.exp(sub), deg=1, full=True)
        log_fit = residual[0]
        return min(constant_fit, log_fit)

Thanks a lot! I found that using rpt.Dynp(model="clinear") works well for the first problem. However, Dynp requires n_bkps. Is there a way to solve the first problem but not require n_bkps?

oboulant commented 2 years ago

Hi @lupupu ,

Pelt should do the trick. Now, using Pelt, you do not have to provide n_bkps but you still have to provide a value for the penalty term pen.

import numpy as np
import ruptures as rpt

# Generate data
data = 3*np.arange(100)
data = np.append(data, 300*np.ones((100,)))

# Use ruptures
algo = rpt.Pelt(model="clinear", min_size=1, jump=1).fit(data.reshape(-1, 1))
algo.predict(pen=1)

This works fine for me and returns [101, 200].

Hope it helps !

lupupu commented 2 years ago

Hi @lupupu ,

Pelt should do the trick. Now, using Pelt, you do not have to provide n_bkps but you still have to provide a value for the penalty term pen.

import numpy as np
import ruptures as rpt

# Generate data
data = 3*np.arange(100)
data = np.append(data, 300*np.ones((100,)))

# Use ruptures
algo = rpt.Pelt(model="clinear", min_size=1, jump=1).fit(data.reshape(-1, 1))
algo.predict(pen=1)

This works fine for me and returns [101, 200].

Hope it helps !

Thank you for your prompt. I didn't find the above content in the document, so can I try to improve the document? But I don't have time yet. i 'm sorry.