Basic Python particle filter. Plain SIR filtering, with various resampling algorithms. Written to be simple and clear; not necessarily most efficient or most flexible implementation. Depends on NumPy only.
This repo is useful for understanding how a particle filter works, or a quick way to develop a custom filter of your own from a relatively simple codebase.
There are more mature and sophisticated packages for probabilistic filtering in Python (especially for Kalman filtering) if you want an off-the-shelf solution:
Available via PyPI:
pip install pfilter
Or install the git version:
pip install git+https://github.com/johnhw/pfilter.git
Create a ParticleFilter
object, then call update(observation)
with an observation array to update the state of the particle filter.
Calling update()
without an observation will update the model without any data, i.e. perform a prediction step only.
d
dimensionsh
dimensionsn
particles estimating state in each time stepParticles are represented as an (n,d)
matrix of states, one state per row. Observations are generated from this matrix into an (n,h)
matrix of hypothesized observations via the observation function.
You need to specify at the minimum:
observe_fn(state (n,d)) => observation matrix (n,h)
which will return a predicted observation for an internal state.prior_fn => state matrix (n,d)
for all of the internal state variables. These are usually distributions from scipy.stats
. The utility function independent_sample
makes it easy to concatenate sampling functions to sample the whole state vector.weight_fn(hyp_observed (n,h), real_observed (h,)) => weight vector (n,)
which specifies how well each of the hyp_observed
arrays match the real observation real_observed
. This must produce a strictly positive weight value for each hypothesized observation, where larger means more similar. This is often an RBF kernel or similar.Typically, you would also specify:
dynamics_fn(state (n,d)) => predicted_state (n,d)
to update the state based on internal (forward prediction) dynamics, and a noise_fn(predicted_state (n,d)) => noisy_state (n,d)
to add diffusion into the sampling process (though you could also merge into the dynamics). You might also specify:
internal_weight_fn(state (n,d)) => weight vector (n,)
which provides a weighting to apply on top of the weight function based on internal state. This is useful to impose penalties or to include learned inverse models in the inference.transform_fn(state (n,d), weights (n,)) => states(n, k)
which can apply a post-processing transform and store the result in transformed_particles
If you want to be able to deal with partial missing values in the observations, the weight function should support masked arrays. The squared_error(a,b)
function in pfilter.py
does this, for example.
Sometimes it is useful to pass inputs to callback functions like dynamics_fn(x)
at each time step. You can do this by giving keyword arguments to update()
.
If you call pf.update(y, t=5)
all of the functions dynamics_fn, weight_fn, noise_fn, internal_weight_fn, observe_fn
will receive the keyword argument t=5
. ALl kwargs
are forwarded to these calls. You can just ignore them if not used (e.g. define dynamics_fn = lambda x, **kwargs: real_dynamics(x)
) but this can be useful for propagating inputs that are neither internal states nor observed states to the filter. If no kwargs
are given to update
, then no extra arguments are passed to any of callbacks.
The ParticleFilter
object will have the following useful attributes after updating:
original_particles
the (n,d)
collection of particles in the last update stepmean_state
the (d,)
expectation of the statemean_hypothesized
the (h,)
expectation of the hypothesized observationscov_state
the (d,d)
covariance matrix of the statemap_state
the (d,)
most likely statemap_hypothesized
the (h,)
most likely hypothesized observationweights
the (n,)
normalised weights of each particleFor example, assuming we observe 32x32 images and want to track a moving circle. Assume the internal state we are estimating is the 4D vector (x, y, dx, dy), with 200 particles
from pfilter import ParticleFilter, gaussian_noise, squared_error, independent_sample
columns = ["x", "y", "radius", "dx", "dy"]
from scipy.stats import norm, gamma, uniform
# prior sampling function for each variable
# (assumes x and y are coordinates in the range 0-32)
prior_fn = independent_sample([uniform(loc=0, scale=32).rvs,
uniform(loc=0, scale=32).rvs,
gamma(a=2,loc=0,scale=10).rvs,
norm(loc=0, scale=0.5).rvs,
norm(loc=0, scale=0.5).rvs])
# very simple linear dynamics: x += dx
def velocity(x):
xp = np.array(x)
xp[0:2] += xp[3:5]
return xp
# create the filter
pf = pfilter.ParticleFilter(
prior_fn=prior_fn,
observe_fn=blob,
n_particles=200,
dynamics_fn=velocity,
noise_fn=lambda x:
gaussian_noise(x, sigmas=[0.2, 0.2, 0.1, 0.05, 0.05]),
weight_fn=lambda x,y:squared_error(x, y, sigma=2),
resample_proportion=0.1,
column_names = columns)
# assuming image of the same dimensions/type as blob will produce
pf.update(image)
blob
(200, 4) -> (200, 1024) which draws a blob on an image of size 32x32 (1024 pixels) for each internal state, our observation functionvelocity
(200, 4) -> (200, 4), our dynamics function, which just applies a single Euler step integrating the velocityprior_fn
which generates a (200,4) initial random stategaussian_noise
(200, 4) -> (200,4) which adds noise to the internal statesquared_error
((200,1024), (1024,)) -> (200,) the similarity measurementSee the notebook examples/example_filter.py for a working example using skimage
and OpenCV
which tracks a moving white circle.