fgmacedo / python-statemachine

Python Finite State Machines made easy.
MIT License

Question: How can I use cond functions to gatekeep, but not fail? #422

Closed buelowp closed 5 months ago

buelowp commented 6 months ago

I have a very complicated, home-built FSM that I wrote in Python a few years ago to control the lights in my house from an RPi. I want to migrate the code to a real FSM library so I can clean it up, simplify it, and fix a bunch of silly bugs I am currently working around. What I want is a circular FSM that changes state based on the conditions below and silently ignores the trigger if the condition for the next transition isn't met. That way, I can just merrily check once a second whether it's time to do something and always know I'm in the right state.

My expectation was that cond=function would simply not allow a transition if the function evaluates to False. Instead, when it returns False, an exception is thrown, and even if I just pass in the exception handler, it never works right: once the exception handler has fired the first time, I never see the machine move to the next state when the cond later returns True. If I start this after sunset, it works for a single transition, but not again, because the next iteration causes the exception.

import time
from datetime import datetime
from suntime import Sun, SunTimeException
import pytz
from statemachine import StateMachine, State
from statemachine.contrib.diagram import DotGraphMachine
import logging
import logging.handlers
import sys

latitude = 42
longitude = -88

sun = Sun(latitude, longitude)

def setup_custom_logger(name):
    formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s',
                                  datefmt='%Y-%m-%d %H:%M:%S')
    handler = logging.FileHandler('uptime-log.txt', mode='a')
    handler.setFormatter(formatter)
    screen_handler = logging.StreamHandler(stream=sys.stdout)
    screen_handler.setFormatter(formatter)
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    logger.addHandler(screen_handler)
    return logger

conlog = setup_custom_logger("lights_sm")

class LightControlMachine(StateMachine):
    off = State(initial=True)
    evening = State()
    overnight = State()

    cycle = (
        off.to(evening, cond="sunset") |
        evening.to(overnight, cond="midnight") |
        overnight.to(off, cond="sunrise")
    )

    def on_enter_off(self):
        conlog.info("Entering off state")

    def on_enter_evening(self):
        conlog.info("Entering evening state")

    def on_enter_overnight(self):
        conlog.info("Entering overnight state")

    def sunset(self):
        ss = sun.get_local_sunset_time()
        now = datetime.now(pytz.timezone('US/Central'))
        conlog.info("After sunset is {}".format(now >= ss))
        return now >= ss

    def sunrise(self):
        sr = sun.get_local_sunrise_time()
        now = datetime.now(pytz.timezone('US/Central'))
        conlog.info("After sunrise is {}".format(now >= sr))
        return (now >= sr)

    def midnight(self):
        # True from midnight until today's sunrise; log the value actually returned.
        sr = sun.get_local_sunrise_time()
        now = datetime.now(pytz.timezone('US/Central'))
        conlog.info("After midnight is {}".format(now < sr))
        return now < sr

def run_program():
    conlog.info("In thread")

    while True:
        time.sleep(100)

def main():
    sm = LightControlMachine()
#    graph = DotGraphMachine(sm)
#    graph().write_png("lcm.png")

    while True:
        sm.cycle()

        time.sleep(1)

if __name__ == '__main__':
    main()

What's the right way to do this with this FSM implementation?

This is the error I'm getting.

pi@officedisplay:~/Projects/pyxmaslights $ python lights_sm.py 
2024-03-15 10:36:18 INFO     Entering off state
2024-03-15 10:36:18 INFO     After sunset is False
Traceback (most recent call last):
  File "/home/pi/Projects/pyxmaslights/lights_sm.py", line 86, in <module>
    main()
  File "/home/pi/Projects/pyxmaslights/lights_sm.py", line 81, in main
    sm.cycle()
  File "/usr/local/lib/python3.9/dist-packages/statemachine/event.py", line 54, in trigger_event
    return event_instance.trigger(self, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/statemachine/event.py", line 29, in trigger
    return machine._process(trigger_wrapper)
  File "/usr/local/lib/python3.9/dist-packages/statemachine/statemachine.py", line 288, in _process
    return self._processing_loop()
  File "/usr/local/lib/python3.9/dist-packages/statemachine/statemachine.py", line 303, in _processing_loop
    result = trigger()
  File "/usr/local/lib/python3.9/dist-packages/statemachine/event.py", line 27, in trigger_wrapper
    return self._trigger(trigger_data)
  File "/usr/local/lib/python3.9/dist-packages/statemachine/event.py", line 44, in _trigger
    raise TransitionNotAllowed(trigger_data.event, state)
statemachine.exceptions.TransitionNotAllowed: Can't cycle when in Off.

fgmacedo commented 6 months ago

Hi @buelowp, how are you?

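I think the solution is simple: just instantiate your SM with allow_event_without_transition=True. With that flag, an event that has no allowed transition (for example, because its cond returned False) is ignored instead of raising TransitionNotAllowed.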

Ref: https://python-statemachine.readthedocs.io/en/latest/api.html#statemachine

buelowp commented 6 months ago

I'm good, thanks for the rapid response. I completely missed that in the docs. Thank you. A quick test, and it doesn't fail. I'll let it run until the next transition to make sure I get what I want.

fgmacedo commented 6 months ago

I think this could be better documented as a feature; right now it only appears in the "API" docs. Sorry about that.

Feel free to send suggestions on how to properly document this.

Best!

buelowp commented 6 months ago

I ran into a few other things I had some trouble with, so I'll send some notes later. Thanks for listening.

fgmacedo commented 5 months ago

Hi @buelowp, it looks like we've resolved the issue, so I'm going to close it for now. If you need to, please feel free to reopen this one or start a new issue. Best!