pytransitions / transitions

A lightweight, object-oriented finite state machine implementation in Python with many extensions
MIT License
5.48k stars 525 forks source link

AsyncMachine with queued transitions breaks when transition is cancelled #639

Closed ofacklam closed 1 month ago

ofacklam commented 8 months ago

Hello,

First off, thanks for the very helpful library :)

I seem to have found a bug specifically relating to AsyncMachines with queued transitions enabled.

Describe the bug When using an AsyncMachine with queued transitions, in case a transition is cancelled, the state machine stops processing triggers completely.

This seems to be related to the fact that if the processing of an enqueued transition raises a BaseException (in particular asyncio.CancelledError) here, the queue is not cleared properly. Any later transitions are then only enqueued, but processing will never be resumed.

I don't know if this issue affects other queued implementations, or only the AsyncMachine one.

Minimal working example

import asyncio
import logging

from transitions.extensions.asyncio import AsyncMachine

logging.basicConfig(level=logging.DEBUG)
# logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)

class MyMachine(AsyncMachine):
    STATES = [
        'A',
        'B',
        'C',
    ]

    TRANSITIONS = [
        dict(
            trigger='A_to_B',
            source='A',
            after='do_something',
            dest='B',
        ),
        dict(
            trigger='B_to_C',
            source='B',
            after='do_something',
            dest='C',
        ),
        dict(
            trigger='reset',
            source='*',
            after='do_something',
            dest='A',
        ),
    ]

    def __init__(self):
        super().__init__(
            states=self.STATES,
            transitions=self.TRANSITIONS,
            initial='A',
            auto_transitions=False,
            queued=True,  # <== this triggers the bug
        )

    async def do_something(self):
        _logger.info('Do something from state %s', self.state)
        await asyncio.sleep(1)
        _logger.info('Do something from state %s finished', self.state)

machine = MyMachine()
loop = None

async def aio_main():
    global loop
    loop = asyncio.get_event_loop()

    try:
        a_to_b = asyncio.create_task(machine.A_to_B())
        await asyncio.sleep(0.5)
        a_to_b.cancel()

        await machine.B_to_C()  # Does not do anything
        await asyncio.sleep(2)
        await machine.reset()  # Does not do anything

        # Expected result
        assert machine.state == "A"

    except asyncio.CancelledError:
        _logger.info('Got cancelled')

if __name__ == '__main__':
    asyncio.run(aio_main(), debug=True)

Expected behavior The state machine should continue processing future triggers.