nasa-jpl / fastcat

C++ EtherCAT Device Command & Control Library
Other
42 stars 11 forks source link

Add interpolation for CSP commands #118

Closed kwehage closed 8 months ago

kwehage commented 1 year ago

Add support for interpolation of CSP commands:

TODO:

See also discussion in https://github.com/nasa-jpl/fastcat/pull/117

kwehage commented 1 year ago

Here is a python simulation that demonstrates what the expected behavior of the interpolation algorithm would be on hardware, accounting for latency and jitter.

image

import numpy as np
import matplotlib.pyplot as plt

class InterpDemo():
    def __init__(self, t_initial=0.0, t_final=0.055, p_initial=0.0, dt=1.e-9,
                 velocity=0.2, jitter_input_sigma=1.e-6, jitter_output_sigma=1.e-6,
                 input_hz=256, output_hz=1000):
        self.t_initial_ = t_initial
        self.t_final_ = t_final
        self.p_initial_ = p_initial
        self.dt_ = dt
        self.velocity_ = velocity
        self.jitter_input_sigma_ = jitter_input_sigma
        self.jitter_output_sigma_ = jitter_output_sigma
        self.input_hz_ = input_hz
        self.output_hz_ = output_hz
        self.input_loop_period_ = 1.0 / self.input_hz_
        self.output_loop_period_ = 1.0 / self.output_hz_

        self.input_current_cycle_ = 0
        self.output_current_cycle_ = 0
        self.t_ = self.t_initial_
        self.input_profile_ = {"t": [], "position": [], "jitter": []}
        self.output_profile_ = {"t": [], "position": [], "jitter": []}
        self.input_wake_time_ = self.t_initial_
        self.output_wake_time_ = self.t_initial_
        self.csp_command_ = None

    def get_position_update(self):
        return self.p_initial_ + self.t_ * self.velocity_

    def run_sim(self):
        while self.t_ <= self.t_final_:
            if(self.t_ >= self.input_wake_time_):
                self.process_input()
            if(self.t_ >= self.output_wake_time_):
                self.process_output()
            self.t_ += self.dt_

    def process_input(self):
        print(f"Processing input at t={self.t_}")
        p = self.get_position_update()
        self.csp_command_ = {
            "request_time": self.t_,
            "position": p,
        }
        self.csp_command_initiated_ = True

        self.input_profile_["t"].append(self.t_)
        self.input_profile_["position"].append(p)

        jitter = np.random.default_rng().normal(0.0, self.jitter_input_sigma_)
        self.input_profile_["jitter"].append(jitter)

        self.input_current_cycle_ += 1
        self.input_wake_time_ = (
            self.input_current_cycle_ * self.input_loop_period_ + jitter
        )

    def process_output(self):
        print(f"Processing output at t={self.t_}")

        if not self.csp_command_:
            return

        dt = self.t_ - self.csp_command_["request_time"]
        self.output_profile_["position"].append(
            self.csp_command_["position"] + dt * self.velocity_
        )
        self.output_profile_["t"].append(self.t_)

        jitter = np.random.default_rng().normal(0.0, self.jitter_output_sigma_)
        self.output_profile_["jitter"].append(jitter)

        self.output_current_cycle_ += 1
        self.output_wake_time_ = (
            self.output_current_cycle_ * self.output_loop_period_ + jitter
        )

    def plot(self):

        fig, ax = plt.subplots(2)
        fig.suptitle('Interpolation demo')
        ax[0].step(
            self.input_profile_["t"],
            self.input_profile_["position"],
            'r', where='post', label='input setpoints'
        )
        ax[0].step(
            self.output_profile_["t"],
            self.output_profile_["position"],
            'b', where='post', label='output setpoints'
        )

        t_span = self.t_final_ - self.t_initial_
        ax[0].plot(
            [self.t_initial_, self.t_final_],
            [self.p_initial_, self.p_initial_ + t_span * self.velocity_],
            'b--', alpha=0.4
        )
        ax[0].set(xlabel="time (s)", ylabel="position (eu)")
        ax[0].grid()
        ax[0].legend()

        ax[1].plot(
            self.input_profile_["t"],
            self.input_profile_["jitter"],
            "r", label="input jitter"
        )
        ax[1].plot(
            self.output_profile_["t"],
            self.output_profile_["jitter"],
            "b", label="output jitter"
        )
        ax[1].set(xlabel="time (s)", ylabel="jitter (s)")
        ax[1].grid()
        ax[1].legend()
        plt.show()

if __name__ == "__main__":
    demo = InterpDemo()
    demo.run_sim()
    demo.plot()
alex-brinkman commented 1 year ago

Kris I think there might be a better way of injecting this time change rather than modifying the inputs to the ConfigFromYaml method - I'll try to write down some ideas soon!

alex-brinkman commented 1 year ago

General Comments:

kwehage commented 1 year ago

I added a SetInitializationTime() method which is called before ConfigFromYaml(). There is a default implementation which sets the initial time in seconds and monotonic time in seconds. It's possible to override that method to set any additional member variables (i.e. last_transition_time_ or put additional initialization logic in the ConfigFromYaml() method (which is safe to do because SetInitializationTime() would always be called first.

d-loret commented 1 year ago

@kwehage, could you give me more context on the motivation for trying to do interpolation with CSP commands? I would like to understand the rationale to have a better basis for an in-depth review.