rlabbe / filterpy

Python Kalman filtering and optimal estimation library. Implements Kalman filter, particle filter, Extended Kalman filter, Unscented Kalman filter, g-h (alpha-beta), least squares, H Infinity, smoothers, and more. Has companion book 'Kalman and Bayesian Filters in Python'.
MIT License

Filtering with variable time steps #196

Closed RobertRol closed 4 years ago

RobertRol commented 4 years ago

Hi,

Thanks a lot for your great Python package. I was wondering how to implement filtering with an irregular influx of measurement data, i.e., assume that the data does not arrive on a regular time grid but rather at "random" times.

Below you can find a minimal example and my attempt to solve this problem. The example is based on the code snippet from the UnscentedKalmanFilter documentation. As FilterPy does not provide a public interface to change the time step _dt of KalmanFilter objects, I am not sure whether my approach is correct: I am manually overriding the _dt member of the UnscentedKalmanFilter object and scaling Q accordingly.

Thanks for your help, Robert

# adapted from ...
# ... https://filterpy.readthedocs.io/en/latest/kalman/UnscentedKalmanFilter.html

import filterpy.kalman as filterKF
import numpy as np

def fx(x, dt):
    # state transition function - predict next state based
    # on constant velocity model x = vt + x_0
    F = np.array([[1, dt, 0, 0],
                  [0, 1, 0, 0],
                  [0, 0, 1, dt],
                  [0, 0, 0, 1]], dtype=float)
    return np.dot(F, x)

def hx(x):
    # measurement function - convert state into a measurement
    # where measurements are [x_pos, y_pos]
    return np.array([x[0], x[2]])

dt = 0.1
# create sigma points to use in the filter. This is standard for Gaussian processes
points = filterKF.MerweScaledSigmaPoints(4, alpha=.1, beta=2., kappa=-1)
kf = filterKF.UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt,
                                    fx=fx, hx=hx, points=points)
kf.x = np.array([-1., 1., -1., 1])  # initial state
kf.P *= 0.2  # initial uncertainty
z_std = 0.1
kf.R = np.diag([z_std ** 2, z_std ** 2])  # 1 standard deviation of measurement noise
myQ = np.diag([(dt * 0.01) ** 2] * 4)
kf.Q = myQ.copy()

# measurements: assumed to be on a regular time grid [0, dt, 2 * dt, ...]
zs = [[i + np.random.randn() * z_std,
       i + np.random.randn() * z_std] for i in range(50)]
t = 0
tOld = 0

for z in zs:
    t += dt
    # simulate irregular influx of measurement data, i.e., ...
    # ... skip a few observations from zs
    if np.random.uniform() < 0.5:
        deltaT = t - tOld
        # override _dt
        kf._dt = deltaT
        # scale Q
        kf.Q = myQ * (deltaT / dt) ** 2
        kf.predict()
        kf.update(z)
        tOld = t
        print('t: {}, deltaT: {}, kf.x: {}'.format(round(t, 1),
                                                   round(deltaT, 1),
                                                   kf.x))
rlabbe commented 4 years ago

Not quite correct. You also need to pass dt into predict, or fx(x, dt) will return the wrong predicted estimate.

rlabbe commented 4 years ago

I'm closing, but of course reply if you have further questions.