fgmacedo / python-statemachine

Python Finite State Machines made easy.
MIT License
846 stars 83 forks source link

Send a state machine over MQTT / serializing-deserializing a SM #421

Closed scls19fr closed 4 months ago

scls19fr commented 6 months ago

Hello,

I'm currently using python-statemachine with Webots (a robotic simulator). In my application I send sensors data over MQTT to an home made telemetry app (Python PyQtGraph). I wonder if (and probably more exactly how) send over MQTT state machine. I don't want to copy paste code of bot controller to my telemetry app but'd like to use a serializing-deserializing protocol (JSON, MessagePack) to send state machine. On telemetry side statemachine will only be used to display what state is active. Any help to achieve that will be great.

Kind regards

fgmacedo commented 6 months ago

Hi @scls19fr , how are you?

You only need to serialize the value of the current state, right?

Maybe you can use the current state value: https://python-statemachine.readthedocs.io/en/latest/api.html#statemachine.statemachine.StateMachine.current_state_value

Let me know if this helps, if not, please give an example of how you want to get the desired outcome.

Best!

scls19fr commented 6 months ago

Hi @fgmacedo ,

I'm fine and hope you are well also. This may be useful to bypass things on telemetry app side but how to transmit through MQTT the state machine ie states names, edges ... to build a new "fake" sm (essentially draw it) on telemetry app side?

scls19fr commented 6 months ago

What I mean is that sending only current active state through MQTT is not enough to draw the sm on telemetry app side. You need to have a whole "copy" of the sm which is currently running on target (bot) side.

fgmacedo commented 6 months ago

Can you provide me with an example of a state machine and the expected serialization output?

The other site is also Python? I ask to know if pickle is a possible solution, as pickle can serialize/deserialize Python objects, but in binary format.

I'm asking about the format, because maybe this can speak with this other thread related to "creating state machines dynamically".

The format can be something related to a JSON version of SCXML, as I'm playing here.

scls19fr commented 6 months ago

The current situation is Python on both side but it may change (different Python version ...) so I'd prefered to avoid Pickle.

My SM is currently quite simple and looks like

from statemachine import StateMachine, State
import datetime

def now():
    return datetime.datetime.utcnow().timestamp()

class DrivingState(StateMachine):
    "Finite state machine for driving state"

    init = State(initial=True)
    lidar_init = State()
    waiting_start = State()
    manual_mode = State()
    ok = State()
    warning = State()
    front_problem_situation = State()
    #rear_problem_situation = State()

    telemetry_ready = init.to(lidar_init)
    lidar_ready = lidar_init.to(waiting_start)
    caution = (ok.to(warning) | warning.to(warning))
    letsgo = (warning.to(ok))
    front_problem_occurs = (ok.to(front_problem_situation) | warning.to(front_problem_situation))
    front_problem_disappears = (front_problem_situation.to(warning))

    stop = (waiting_start.to(waiting_start) | ok.to(waiting_start) | warning.to(waiting_start) | front_problem_situation.to(waiting_start) | manual_mode.to(waiting_start))
    auto = (waiting_start.to(ok) | ok.to(ok) | warning.to(warning) | front_problem_situation.to(front_problem_situation))
    manual = ( waiting_start.to(manual_mode) )

    timestamp_state_changed = now()

    def __init__(self):
        super().__init__()
        self.telemetry = None  # will be set after attach

    def _before(self, event: str, source: State, target: State, message: str = ""):
        message = ". " + message if message else ""
        return f"{datetime.datetime.utcnow()} Running {event} from {source.id} to {target.id}{message}"        

    def before_caution(self, event: str, source: State, target: State, message: str = ""):
        self._before(str, source, target, message)

    def before_letsgo(self, event: str, source: State, target: State, message: str = ""):
        self._before(str, source, target, message)

    def before_front_problem_occurs(self, event: str, source: State, target: State, message: str = ""):
        self._before(str, source, target, message)

    def before_front_problem_disappears(self, event: str, source: State, target: State, message: str = ""):
        self._before(str, source, target, message)

    def _on_enter(self):
        self.timestamp_state_changed = now()
        print(f"{datetime.datetime.utcnow()} Entering '{self.current_state.value}'")
        self.telemetry.publish("autodriver/state", self.current_state.value)  # ToFix

    def on_enter_init(self):
        print(f"{datetime.datetime.utcnow()} Entering '{self.current_state.value}'")
        # telemetry is not yet configured

    def on_enter_waiting_start(self):
        self._on_enter()

    def on_enter_manual_mode(self):
        self._on_enter()

    def on_enter_ok(self):
        self._on_enter()

    def on_enter_warning(self):
        self._on_enter()

    def on_enter_front_problem_situation(self):
        self._on_enter()

    def _on_exit(self):
        duration = now() - self.timestamp_state_changed
        msg = f"Exiting '{self.current_state.value}' after {duration:.1f}s"
        print(f"{datetime.datetime.utcnow()} {msg}")
        self.telemetry.publish("logs", msg)

    def on_exit_front_problem_situation(self):
        self._on_exit()
        # print(f"{datetime.datetime.utcnow()} No more front problem")

auto_driver_state_machine

fgmacedo commented 4 months ago

Hi @scls19fr, following our discussion at this link, I believe we can close the issue for now. Please feel free to reopen it or create a new one if needed.