pytransitions / transitions

A lightweight, object-oriented finite state machine implementation in Python with many extensions
MIT License

AsyncMachine transitions aren't atomic #640

Open cybergrind opened 8 months ago

cybergrind commented 8 months ago

Thank you for the library!

Describe the bug

It looks like several parallel tasks can successfully trigger the same transition more than once, so multiple tasks can end up running the same transition chain concurrently.

#!/usr/bin/env python3
import asyncio
import logging
from contextlib import suppress

from transitions.extensions.asyncio import AsyncMachine

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s')
log = logging.getLogger('trans')
_global = 0

async def before_b(*args):
    pass

async def on_exc(event):
    log.exception(f'During trans: {event}')
    raise event.error

machine = AsyncMachine(
    states=['A', 'B', 'C'],
    transitions=[
        {'trigger': 'go_b', 'source': 'A', 'dest': 'B', 'before': [before_b]},
        {'trigger': 'go_c', 'source': 'B', 'dest': 'C', 'before': [before_b]},
    ],
    initial='A',
    send_event=True,
    on_exception=on_exc,
)

async def async_package():
    global _global
    with suppress(Exception):
        await machine.go_b()
        await asyncio.sleep(0.01)
        await machine.go_c()
        await asyncio.sleep(0.01)
        _global += 1

async def arun():
    log.debug('before go_b')
    coros = [async_package() for _ in range(100)]
    # asyncio.wait() no longer accepts bare coroutines as of Python 3.11
    await asyncio.gather(*coros)
    log.info(f'{_global=}')
    assert _global == 1, f'_global must be 1 vs {_global}'

def main():
    asyncio.run(arun())

if __name__ == '__main__':
    main()

Expected behavior

If one task is able to trigger a transition, other tasks shouldn't be able to trigger the same transition again. The check of the current state and its change to the new state could probably live in a critical section, which would prevent several tasks from successfully running a transition from a state that is no longer valid for it.
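
To make the request concrete, here is a minimal sketch of the check-then-act race and of the critical section suggested above. It uses only `asyncio`; `ToyMachine`, `go_b_unsafe` and `go_b_locked` are hypothetical names for illustration and are not part of the transitions API, and the sketch does not claim to mirror how AsyncMachine works internally.

```python
import asyncio


class ToyMachine:
    """Hypothetical stand-in for a state machine; not the transitions API."""

    def __init__(self):
        self.state = "A"
        self.lock = asyncio.Lock()

    async def go_b_unsafe(self):
        if self.state != "A":       # check
            return False
        await asyncio.sleep(0)      # stand-in for an awaited 'before' callback
        self.state = "B"            # act: other tasks may have passed the check meanwhile
        return True

    async def go_b_locked(self):
        # Critical section: check and state change happen under one lock.
        async with self.lock:
            return await self.go_b_unsafe()


async def count_successes(method_name, n=100):
    machine = ToyMachine()  # created inside the running loop
    results = await asyncio.gather(
        *(getattr(machine, method_name)() for _ in range(n))
    )
    return sum(results)


unsafe_wins = asyncio.run(count_successes("go_b_unsafe"))
locked_wins = asyncio.run(count_successes("go_b_locked"))
print(f"{unsafe_wins=} {locked_wins=}")
```

Without the lock, more than one task reports a successful A to B transition; with the lock, only the first one does.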

aleneum commented 1 month ago

Hello @cybergrind,

thank you for providing this MRE. It's a good test case for parallel execution of triggers.

The possibility that new events may cancel ongoing events is actually a feature and not a bug (see this comment and issue for more info). If you want events to be processed sequentially, you can pass queued=True to the machine constructor.

I altered your code a bit: when you pass queued=True, all triggers return True, because at the time you trigger a transition you cannot know whether it will succeed until all previously queued transitions have been processed. Furthermore, I added ignore_invalid_triggers=True to suppress raised exceptions. I rely on after callbacks to increase _global, since those callbacks are only executed if a transition is successful. The code below will increase _global twice: once for go_b and once for go_c. This means each transition was successful exactly once, which is probably what you were looking for.

#!/usr/bin/env python3
import asyncio

from transitions.extensions.asyncio import AsyncMachine

_global = 0

async def before_b(event_data):
    pass

def inc_global(event_data):
    global _global
    _global += 1

machine = AsyncMachine(
    states=['A', 'B', 'C'],
    transitions=[
        {'trigger': 'go_b', 'source': 'A', 'dest': 'B', 'before': [before_b], 'after': [inc_global]},
        {'trigger': 'go_c', 'source': 'B', 'dest': 'C', 'before': [before_b], 'after': [inc_global]},
    ],
    initial='A',
    send_event=True,
    ignore_invalid_triggers=True,
    queued=True,
)

async def async_package():
    await machine.go_b()
    await asyncio.sleep(0.01)
    await machine.go_c()

async def arun():
    coros = [async_package() for _ in range(100)]
    # asyncio.wait() no longer accepts bare coroutines as of Python 3.11
    await asyncio.gather(*coros)
    print(_global)
    assert _global == 2, f'_global must be 2 vs {_global}'

def main():
    asyncio.run(arun())

if __name__ == '__main__':
    main()
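
Why every trigger returns True in queued mode may be easier to see in a toy model. The following is a rough sketch that assumes a plain FIFO queue drained by whichever task triggered first. `QueuedToy` and its attributes are hypothetical and only mimic the behaviour described above; this is not the transitions implementation.

```python
import asyncio
from collections import deque


class QueuedToy:
    """Hypothetical toy queue; not the transitions implementation."""

    def __init__(self):
        self.state = "A"
        self.successes = 0      # plays the role of the 'after' callbacks
        self._queue = deque()
        self._draining = False

    async def trigger(self, source, dest):
        self._queue.append((source, dest))
        if self._draining:
            # Another task is draining the queue; the outcome is not known yet.
            return True
        self._draining = True
        try:
            while self._queue:
                src, dst = self._queue.popleft()
                await asyncio.sleep(0)  # stand-in for awaited callbacks
                if self.state == src:   # invalid triggers are silently ignored
                    self.state = dst
                    self.successes += 1
        finally:
            self._draining = False
        return True


async def package(toy):
    await toy.trigger("A", "B")
    await asyncio.sleep(0.01)
    await toy.trigger("B", "C")


async def run_toy():
    toy = QueuedToy()
    await asyncio.gather(*(package(toy) for _ in range(100)))
    return toy.state, toy.successes


toy_state, toy_successes = asyncio.run(run_toy())
print(toy_state, toy_successes)
```

Since events are handled strictly one after another, exactly one A to B and one B to C event find the machine in the right source state. The return value cannot tell the caller which ones, which is why the example above counts successes in after callbacks.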

aleneum commented 1 month ago

I will close this for now since this is probably not relevant for you any longer. If this issue still concerns you, feel free to comment and I will reopen it.

cybergrind commented 1 month ago

Using before and after gave us very unclean code in the end: with a moderately complex FSM, it was hard to debug and support for a small team (it was OK when there was only one person, but failed when we added more people).

With the current async design we spent probably 1 or 2 weeks pinpointing the issue (there were a lot of other things going on in our code, to be honest), so if you need a reason to change the default behavior, here is one more in favor 😁

We have switched to the synchronous version in our case.

Thank you for a great library

aleneum commented 1 month ago

Hello @cybergrind,

thank you for taking the time to respond. I am sorry to hear that you had so much trouble with transitions.

I guess the Readme must make this behaviour more obvious and should provide ways to process events without race conditions. queued=True would be one approach, but using a lock could also work. If transitions might trigger other transitions, a lock could end up in a deadlock when a task tries to acquire the lock it already holds. I haven't tested this in depth, but maybe relying on an unset context in a new task is good enough for a LockedAsyncEvent:

#!/usr/bin/env python3
import asyncio

from transitions import MachineError
from transitions.extensions.asyncio import AsyncMachine, AsyncEvent

class Model:

    def __init__(self):
        self.enter_b_counter = 0
        self.enter_c_counter = 0
        self.errors = 0

    async def on_enter_B(self):
        self.enter_b_counter += 1
        # deadlock test
        await self.to_D()

    async def on_enter_C(self):
        self.enter_c_counter += 1

async def check():
    await asyncio.sleep(0.1)
    return True

class LockedAsyncEvent(AsyncEvent):
    lock = asyncio.Lock()

    async def trigger(self, model, *args, **kwargs):
        if self.machine.current_context.get() is not None:
            # context already set: we are inside a running transition, so do not re-acquire the lock
            return await super().trigger(model, *args, **kwargs)
        else:
            # without the previous check this could cause deadlocks when callbacks trigger further transitions
            async with self.lock:
                return await super().trigger(model, *args, **kwargs)

AsyncMachine.event_cls = LockedAsyncEvent

model = Model()
machine = AsyncMachine(
    model=model,
    states=['A', 'B', 'C', 'D'],
    transitions=[
        {'trigger': 'go_b', 'source': 'A', 'dest': 'B', 'conditions': [check]},
        {'trigger': 'go_c', 'source': 'D', 'dest': 'C'},
    ],
    initial='A',
)

async def async_package():
    try:
        await model.go_b()
        await asyncio.sleep(1)
        await model.go_c()
    except MachineError:
        model.errors += 1

async def arun():
    await asyncio.gather(*[async_package() for _ in range(100)])
    assert model.enter_b_counter == 1, f"Counter B was {model.enter_b_counter}"
    assert model.enter_c_counter == 1, f"Counter C was {model.enter_c_counter}"
    assert model.errors == 99, f"Error count was {model.errors}"

asyncio.run(arun())
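
The "skip the lock when already inside" idea can also be shown in isolation. This is a minimal sketch using only `asyncio` and `contextvars`; `Guard` and `_inside` are hypothetical names. Note that it sets its own context variable, whereas the snippet above relies on the machine's `current_context`.

```python
import asyncio
import contextvars

# Hypothetical marker: True while the current task runs inside the guarded section.
_inside = contextvars.ContextVar("inside_guard", default=False)


class Guard:
    def __init__(self):
        self._lock = asyncio.Lock()

    async def run(self, func, *args):
        if _inside.get():
            # Already inside the guarded section: asyncio.Lock is not
            # reentrant, so acquiring it again here would deadlock.
            return await func(*args)
        async with self._lock:
            token = _inside.set(True)
            try:
                return await func(*args)
            finally:
                _inside.reset(token)


async def demo():
    guard = Guard()  # created inside the running loop
    calls = []

    async def inner():
        calls.append("inner")

    async def outer():
        calls.append("outer")
        await guard.run(inner)  # nested call, like a callback triggering an event

    # The timeout turns an accidental deadlock into an error instead of a hang.
    await asyncio.wait_for(guard.run(outer), timeout=1)
    return calls


nested_calls = asyncio.run(demo())
print(nested_calls)
```

One caveat of this design: tasks created inside the guarded section copy the current context and inherit the marker, so they would bypass the lock as well.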

Furthermore, if before and after are not the right place for callbacks, there are also on_enter_<state> and on_exit_<state>, which do not need to be defined on the model but can be passed to state configs. Instead of states=['A'] one could write states=[{'name': 'A', 'on_enter': my_callback}].