pytransitions / transitions

A lightweight, object-oriented finite state machine implementation in Python with many extensions
MIT License

AsyncMachine cancels tasks that it should not #465

Closed: AxelVoitier closed this issue 4 years ago

AxelVoitier commented 4 years ago

Hello,

I ran into a strange issue: triggering a transition from an asyncio task other than my main one made the main task exit with an asyncio.CancelledError.

I managed to isolate and reproduce the issue in the following code:

import asyncio
import logging
import time
from threading import Thread

from transitions.extensions.asyncio import AsyncMachine

logging.basicConfig(level=logging.DEBUG)
# logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)

class MyMachine(AsyncMachine):

    STATES = [
        'A',
        'B',
        'C',
        'D',
    ]

    TRANSITIONS = [
        dict(
            trigger='A_to_B',
            source='A',
            before='do_something',
            dest='B',
        ),
        dict(
            trigger='B_to_C',
            source='B',
            before='do_something',
            dest='C',
        ),
        dict(
            trigger='C_to_D',
            source='C',
            before='do_something',
            dest='D',
        ),
        dict(
            trigger='reset',
            source='*',
            before='do_something',
            dest='A',
        ),
    ]

    def __init__(self):
        super().__init__(
            states=self.STATES, transitions=self.TRANSITIONS,
            initial='A', auto_transitions=False,
        )

    async def do_something(self):
        _logger.info('Do something from state %s', self.state)
        await asyncio.sleep(0.2)
        _logger.info('Do something from state %s finished', self.state)

machine = MyMachine()
loop = None

async def aio_main():
    global loop
    loop = asyncio.get_running_loop()  # the loop this coroutine is running on

    try:
        while True:
            if machine.state == 'A':
                await machine.A_to_B()
                continue

            if machine.state == 'C':
                await machine.C_to_D()
                continue

            await asyncio.sleep(0.1)

    except asyncio.CancelledError:
        _logger.info('Got cancelled')
        await machine.reset()

def external_trigger():
    time.sleep(1)

    async def call():
        _logger.info('Calling B_to_C')
        result = await machine.B_to_C()
        _logger.info('Trigger done (%s), state is %s', result, machine.state)
        assert result
        assert machine.state == 'C'

    fut = asyncio.run_coroutine_threadsafe(call(), loop)
    fut.result()

if __name__ == '__main__':
    external_trigger_thread = Thread(target=external_trigger)
    external_trigger_thread.start()
    asyncio.run(aio_main(), debug=True)

The output I obtain is the following:

DEBUG:asyncio:Using selector: EpollSelector
DEBUG:transitions.extensions.asyncio:Executed machine preparation callbacks before conditions.
DEBUG:transitions.extensions.asyncio:Initiating transition from state A to state B...
DEBUG:transitions.extensions.asyncio:Executed callbacks before conditions.
INFO:__main__:Do something from state A
INFO:__main__:Do something from state A finished
DEBUG:transitions.extensions.asyncio:Executed callback before transition.
DEBUG:transitions.extensions.asyncio:Exiting state A. Processing callbacks...
INFO:transitions.extensions.asyncio:Exited state A
DEBUG:transitions.extensions.asyncio:Entering state B. Processing callbacks...
INFO:transitions.extensions.asyncio:Entered state B
DEBUG:transitions.extensions.asyncio:Executed callback after transition.
DEBUG:transitions.extensions.asyncio:Executed machine finalize callbacks
INFO:__main__:Calling B_to_C
DEBUG:transitions.extensions.asyncio:Executed machine preparation callbacks before conditions.
DEBUG:transitions.extensions.asyncio:Initiating transition from state B to state C...
DEBUG:transitions.extensions.asyncio:Executed callbacks before conditions.
DEBUG:transitions.extensions.asyncio:Cancel running tasks...
INFO:__main__:Got cancelled
DEBUG:transitions.extensions.asyncio:Executed machine preparation callbacks before conditions.
DEBUG:transitions.extensions.asyncio:Initiating transition from state B to state A...
DEBUG:transitions.extensions.asyncio:Executed callbacks before conditions.
DEBUG:transitions.extensions.asyncio:Cancel running tasks...
INFO:__main__:Do something from state B
DEBUG:transitions.extensions.asyncio:Executed machine finalize callbacks
INFO:__main__:Trigger done (False), state is B
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/home/lexa/.pyenv/versions/3.8.1/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/home/lexa/.pyenv/versions/3.8.1/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "try_transitions.py", line 98, in external_trigger
    fut.result()
  File "/home/lexa/.pyenv/versions/3.8.1/lib/python3.8/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/home/lexa/.pyenv/versions/3.8.1/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
  File "try_transitions.py", line 94, in call
    assert result
AssertionError
INFO:__main__:Do something from state B finished
DEBUG:transitions.extensions.asyncio:Executed callback before transition.
DEBUG:transitions.extensions.asyncio:Exiting state B. Processing callbacks...
INFO:transitions.extensions.asyncio:Exited state B
DEBUG:transitions.extensions.asyncio:Entering state A. Processing callbacks...
INFO:transitions.extensions.asyncio:Entered state A
DEBUG:transitions.extensions.asyncio:Executed callback after transition.
DEBUG:transitions.extensions.asyncio:Executed machine finalize callbacks
DEBUG:asyncio:Close <_UnixSelectorEventLoop running=False closed=False debug=True>

As far as I can tell, this behaviour comes from AsyncMachine.switch_model_context(): https://github.com/pytransitions/transitions/blob/251aafef48b214f04bb9fbdd434dbffaf3fb2b52/transitions/extensions/asyncio.py#L363

When the B_to_C trigger executes, it sees that the context variable is already set to the main task (or to whichever task executed a trigger beforehand) rather than to itself, and so it proceeds to cancel that task.
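To make this cancel-on-new-trigger behaviour concrete, here is a minimal sketch in plain asyncio. ToyCancellingDispatcher is hypothetical and is not the transitions implementation; it only mimics the policy described above, where the task that fired the last trigger is remembered and cancelled by the next trigger coming from a different task.

```python
import asyncio


class ToyCancellingDispatcher:
    """Hypothetical, simplified model of the cancel-on-new-trigger policy.

    It is not the transitions implementation; it only mimics the idea that the
    task which fired the last trigger is cancelled by a trigger from another task.
    """

    def __init__(self):
        self.last_task = None  # task that fired the most recent trigger

    async def trigger(self, name):
        current = asyncio.current_task()
        if self.last_task is not None and self.last_task is not current:
            self.last_task.cancel()  # cancel whoever triggered before us
        self.last_task = current  # never cleared, even after the trigger completes
        await asyncio.sleep(0.01)  # stands in for an async 'before' callback
        return name


async def demo():
    dispatcher = ToyCancellingDispatcher()
    events = []

    async def main_task():
        await dispatcher.trigger('A_to_B')  # main task is now the registered one
        try:
            await asyncio.sleep(1)  # idle: no trigger is running
        except asyncio.CancelledError:
            events.append('main cancelled while idle')
            raise

    main = asyncio.create_task(main_task())
    await asyncio.sleep(0.1)  # let the main task finish its trigger and go idle
    events.append(await dispatcher.trigger('B_to_C'))  # trigger from another task
    try:
        await main
    except asyncio.CancelledError:
        pass  # expected: the main task was cancelled
    return events


events = asyncio.run(demo())
print(events)  # ['main cancelled while idle', 'B_to_C']
```

Because last_task is never cleared, the idle main task is still treated as the owner of the model and gets cancelled, which is the same symptom as the "Got cancelled" line in the log.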

I would have expected this context variable to be reinitialised once a transition completes, so that if a long-running task is not currently running a trigger, any other task could use the machine (or model).

Cheers, Axel

aleneum commented 4 years ago

I would have expected that after a transition complete, this contextvar gets reinitialised such that if a long running task is not currently running a trigger, then any other task could use the machine (or model).

It used to work like this, which is why AsyncMachine.process_context still takes model as an unused parameter, just in case. You can restore that behaviour with an override:

import asyncio

from transitions.extensions.asyncio import AsyncMachine


class ResetContextMachine(AsyncMachine):

    async def process_context(self, func, model):
        if self.current_context.get() is None:
            self.current_context.set(asyncio.current_task())
            try:
                res = await self._process(func)
                if self.async_tasks[model] == asyncio.current_task():
                    del self.async_tasks[model]
                return res
            except asyncio.CancelledError:
                return False
        return await self._process(func)

The reason we did not make this the standard behaviour is that it creates a false sense of security: it keeps the main task from being cancelled when nothing is being triggered at that moment, but other tasks can still 'snipe' the main task when events are triggered at more or less the same time. This is why we opted for consistent behaviour: the most recently triggered event cancels all previous ones. In your example, B_to_C cancels the main task, which then triggers reset, which in turn cancels the task running B_to_C.

I guess it would be nice to have a convenient built-in way to optionally protect (shield) certain tasks from being cancelled when customizing AsyncMachine. Right now, you can do this by overriding process_context and/or switch_model_context.
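A toy sketch of this trade-off, again in plain asyncio with a hypothetical ToyResettingDispatcher (not transitions code): clearing the registration when a trigger finishes protects an idle task, but a task whose trigger is still in flight can still be sniped by a trigger from another task.

```python
import asyncio


class ToyResettingDispatcher:
    """Hypothetical toy (not transitions code): the registered task is cleared
    once its trigger completes, similar in spirit to the reset-context override."""

    def __init__(self):
        self.running_task = None  # task whose trigger is currently in flight

    async def trigger(self, name, duration):
        current = asyncio.current_task()
        if self.running_task is not None and self.running_task is not current:
            self.running_task.cancel()  # only an in-flight trigger can be sniped
        self.running_task = current
        try:
            await asyncio.sleep(duration)  # stands in for a slow 'before' callback
            return name
        finally:
            if self.running_task is current:
                self.running_task = None  # reset once this trigger is done


async def scenario(main_busy):
    dispatcher = ToyResettingDispatcher()

    async def main_task():
        await dispatcher.trigger('A_to_B', 0.01)  # completes, registration is reset
        try:
            if main_busy:
                await dispatcher.trigger('C_to_D', 0.1)  # still running at t=0.05
            else:
                await asyncio.sleep(0.1)  # idle: no trigger running at t=0.05
            return 'main finished'
        except asyncio.CancelledError:
            return 'main cancelled'

    main = asyncio.create_task(main_task())
    await asyncio.sleep(0.05)
    await dispatcher.trigger('B_to_C', 0.01)  # trigger from another task
    return await main


print(asyncio.run(scenario(main_busy=False)))  # main finished
print(asyncio.run(scenario(main_busy=True)))  # main cancelled
```

Whether the main task survives depends only on timing, which is the "false sense of security" argument for keeping the stricter default.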

aleneum commented 4 years ago

Closing this since there hasn't been feedback for almost two weeks. Feel free to comment if you consider this still an issue. I will reopen the issue if necessary.

AxelVoitier commented 4 years ago

Ah yes, sorry.

Your response was helpful. However, when the main task fired not one but two triggers before the "external" trigger, it did not work: on the main task's second call, the first if statement evaluates to False, so the del no longer happened.

I modified it in this way:

    async def process_context(self, func, model):
        if self.current_context.get() is None:
            self.current_context.set(asyncio.current_task())
            try:
                res = await self._process(func)
                if self.async_tasks[model] == asyncio.current_task():
                    del self.async_tasks[model]
                return res
            except asyncio.CancelledError:
                return False
        else:
            res = await self._process(func)
            if self.async_tasks[model] == asyncio.current_task():
                del self.async_tasks[model]
            return res

So far it works, and I have been developing around this machine without further issues from this.
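Why the first if statement of the original override only matched on the first call can be reproduced with a bare contextvars.ContextVar: awaiting a coroutine directly runs it in the calling task's context, so a value set during the first trigger is still there on the second trigger from the same task. In this sketch, current_context and toy_process_context are hypothetical stand-ins that only mimic the branch test of the override.

```python
import asyncio
import contextvars

# Hypothetical stand-in for AsyncMachine.current_context.
current_context = contextvars.ContextVar('current_context', default=None)


async def toy_process_context(calls):
    # Mimics only the branch test at the top of the override.
    if current_context.get() is None:
        current_context.set(asyncio.current_task())
        calls.append('first branch')
    else:
        calls.append('second branch')


async def main_task():
    calls = []
    await toy_process_context(calls)  # first trigger awaited from the main task
    await toy_process_context(calls)  # second trigger from the same task
    return calls


print(asyncio.run(main_task()))  # ['first branch', 'second branch']
```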

Besides that, resource protection in concurrent environments is a wild topic :). I would just point out that there are two inconsistent behaviours here: LockedMachine takes a locking approach for a multithreaded machine, whereas AsyncMachine takes a cancellation approach for a "multitasked" machine.
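For comparison, here is a minimal sketch of what a lock approach could look like for tasks, using asyncio.Lock. ToyLockingDispatcher is hypothetical: it is neither LockedMachine (which works with threads) nor an existing AsyncMachine option. A second trigger waits for the first one to finish instead of cancelling it.

```python
import asyncio


class ToyLockingDispatcher:
    """Hypothetical toy of the lock approach (not LockedMachine, which uses
    thread locks, and not an AsyncMachine option): triggers are serialized."""

    def __init__(self):
        self._lock = asyncio.Lock()  # created inside a running loop below
        self.log = []

    async def trigger(self, name, duration):
        async with self._lock:  # a later trigger waits here instead of cancelling
            self.log.append('start ' + name)
            await asyncio.sleep(duration)  # stands in for a slow callback
            self.log.append('end ' + name)
            return True


async def demo():
    dispatcher = ToyLockingDispatcher()
    results = await asyncio.gather(
        dispatcher.trigger('first', 0.05),
        dispatcher.trigger('second', 0.01),
    )
    return results, dispatcher.log


results, log = asyncio.run(demo())
print(results)  # [True, True]
print(log)  # ['start first', 'end first', 'start second', 'end second']
```

The trade-off is that a slow or stuck callback blocks every later trigger, which the cancellation approach avoids.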

Cheers, Axel

AxelVoitier commented 4 years ago

Hi again,

I found another issue with this. It was amplified by a rather "advanced" usage in which two different state machines can call each other from within their callbacks, while I still use "external/threaded" triggers and a main task that executes periodic or reactive actions over its long lifespan.

But it was actually not noticeable until the conditions in one of my tests were such that the main task ended up being cancelled again.

The issue is that if self.current_context.get() is not None, the override just assumes that it corresponds to the current task, which is not always true. This is especially so because current_context is a class attribute and is therefore shared by all subclasses of AsyncMachine, and it is not helped by the fact that all callbacks run in their own tasks as well.

As a result, the override later skips del self.async_tasks[model]: the if condition guarding it does not match, since the context value does not correspond to the task that actually called the trigger.

The fix is to replace the first line of the overridden process_context() with:

if self.current_context.get() != asyncio.current_task():

Can you confirm that is the right analysis and the right way to fix it?

Also, I can testify that debugging this was quite an ordeal. It is hard to understand the intended roles of current_context and async_tasks. I also wonder whether sharing current_context across all state machines will introduce additional weird behaviours.

Cheers, Axel

aleneum commented 4 years ago

Hello @AxelVoitier,

Can you confirm that is the right analysis and the right way to fix it?

Unfortunately, it's not, because it leads to the cancellation of tasks running in the same context that might have triggered the current task. This may lead to a task cancelling its parent and, consequently, itself in switch_model_context. pytest will complain (test_async_timeout will fail) if you make this change.
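The "same context" point can be seen with plain contextvars: asyncio.create_task copies the current context, so a child task inherits whatever value its parent set. In this sketch, current_context is a hypothetical module-level stand-in for AsyncMachine.current_context.

```python
import asyncio
import contextvars

# Hypothetical module-level stand-in for AsyncMachine.current_context.
current_context = contextvars.ContextVar('current_context', default=None)


async def child():
    # create_task() copied the parent's context, so this is the parent's value.
    return current_context.get(), asyncio.current_task()


async def parent():
    me = asyncio.current_task()
    current_context.set(me)  # the parent marks itself as the triggering task
    inherited, child_task = await asyncio.create_task(child())
    return inherited is me, inherited is child_task


print(asyncio.run(parent()))  # (True, False)
```

Inside the child, current_context.get() != asyncio.current_task() is true even though the stored task is its own parent, which is still waiting on it, so a check based on that comparison treats the child as an unrelated caller.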

It is hard to understand what are the intended roles of current_context and async_tasks

The idea is to not kill your parent. Unfortunately (or maybe not), there is no hierarchy of tasks. In my opinion, the context should be shared across machines, since it is a 'model-centric' attribute: a callback on one machine could trigger a callback on another machine (via the model) and kill the whole process. current_context should be considered a module variable; the only reason it is tied to AsyncMachine is that it is not used anywhere else.
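A small sketch of why a ContextVar stored as a class attribute behaves like a module variable: every subclass and instance refers to the same ContextVar object, so a value set through one machine is visible through another machine in the same context. ToyBaseMachine, MachineOne and MachineTwo are hypothetical classes used only for illustration.

```python
import asyncio
import contextvars


class ToyBaseMachine:
    # Hypothetical: one ContextVar defined on the base class, like a module variable.
    current_context = contextvars.ContextVar('current_context', default=None)


class MachineOne(ToyBaseMachine):
    pass


class MachineTwo(ToyBaseMachine):
    pass


async def demo():
    first, second = MachineOne(), MachineTwo()
    first.current_context.set(asyncio.current_task())
    # Both machines hold the very same ContextVar object...
    same_var = first.current_context is second.current_context
    # ...so a value set through one machine is visible through the other.
    visible = second.current_context.get() is asyncio.current_task()
    return same_var, visible


print(asyncio.run(demo()))  # (True, True)
```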

aleneum commented 4 years ago

...

But it was actually not noticable until the conditions in one of my test where such that it ended up cancelling the main task again.

and it seems it's not working well. If you can boil your use case down to something testable, I'd evaluate whether a list of context tasks can avoid this.
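One possible reading of "a list of context tasks", sketched as a hypothetical design (ToyChainDispatcher and context_chain are not part of transitions): each context carries the tuple of tasks whose triggers led to it, and a new trigger only cancels the previously registered task if that task is not one of its own ancestors.

```python
import asyncio
import contextvars

# Hypothetical: the chain of tasks whose triggers led to the current context.
context_chain = contextvars.ContextVar('context_chain', default=())


class ToyChainDispatcher:
    """Hypothetical sketch, not the transitions implementation."""

    def __init__(self):
        self.running_task = None  # task that fired the most recent trigger

    async def trigger(self, name, callback=None):
        me = asyncio.current_task()
        chain = context_chain.get()
        previous = self.running_task
        # Only cancel a previous task that is neither us nor one of our ancestors.
        if previous is not None and previous is not me and previous not in chain:
            previous.cancel()
        self.running_task = me
        token = context_chain.set(chain + (me,))
        try:
            if callback is not None:
                # The callback runs in its own task and inherits the extended chain.
                await asyncio.create_task(callback())
            return name
        finally:
            context_chain.reset(token)  # restore the chain for later triggers


async def nested():
    dispatcher = ToyChainDispatcher()
    outcome = {}

    async def nested_callback():
        # Triggered from a child task: its parent is in the chain, so it is spared.
        outcome['inner'] = await dispatcher.trigger('inner')

    outcome['outer'] = await dispatcher.trigger('outer', nested_callback)
    return outcome


async def unrelated_snipe():
    dispatcher = ToyChainDispatcher()

    async def slow_callback():
        await asyncio.sleep(0.1)

    async def main_task():
        try:
            await dispatcher.trigger('outer', slow_callback)
            return 'main finished'
        except asyncio.CancelledError:
            return 'main cancelled'

    main = asyncio.create_task(main_task())
    await asyncio.sleep(0.02)
    await dispatcher.trigger('unrelated')  # not in main's chain: main is cancelled
    return await main


print(asyncio.run(nested()))  # {'inner': 'inner', 'outer': 'outer'}
print(asyncio.run(unrelated_snipe()))  # main cancelled
```

Nested triggers no longer kill their parent, while unrelated tasks keep the "last trigger wins" behaviour; whether this covers every case from the report would still need a proper test.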