matt3o opened this issue 4 months ago (status: Open)
Hello @matt3o,
this issue can be boiled down to this:
import asyncio
from transitions.extensions import AsyncMachine
from transitions.extensions.asyncio import AsyncTimeout
from transitions.extensions.states import add_state_features


@add_state_features(AsyncTimeout)
class MyMachine(AsyncMachine):

    async def on_enter_B(self):
        print("ENTERED")
        await asyncio.sleep(2)
        print("DONE")

    async def handle_timeout(self):
        print("CANCEL")
        await self.to_A()


async def run():
    m = MyMachine(states=['A', {"name": "B", "timeout": 1, "on_timeout": "handle_timeout"}], initial='A')
    await m.to_B()

asyncio.run(run())
Obviously, self.to_A() should cancel the execution of on_enter_B. I need to check why this is not the case. I assume there are some false assumptions about the context of the tasks.
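In plain asyncio terms, the behaviour expected here is that the timeout cancels the task that is running on_enter_B. The sketch below models only that expectation, without transitions: on_enter_b and timeout_after are hypothetical stand-ins, and the delays are shortened so it finishes quickly.

```python
import asyncio


async def on_enter_b(log):
    # Stand-in for the state callback: it starts, then blocks for a while.
    log.append("ENTERED")
    await asyncio.sleep(2)
    log.append("DONE")  # not reached when the timer cancels this task


async def timeout_after(task, seconds, log):
    # Stand-in for the timeout: wait, then cancel the running callback task.
    await asyncio.sleep(seconds)
    log.append("CANCEL")
    task.cancel()


async def main():
    log = []
    callback = asyncio.create_task(on_enter_b(log))
    timer = asyncio.create_task(timeout_after(callback, 0.1, log))
    try:
        await callback
    except asyncio.CancelledError:
        # Awaiting a cancelled task re-raises CancelledError in the awaiter.
        log.append("callback cancelled")
    await timer
    return log


print(asyncio.run(main()))  # ['ENTERED', 'CANCEL', 'callback cancelled']
```

"DONE" never appears because the cancellation is delivered at the await inside the callback.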
Without queuing, one could transition away from the current state and cause a CancelledError. The workaround is to prevent copying the current context with create_task(..., context=Context()) (the context argument of create_task requires Python 3.11+). Your adapted code:
from contextvars import Context
from transitions.extensions.asyncio import AsyncMachine, AsyncTimeout
import asyncio
from enum import Enum
from transitions.extensions.states import add_state_features, Tags
from transitions.extensions import GraphMachine
import logging
import time

logger = logging.getLogger("tmp")
logging.getLogger('transitions').setLevel(logging.INFO)
logging.getLogger('asyncio').setLevel(logging.WARNING)


class States(str, Enum):
    start = "Start handling of request"
    parse_input = "Parse Input"
    finished = "Finished handling of request"
    error = "Error"


states = [
    States.start,
    {"name": States.parse_input, 'timeout': 5, "on_timeout": "timeout_handler"},
    {"name": States.finished, "final": True},
    States.error
]


@add_state_features(Tags, AsyncTimeout)
class CustomGraphMachine(GraphMachine):
    pass


@add_state_features(Tags, AsyncTimeout)
class CustomAsyncMachine(AsyncMachine):
    pass


class ChatStateMachine:
    def __init__(self):
        self.machine = CustomAsyncMachine(
            model=self,
            states=states,
            auto_transitions=True,
            initial=States.start,
            send_event=True,
            on_exception='handle_error',
            name="ChatStateMachine",
            before_state_change=[self.default_on_exit],
            after_state_change=[self.default_on_enter],
        )
        self.start = time.time()

    async def default_on_enter(self, event):
        # print(event)
        state = event.state.name
        print(f"Entering state: {state}")
        # Fire and forget the async database tracking logic
        asyncio.create_task(self.update_database(state))
        # await asyncio.sleep(0.01)

    async def default_on_exit(self, event):
        # print(event)
        state = event.state.name
        print(f"Exiting state: {state}")
        # Fire and forget the async database tracking logic
        # asyncio.create_task(self.update_database(state))

    async def update_database(self, state_name):
        # Simulate async database update
        await asyncio.sleep(0.1)
        print(f"Database updated with state: {state_name}")

    async def on_enter_parse_input(self, event):
        print(f"on_enter_parse_input {time.time() - self.start:.2f} seconds")
        await asyncio.sleep(20)
        print(f"resuming after sleep {time.time() - self.start:.2f} seconds")
        # await asyncio.wait_for(asyncio.sleep(30), 6)

    async def handle_error(self, event):
        print(f"Received error: {event.error=}")
        # if not event.state.name == States.fatal_error:
        #     await self.to_fatal_error_raised()
        # raise event.error
        del event.error

    async def timeout_handler(self, event):
        print(f"timeout_handler() after {time.time() - self.start:.2f} seconds")
        print("TIMEOUT")
        await asyncio.create_task(self.to_error(), context=Context())

    async def run(self):
        await self.to_parse_input()
        await asyncio.sleep(0.5)  # let the update database task finish
        await self.to_finished()


async def run_state_machine():
    chat_sm = ChatStateMachine()
    await chat_sm.run()


if __name__ == "__main__":
    asyncio.run(run_state_machine())
This is just a workaround. Part of the fix could be to let AsyncTimeout handle this 'context switch' itself.
I don't see an easy way to make a raise TimeoutError in handle_timeout work, since a timeout should not block a transition. It may fire after the transition has been successful (outside of the try/except block that forwards errors to handle_error) but while the model/machine is still in a time-critical state. With an empty context, however, the created timeout handler can either transition away directly (when queued=False) or explicitly force a context switch. I added this change to a branch as mentioned above. I hope this example illustrates how this could work.
import asyncio
from transitions.extensions import AsyncMachine
from transitions.extensions.asyncio import AsyncTimeout, AsyncEventData
from transitions.extensions.states import add_state_features


@add_state_features(AsyncTimeout)
class MyMachine(AsyncMachine):

    async def on_enter_A(self, event_data: AsyncEventData):
        print("Enter A...")

    async def on_enter_B(self, event_data: AsyncEventData):
        print("Enter B...")
        await asyncio.sleep(event_data.kwargs.get("sleep", 0))
        print("... Done in B.")

    async def handle_timeout(self, event_data: AsyncEventData):
        print("Timeout!")
        if event_data.machine.has_queue:
            await event_data.machine.switch_model_context(self)
        await self.to_A()

    async def handle_error(self, event_data):
        print(f"Error: {event_data.error=}")


async def run():
    m = MyMachine(states=['A', {"name": "B", "timeout": 1, "on_timeout": "handle_timeout"}],
                  transitions=[{"trigger": "try_something", "source": "*",
                                "dest": "A", "conditions": [lambda event_data: False]}],
                  initial='A', queued=True, send_event=True, on_exception="handle_error")
    print("First round")
    await m.to_B(sleep=2)
    print("Second round")
    await m.to_B(sleep=0.5)
    while not m.is_A():
        await m.try_something()  # when send_event=True, this will always return True
        print("Try something else...")
        await asyncio.sleep(0.2)

asyncio.run(run())
Output:
First round
Enter B...
Timeout!
Error: event_data.error=CancelledError()
Enter A...
Second round
Enter B...
... Done in B.
Try something else...
Try something else...
Try something else...
Timeout!
Enter A...
If you have some feedback, let me know.
I updated the development branch with another approach, which hopefully tackles most scenarios in which errors are (re-)raised and keeps internals internal, since it does not require manually cancelling ongoing transitions.
I am trying to wrap my head around all the possible outcomes of an async timeout. When a timeout raises an exception and handle_error is not present OR raises an exception itself, a running transition should be halted and the transition queue (if present) should be cleared. Halting an ongoing transition raises a CancelledError, which means that handle_error can be called twice, each time with a different error. If a CancelledError is 're-raised', it will be caught by transitions, since CancelledErrors are expected when transitions interrupt each other. However, handle_error may also raise another error (e.g. RuntimeError). In this case, it will be forwarded to the event caller. In both cases, the event queue will be cleared.

This means an exception raised in a timeout cannot be 'forwarded' to the initial event caller. However, a CancelledError can be intercepted and, if the cancellation has been caused by a timeout, raised as another error (e.g. TimeoutError). How to determine whether the cancellation has been triggered by the timeout depends on the design of the state machine. A sequential order of TimeoutError -> CancelledError may not always be accurate. Another approach is to check the CancelledError message: transitions sets it either to the name of the event that caused the current transition to be cancelled, or to 'timeout' when the cancellation was caused by a timeout.
flowchart TD
    A[TimeoutState] -->|timeout| B
    B[on_timeout] --> L["trigger event?"]
    L -->|"yes & queue"| X
    L -->|"yes & !queue"| D
    L -->|"no"| X[continue...]
    B[on_timeout] -->|error| C
    C["on_exception(TimeoutError)"] -->|error| D
    C --> L
    D[cancel_running_transitions] --> E
    E[running tasks?] -->|yes| G
    E -->|"no"| X
    G["on_exception(CancelledError)"] --> X
    G -->|error| I
    I -->|"other"| M["raised to event caller"]
    I[clear_queue] -->|"CancelledError"| X
import asyncio
from asyncio import CancelledError
from transitions.extensions import AsyncMachine
from transitions.extensions.asyncio import AsyncTimeout, AsyncEventData
from transitions.extensions.states import add_state_features


@add_state_features(AsyncTimeout)
class MyMachine(AsyncMachine):

    async def on_enter_A(self, event_data: AsyncEventData):
        print("Enter A...")

    async def on_enter_B(self, event_data: AsyncEventData):
        self.timeout_called = False
        await self.to_C()  # this will not be called when B is blocking
        seconds = event_data.kwargs.get("sleep", 100)
        await asyncio.sleep(seconds)

    async def on_enter_C(self, event_data: AsyncEventData):
        print("We are in C now. B was (hopefully!) non-blocking")

    async def handle_timeout(self, event_data: AsyncEventData):
        print("Trigger Timeout!")
        self.timeout_called = True
        raise TimeoutError()

    async def handle_error(self, event_data):
        print(f"Handle Error: {event_data.error=}")
        if isinstance(event_data.error, CancelledError) and self.timeout_called:
            print("CancelledError after a timeout!")
            raise TimeoutError()
        raise event_data.error


async def run():
    m = MyMachine(states=['A', {"name": "B", "timeout": 1, "on_timeout": "handle_timeout"},
                          {"name": "C", "timeout": 1, "on_timeout": "handle_timeout"}],
                  transitions=[{"trigger": "try_something", "source": "*",
                                "dest": "A", "conditions": [lambda event_data: False]}],
                  initial='A', queued=True, send_event=True, on_exception="handle_error")
    print("# Scenario A: No queue and blocking to_B...")
    try:
        await m.to_B()
    except TimeoutError:
        print("Caught a timeout without queue.")
    print("# Scenario B: No queue and non-blocking to_B...")
    await m.to_B(sleep=0.1)
    await asyncio.sleep(2)
    m = MyMachine(states=['A', {"name": "B", "timeout": 1, "on_timeout": "handle_timeout"},
                          {"name": "C", "timeout": 1, "on_timeout": "handle_timeout"}],
                  transitions=[{"trigger": "try_something", "source": "*",
                                "dest": "A", "conditions": [lambda event_data: False]}],
                  initial='A', queued=True, send_event=True, on_exception="handle_error")
    print("# Scenario C: queue and blocking to_B...")
    try:
        await m.to_B()
    except TimeoutError:
        print("Caught a timeout without queue.")
    print("# Scenario D: queue and non-blocking to_B...")
    await m.to_B(sleep=0.1)
    await asyncio.sleep(2)

asyncio.run(run())
# Scenario A: No queue and blocking to_B...
Trigger Timeout!
Handle Error: event_data.error=TimeoutError()
Handle Error: event_data.error=CancelledError('timeout')
CancelledError after a timeout!
Caught a timeout without queue.
# Scenario B: No queue and non-blocking to_B...
We are in C now. B was (hopefully!) non-blocking
Trigger Timeout!
Handle Error: event_data.error=TimeoutError()
# Scenario C: queue and blocking to_B...
Trigger Timeout!
Handle Error: event_data.error=TimeoutError()
Handle Error: event_data.error=CancelledError('timeout')
CancelledError after a timeout!
Caught a timeout without queue.
# Scenario D: queue and non-blocking to_B...
We are in C now. B was (hopefully!) non-blocking
Trigger Timeout!
Handle Error: event_data.error=TimeoutError()
Sorry for the super late response @aleneum. I'll try it this week or next week and get back to you. Also, many thanks for the quick response; I lost track of it before my holidays ❤️
Describe the bug
What did I want: The timeout should immediately stop the execution of the currently running state. For example, I have blocking database calls which should simply end in that case. At least, that is what I expected the timeout to do.
However, in the default implementation, the timeout does not interrupt the execution of the state. By now, I know the timeout runs in a separate thread. I tried to raise a TimeoutError in the thread (i.e. in the timeout_handler() function); however, that does not work because this exception is never caught in the main loop. If I am not mistaken, it would be possible to at least catch the exception via event.state.runner. This approach would not work on a blocked main process loop, though, which is exactly where I would have wanted this feature. My workaround will probably be to call asyncio.wait_for / asyncio.timeout.
Minimal working example
Output of that snippet (filenames censored):
Expected behavior
If you uncomment the line
await asyncio.wait_for(asyncio.sleep(30), 6)
you can see the expected behaviour. The code then correctly raises a TimeoutError which can be caught and handled in the error handler.