pytransitions / transitions

A lightweight, object-oriented finite state machine implementation in Python with many extensions
MIT License

Enable usage of other event loops in `AsyncMachine` #420

Closed: thedrow closed this issue 4 years ago

thedrow commented 4 years ago

AnyIO is a library that abstracts over event loops. Currently our implementation only supports asyncio, but trio is also a popular event loop that we could support. AnyIO would let us support both with ease.

I can port all of the code except these lines: https://github.com/pytransitions/transitions/blob/9237b9e0978ad3d904d4f0e7b3d82d99a6b7e76f/transitions/extensions/asyncio.py#L125-L136

I think we should be using a CancelScope, but I'm not really sure how. Any feedback is appreciated.

aleneum commented 4 years ago

Being able to use curio or trio sounds nice. However, I hesitate to add new dependencies because each of them means more maintenance effort. If it's worth it and the community requires it, I am willing to accept it, but right now I'd rather stick with asyncio and maybe find ways to enable easier customization through inheritance. I will close this for now, but feel free to comment. If a feature request arises, I will reopen it.

thedrow commented 4 years ago

I don't think we should introduce a hard dependency.

Maybe an extra dependency? Or a different package?

I need this because Celery will be using trio.

aleneum commented 4 years ago

I am currently attempting to decouple the asyncio calls from AsyncMachine to make overriding easier (see the dev-trio branch).

ToDo:

aleneum commented 4 years ago

I just pushed changes to AsyncMachine that complete the isolation of asyncio. AsyncTimeout still relies on asyncio, though. If another async library is to be used, a subclass of AsyncMachine needs to take care of:

Based on these changes, I was able to implement a TrioMachine. I copied the docstrings from AsyncMachine to make it a bit easier to follow.

Update: I had to rewrite process_context and switch_model_context because a context override (of async_task) happened when two contexts triggered events on the same model. The previous test did not catch that because it only triggered an event from an already assigned context. I also renamed is_subtask to current_context because that name is more fitting, IMHO.
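The context issue above hinges on how `contextvars` behave under asyncio: every task runs in a copy of the context that was current when the task was created, so a value set inside one task is invisible to its siblings and to the parent. A small stdlib-only sketch (the variable and function names are illustrative, not transitions code):

```python
import asyncio
import contextvars

current_context = contextvars.ContextVar("current_context", default=None)


async def handle_event(name: str, seen: dict) -> None:
    # Each task starts with the value from its parent's context (None here).
    seen[name + ":before"] = current_context.get()
    current_context.set(name)
    await asyncio.sleep(0.01)  # let the other task run and set its own value
    seen[name + ":after"] = current_context.get()


async def main() -> dict:
    seen = {}
    # gather wraps each coroutine in its own task, each with a copied context.
    await asyncio.gather(handle_event("A", seen), handle_event("B", seen))
    # The parent's context is untouched by the children.
    seen["parent"] = current_context.get()
    return seen


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Awaiting a coroutine directly, rather than wrapping it in a task, shares the caller's context, which is why a nested trigger from the same context sees an already assigned value.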

Update 2: Added another corner case for async testing. Setting async_task to None when returning is not a good idea, since it would overwrite the variable if the task returns before the currently running task does. This can happen when the transition's condition checks fail.
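One way around the unconditional reset is to clear a model's entry only if it still points at the task that is finishing. The following is a hypothetical stdlib-only sketch (`release`, `event` and the dict layout are made up for illustration; transitions' own code may solve this differently):

```python
import asyncio


async def main() -> list:
    async_tasks = {}  # model -> task currently processing an event for that model
    observations = []

    def release(model, task):
        # Clear the entry only if it still belongs to the finishing task.
        # An unconditional async_tasks[model] = None would evict another owner.
        if async_tasks.get(model) is task:
            del async_tasks[model]

    async def event(model, conditions_pass, duration):
        me = asyncio.current_task()
        try:
            if conditions_pass:
                async_tasks[model] = me  # this event now owns the model
                await asyncio.sleep(duration)
            # A failed condition check returns immediately without claiming the model.
        finally:
            release(model, me)

    owner = asyncio.create_task(event("m", True, 0.05))
    await asyncio.sleep(0.01)  # let the owner claim the model
    await event("m", False, 0)  # returns early; must not evict the owner
    observations.append(async_tasks.get("m") is owner)
    await owner
    observations.append("m" in async_tasks)
    return observations


if __name__ == "__main__":
    print(asyncio.run(main()))
```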

import trio

from transitions.extensions.asyncio import AsyncMachine

class TrioMachine(AsyncMachine):

    @staticmethod
    async def await_all(partials):
        """
        Executes callables without parameters in parallel and collects their results.
        Args:
            partials (list): A list of callable functions

        Returns:
            list: A list of results in the same order as the passed callables.
        """
        results = [None] * len(partials)

        async def with_result(index, func):
            # Store by index; appending would record completion order instead.
            results[index] = await func()

        async with trio.open_nursery() as nursery:
            for index, par in enumerate(partials):
                nursery.start_soon(with_result, index, par)

        return results

    async def process_context(self, func, model):
        """
        This function is called by an async event to make callbacks processed in Event._trigger cancellable.
        Using asyncio, this results in a try/except block catching asyncio.CancelledError;
        with trio, a CancelScope absorbs its own cancellation instead.
        Args:
            func (callable): The partial of Event._trigger with all parameters already assigned
            model (object): The currently processed model

        Returns:
            bool: returns the success state of the triggered event
        """
        if self.current_context.get() is None:
            with trio.CancelScope() as scope:
                self.current_context.set(scope)
                return await func()
            return False
        return await func()

    def switch_model_context(self, model):
        """
        This method is called by an async transition when all conditional tests have passed and the transition will happen.
        This requires already running tasks to be cancelled.
        Args:
            model (object): The currently processed model
        """
        current_scope = self.current_context.get()
        running_scope = self.async_tasks.get(model, None)
        if current_scope != running_scope:
            if running_scope is not None:
                self.async_tasks[model].cancel()
            self.async_tasks[model] = self.current_context.get()

async def cancel_soon():
    await trio.sleep(0.1)
    raise TimeoutError("Hasn't been cancelled")

async def await_true():
    await trio.sleep(0.1)
    return True

m1 = TrioMachine(states=['A', 'B', 'C'], initial='A', name="m1")
m2 = TrioMachine(states=['A'], initial='A', name='m2')
m1.add_transition(trigger='go', source='A', dest='B', before=cancel_soon)
m1.add_transition(trigger='fix', source='A', dest='C', after=cancel_soon)
m1.add_transition(trigger='check', source='C', dest='B', conditions=lambda: False)
m1.add_transition(trigger='reset', source='C', dest='A')
m2.add_transition(trigger='go', source='A', dest=None, conditions=m1.is_C, after=m1.reset)

async def delay(time, func):
    await trio.sleep(time)
    await func()

async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(m1.go)
        nursery.start_soon(delay, 0.05, m1.fix)
        nursery.start_soon(delay, 0.07, m1.check)
        nursery.start_soon(delay, 0.1, m2.go)

trio.run(main)

assert m1.is_A()

thedrow commented 4 years ago
  • asyncio.iscoroutinefunction could be replaced with inspect.isawaitable with minor refactoring

That's the one piece of code you shouldn't need to replace. That function is more extensive in its checks (e.g. mocks work with it but don't with inspect.isawaitable).

  • calls to asyncio.gather have been unified to call AsyncMachine.await_all instead. This should make using nurseries and similar constructs easier, but I need to test that.

Not sure how you can do that. It's easier to replace the code with AnyIO.

  • cancellation of running tasks has been isolated in AsyncMachine.switch_model_context(model)

ToDo:

  • How does trio handle cancellation? (Nested)AsyncEvent awaits AsyncMachine._process and catches asyncio.CancelledError.

Trio uses cancel scopes.

thedrow commented 4 years ago

So you'd rather introduce trio as an optional dependency instead of using AnyIO which requires the same code as the trio version but allows you to support multiple event loops at once?

aleneum commented 4 years ago

So you'd rather introduce trio as an optional dependency instead of using AnyIO which requires the same code as the trio version but allows you to support multiple event loops at once?

Haha, no. That would indeed be rather inefficient. As I said in the beginning:

... right now I'd rather stick with asyncio and maybe find ways to enable easier customization through inheritance.

AsyncMachine in dev-trio no longer depends on asyncio (apart from the functions mentioned above), which means it should be possible to extend it with your favourite event loop. I used trio to illustrate how this can be done because you mentioned that's the one you are most interested in. However, as of today, TrioMachine won't be part of transitions.
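Isolating the loop-specific calls into a few overridable hooks is essentially the template method pattern. Below is a simplified, hypothetical sketch of what asyncio-based default hooks could look like (`BaseAsyncRunner` and `SequentialRunner` are made-up names and this is not transitions' actual AsyncMachine code); a trio or AnyIO subclass would replace the same three methods:

```python
import asyncio
import contextvars


class BaseAsyncRunner:
    """Hypothetical stand-in for AsyncMachine: event-loop details live only in hooks."""

    # Each asyncio task sees its own copy of this variable's value.
    current_context = contextvars.ContextVar("current_context", default=None)

    def __init__(self):
        self.async_tasks = {}  # model -> task currently processing an event for it

    @staticmethod
    async def await_all(partials):
        # asyncio.gather returns results in the order of the passed callables.
        return await asyncio.gather(*[func() for func in partials])

    async def process_context(self, func, model):
        if self.current_context.get() is None:
            self.current_context.set(asyncio.current_task())
            try:
                return await func()
            except asyncio.CancelledError:
                # Mirrors the catch-and-return-False approach discussed in this thread.
                return False
        return await func()

    def switch_model_context(self, model):
        current = self.current_context.get()
        running = self.async_tasks.get(model)
        if current is not running:
            if running is not None:
                running.cancel()  # a newer event takes over this model
            self.async_tasks[model] = current


class SequentialRunner(BaseAsyncRunner):
    # A subclass swaps out one hook without touching the rest of the machinery.
    @staticmethod
    async def await_all(partials):
        results = []
        for func in partials:  # sequential instead of concurrent, for illustration
            results.append(await func())
        return results
```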

aleneum commented 4 years ago

As far as I can tell, trio and anyio share the same syntax when it comes to executing tasks in parallel and handling cancellation. I guess an AnyIOMachine could roughly look like this:

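from anyio import CancelScope, create_task_group

from transitions.extensions.asyncio import AsyncMachine

# Uses the AnyIO 3+ API (start_soon, CancelScope); older AnyIO releases
# used tg.spawn() and open_cancel_scope() instead.
class AnyIOMachine(AsyncMachine):

    @staticmethod
    async def await_all(partials):
        results = [None] * len(partials)

        async def with_result(index, func):
            # Store by index so results keep the order of the passed callables.
            results[index] = await func()

        async with create_task_group() as tg:
            for index, par in enumerate(partials):
                tg.start_soon(with_result, index, par)

        return results

    async def process_context(self, func, model):
        if self.current_context.get() is None:
            with CancelScope() as scope:
                self.current_context.set(scope)
                return await func()
            # Only reached if this scope was cancelled by switch_model_context.
            return False
        return await func()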

    def switch_model_context(self, model):
        current_scope = self.current_context.get()
        running_scope = self.async_tasks.get(model, None)
        if current_scope != running_scope:
            if running_scope is not None:
                self.async_tasks[model].cancel()
            self.async_tasks[model] = self.current_context.get()

Maybe AnyIO has a better mechanism for collecting the results of callbacks (like asyncio.gather), but this should work nevertheless.

aleneum commented 4 years ago

0.8.2 has been released. It should be possible to use AsyncMachine as a base for a machine used with trio (ref) or anyio (ref). I consider this solved. Feel free to comment and I will reopen the issue when necessary.

thedrow commented 4 years ago

I think we can create another repository with the AnyIO implementation or provide it in this repository.

thedrow commented 3 years ago

The implementation has a problem which I'm trying to fix:

    async def process_context(self, func, model):
        if self.current_context.get() is None:
            try:
                async with open_cancel_scope() as scope:
                    self.current_context.set(scope)
                    return await func()
            except get_cancelled_exc_class():
                return False # This should raise instead.
        return await func()

Unfortunately, your version did not catch the cancelled exception, which is why we didn't see this problem before.

We must raise the cancelled exception or we risk undefined behavior.