pytransitions / transitions

A lightweight, object-oriented finite state machine implementation in Python with many extensions
MIT License
5.5k stars 525 forks source link

Trying to implement AsyncTimeoutState, RecursionError #429

Closed potens1 closed 4 years ago

potens1 commented 4 years ago

Hello, I'm trying to implement an AsyncTimeoutState extension to better interact with asyncio instead of having threads coming into play. I'm hitting the case where, when the timeout is cancelled and awaited after the timeout is done, I get RecusionError, and I don't get where it's coming from. Here is what I've unit now:

import asyncio

import pytest
from transitions.core import listify
from transitions.extensions.asyncio import AsyncMachine, AsyncState
from transitions.extensions.states import add_state_features

class Timer:
    def __init__(self, timeout, callback, args):
        self._timeout = timeout
        self._callback = callback
        self._task = asyncio.create_task(self._job())
        self._args = args
        self.is_alive = True
        self._timed_out = False

    async def _job(self):
        await asyncio.sleep(self._timeout)
        await self._callback(*self._args)

    async def cancel(self):
        self.is_alive = False
        if self._timed_out:
            self._task.cancel()
        try:
            await self._task  # <---- Here is the problem
        except asyncio.CancelledError:
            pass

class AsyncTimeout(AsyncState):
    """
    Adds timeout functionality to an state. Timeouts are handled model-specific.

    Attributes:
        timeout (float): Seconds after which a timeout function should be
                         called.
        on_timeout (list): Functions to call when a timeout is triggered.
    """

    dynamic_methods = ["on_timeout"]

    def __init__(self, *args, **kwargs):
        """
        Args:
            **kwargs: If kwargs contain 'timeout', assign the float value to
                self.timeout. If timeout is set, 'on_timeout' needs to be
                passed with kwargs as well or an AttributeError will be thrown
                if timeout is not passed or equal 0.
        """
        self.timeout = kwargs.pop("timeout", 0)
        self._on_timeout = None
        if self.timeout > 0:
            try:
                self.on_timeout = kwargs.pop("on_timeout")
            except KeyError:
                raise AttributeError(
                    "Timeout state requires 'on_timeout' when timeout is set."
                )
        else:
            self._on_timeout = kwargs.pop("on_timeout", [])
        self.runner = {}
        super().__init__(*args, **kwargs)

    async def enter(self, event_data):
        """
        Extends `transitions.core.State.enter` by starting a timeout timer for
        the current model when the state is entered and self.timeout is larger
        than 0.
        """
        if self.timeout > 0:
            timer = Timer(
                self.timeout, self._process_timeout, args=(event_data,)
            )
            self.runner[id(event_data.model)] = timer
        await super().enter(event_data)

    async def exit(self, event_data):
        timer_task = self.runner.get(id(event_data.model), None)
        if timer_task is not None and timer_task.is_alive:
            await timer_task.cancel()
        await super().exit(event_data)

    async def start_timeout(self, delay, event_data):
        await asyncio.sleep(delay)
        await self._process_timeout(event_data)

    async def _process_timeout(self, event_data):
        await event_data.model.go()
        await asyncio.sleep(0.1)

    @property
    def on_timeout(self):
        """
        List of strings and callables to be called when the state timeouts.
        """
        return self._on_timeout

    @on_timeout.setter
    def on_timeout(self, value):
        """ Listifies passed values and assigns them to on_timeout."""
        self._on_timeout = listify(value)

@add_state_features(AsyncTimeout)
class CustomStateMachine(AsyncMachine):
    pass

class SocialSuperhero(object):
    def __init__(self):
        self.entourage = 0

    def on_enter_waiting(self, ev):
        self.entourage += 1

    async def fin(self, ev):
        print("Done !")

@pytest.mark.asyncio
async def test_async_state():
    states = [
        {"name": "preparing"},
        {"name": "waiting", "timeout": 1, "on_timeout": "go"},
        {"name": "away"},
    ]  # The city needs us!

    transitions = [
        ["done", "preparing", "waiting"],
        [
            "join",
            "waiting",
            "waiting",
        ],  # Entering Waiting again will increase our entourage
        ["go", "waiting", "away"],
    ]  # Okay, let' move

    hero = SocialSuperhero()
    machine = CustomStateMachine(
        model=hero, states=states, transitions=transitions, initial="preparing",
        finalize_event="fin", send_event=True
    )
    assert hero.state == "preparing"  # Preparing for the night shift
    await hero.done()
    assert hero.state == "waiting"  # Waiting for fellow superheroes to join us
    assert hero.entourage == 1  # It's just us so far
    await asyncio.sleep(0.2)  # Waiting...
    await hero.join()  # Weeh, we got company
    await asyncio.sleep(0.3)  # Waiting...
    await hero.join()  # Even more company \o/
    await asyncio.sleep(2)  # Waiting...
    assert (
        hero.state == "away"
    )  # Impatient superhero already left the building

if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.DEBUG)
    asyncio.run(test_async_state(), debug=True)

If I don't await the Timer._task after the cancel, everything seems okay, but, if I'm not wrong, I fear the asyncio.Task will never be garbage collected (not sure of that).

Does someone has any idea/view/enlightening about that ?

The error is:

ERROR    asyncio:base_events.py:1619 Exception in callback <TaskStepMethWrapper object at 0x7fc5f402ea10>()
handle: <Handle <TaskStepMethWrapper object at 0x7fc5f402ea10>() created at /usr/lib/python3.7/asyncio/base_events.py:404>
source_traceback: Object created at (most recent call last):
  File "/usr/lib/python3.7/asyncio/base_events.py", line 541, in run_forever
    self._run_once()
  File "/usr/lib/python3.7/asyncio/base_events.py", line 1778, in _run_once
    handle._run()
  File "/usr/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/home/nic/development/states/tests/test_asyncstate.py", line 19, in _job
    await self._callback(*self._args)
  File "/home/nic/development/states/tests/test_asyncstate.py", line 88, in _process_timeout
    await event_data.model.go()
  File "/home/nic/.virtualenvs/states-T205R_h3/lib/python3.7/site-packages/transitions/extensions/asyncio.py", line 188, in trigger
    t = asyncio.create_task(self.machine._process(func))
  File "/usr/lib/python3.7/asyncio/tasks.py", line 351, in create_task
    return loop.create_task(coro)
  File "/usr/lib/python3.7/asyncio/base_events.py", line 404, in create_task
    task = tasks.Task(coro, loop=self)
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
RecursionError: maximum recursion depth exceeded while calling a Python object

EDIT: Added if to check if the Timer._task should be canceled or not, that should be done, but, it does not change the problem

aleneum commented 4 years ago

Hi @potens1,

I answered a similar question on Stackoverflow and suggested the use of wait_for (source). Does this help? This way you shouldnt be required to cancel tasks. edit: okay, wait_for is only useful when entering a state is stalling but not does not help when a state is kept too long. I guess in that case it is necessary to keep track of a timeout task.

aleneum commented 4 years ago

This seems to work:

import asyncio
import pytest
import logging

from transitions.core import listify
from transitions.extensions.asyncio import AsyncMachine, AsyncState
from transitions.extensions.states import add_state_features

_LOGGER = logging.getLogger(__name__)
_LOGGER.addHandler(logging.NullHandler())

async def _timeout(timeout, func, event_data):
    try:
        await asyncio.sleep(timeout)
        await func(event_data)
    except asyncio.CancelledError:
        pass

class AsyncTimeout(AsyncState):
    """
    Adds timeout functionality to an state. Timeouts are handled model-specific.

    Attributes:
        timeout (float): Seconds after which a timeout function should be
                         called.
        on_timeout (list): Functions to call when a timeout is triggered.
    """

    dynamic_methods = ["on_timeout"]

    def __init__(self, *args, **kwargs):
        """
        Args:
            **kwargs: If kwargs contain 'timeout', assign the float value to
                self.timeout. If timeout is set, 'on_timeout' needs to be
                passed with kwargs as well or an AttributeError will be thrown
                if timeout is not passed or equal 0.
        """
        self.timeout = kwargs.pop("timeout", 0)
        self._on_timeout = None
        if self.timeout > 0:
            try:
                self.on_timeout = kwargs.pop("on_timeout")
            except KeyError:
                raise AttributeError(
                    "Timeout state requires 'on_timeout' when timeout is set."
                )
        else:
            self._on_timeout = kwargs.pop("on_timeout", [])
        self.runner = {}
        super().__init__(*args, **kwargs)

    async def enter(self, event_data):
        """
        Extends `transitions.core.State.enter` by starting a timeout timer for
        the current model when the state is entered and self.timeout is larger
        than 0.
        """
        if self.timeout > 0:
            self.runner[id(event_data.model)] = asyncio.ensure_future(_timeout(self.timeout, self._process_timeout, event_data))
        await super().enter(event_data)

    async def exit(self, event_data):
        timer_task = self.runner.get(id(event_data.model), None)
        if timer_task is not None and not timer_task.done():
            timer_task.cancel()
        await super().exit(event_data)

    async def _process_timeout(self, event_data):
        _LOGGER.debug("%sTimeout state %s. Processing callbacks...", event_data.machine.name, self.name)
        await event_data.machine.callbacks(self.on_timeout, event_data)
        _LOGGER.info("%sTimeout state %s processed.", event_data.machine.name, self.name)

    @property
    def on_timeout(self):
        """
        List of strings and callables to be called when the state timeouts.
        """
        return self._on_timeout

    @on_timeout.setter
    def on_timeout(self, value):
        """ Listifies passed values and assigns them to on_timeout."""
        self._on_timeout = listify(value)

@add_state_features(AsyncTimeout)
class CustomStateMachine(AsyncMachine):
    pass

class SocialSuperhero(object):
    def __init__(self):
        self.entourage = 0

    def on_enter_waiting(self, ev):
        self.entourage += 1

    async def fin(self, ev):
        print("Done !")

@pytest.mark.asyncio
async def test_async_state():
    states = [
        {"name": "preparing"},
        {"name": "waiting", "timeout": 1, "on_timeout": "go"},
        {"name": "away"},
    ]  # The city needs us!

    transitions = [
        ["done", "preparing", "waiting"],
        [
            "join",
            "waiting",
            "waiting",
        ],  # Entering Waiting again will increase our entourage
        ["go", "waiting", "away"],
    ]  # Okay, let' move

    hero = SocialSuperhero()
    machine = CustomStateMachine(
        model=hero, states=states, transitions=transitions, initial="preparing",
        finalize_event="fin", send_event=True
    )
    assert hero.state == "preparing"  # Preparing for the night shift
    await hero.done()
    assert hero.state == "waiting"  # Waiting for fellow superheroes to join us
    assert hero.entourage == 1  # It's just us so far
    await asyncio.sleep(0.2)  # Waiting...
    await hero.join()  # Weeh, we got company
    await asyncio.sleep(0.3)  # Waiting...
    await hero.join()  # Even more company \o/
    await asyncio.sleep(2)  # Waiting...
    assert (
        hero.state == "away"
    )  # Impatient superhero already left the building

if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.DEBUG)
    asyncio.run(test_async_state(), debug=True)
aleneum commented 4 years ago

I added this to transitions.extensions.asyncio and maybe release it in 0.8.2 if there is no major flaw involved. The test runs okay:

import asyncio
from transitions.extensions.states import add_state_features
from transitions.extensions.asyncio import AsyncTimeout, AsyncMachine

@add_state_features(AsyncTimeout)
class TimeoutMachine(AsyncMachine):
    pass

states = ['A', {'name': 'B', 'timeout': 0.2, 'on_timeout': 'to_C'}, 'C']
m = TimeoutMachine(states=states, initial='A')
asyncio.run(asyncio.wait([m.to_B(), asyncio.sleep(0.1)]))
assert m.is_B()  # timeout shouldn't be triggered
asyncio.run(asyncio.wait([m.to_B(), asyncio.sleep(0.3)]))
assert m.is_C()   # now timeout should have been processed
aleneum commented 4 years ago

0.8.2 has been released and includes AsyncTimeout. I consider this solved. Feel free to comment if thats not the case and I will reopen the issue when necessary. If you experience an problem with AsyncTimeout, open a new issue please.