nomhoi / cosmicpython-fastapi


Messagebus doesn't seem to be async #2

Open jalvespinto opened 1 year ago

jalvespinto commented 1 year ago

Did you check whether your messagebus is async? I am new to asyncio, but it looks like it will run synchronously. This is what I was thinking:

import logging
from collections.abc import Awaitable, Callable

import anyio

logger = logging.getLogger(__name__)

# Handlers are async callables, not bare Coroutine objects
Handler = Callable[..., Awaitable[None]]

class AsyncIAMMessageBus:
    def __init__(
        self,
        uow: AsyncIAMUnitOfWork,
        event_handlers: dict[type[Event], list[Handler]],
        command_handlers: dict[type[Command], Handler],
    ):
        self.uow = uow
        self.event_handlers = event_handlers
        self.command_handlers = command_handlers

    async def handle(self, message: Event | Command):
        if isinstance(message, Command):
            await self._handle_command(message)
        elif isinstance(message, Event):
            await self._handle_event(message)
        else:
            raise Exception(f"{message} is not a Command or Event")

    async def _completed(self, handler, message: Event | Command):
        try:
            await handler(message)
        except Exception:
            if isinstance(message, Event):
                logger.exception(f"Exception handling event {message}")
            else:
                logger.exception(f"Exception handling command {message}")
                raise
        else:
            new_events = self.uow.collect_new_events()
            if new_events:
                async with anyio.create_task_group() as tg:
                    for event in new_events:
                        tg.start_soon(self.handle, event)

    async def _handle_event(self, event: Event):
        handlers = self.event_handlers[type(event)]
        async with anyio.create_task_group() as tg:
            for handler in handlers:
                tg.start_soon(self._completed, handler, event)

    async def _handle_command(self, command: Command):
        logger.debug("handling command %s", command)
        handler = self.command_handlers[type(command)]
        async with anyio.create_task_group() as tg:
            tg.start_soon(self._completed, handler, command)

I am using anyio, but it would be the same with asyncio's TaskGroup. This way the tasks run concurrently and new events are picked up right away. I also believe I should add timeouts for the tasks...

nomhoi commented 1 year ago

My version is fully async. In this project we need to run command and event handlers sequentially. In other projects, some handlers may need to run sequentially while others can run concurrently. It depends on your circumstances.

Pay attention to blocking IO and CPU-bound code in handlers. You may need to use thread or process pools: https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools For example, smtplib.SMTP.sendmail is a blocking IO method. We need to run it with run_in_executor().
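A minimal sketch of that pattern, assuming a hypothetical `send_email` function standing in for the blocking `smtplib.SMTP.sendmail` call:

```python
import asyncio
import time

def send_email(recipient: str) -> str:
    # Hypothetical blocking call standing in for smtplib.SMTP.sendmail;
    # calling it directly inside a coroutine would stall the event loop.
    time.sleep(0.1)  # simulate the network round-trip
    return f"sent to {recipient}"

async def send_email_handler(recipient: str) -> str:
    # Off-load the blocking call to the default thread pool so other
    # coroutines keep running while the email is sent.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, send_email, recipient)

print(asyncio.run(send_email_handler("user@example.com")))  # sent to user@example.com
```

Passing `None` as the executor uses the loop's default `ThreadPoolExecutor`; for CPU-bound work you would pass a `concurrent.futures.ProcessPoolExecutor` instead.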

jalvespinto commented 1 year ago

I thought about that, and I thought it was OK to run them concurrently, even for the book's project. If you receive a command, it should be handled by a single handler until completion, or an exception should be raised. That command may trigger a domain event which is handled by multiple handlers, and for events we pass errors silently, since we have consistency boundaries and a UoW that manages atomic success or failure to guarantee consistency where we must have it (on commands). Isn't that right? If, for example, an event has two handlers, send_user_email and add_points_to_user, why not run both concurrently? I don't know where the edge cases would be, but if I can live with an event not being handled by one handler, it should be not just fine but better to have as many handlers succeeding as possible. In the end I would have a "smaller" inconsistent state, since the command that generated the event already changed the state.

What I think I am missing is where to place timeouts. I am using anyio since it seems to work better for cancellation and timeouts of task groups (and it is what FastAPI uses), but I am still not sure how it really works, especially with dependencies that mostly use asyncio directly, whose tasks anyio cannot cancel.
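One possible placement is a per-handler timeout wrapped around each awaited handler; a sketch with stdlib asyncio (the names `slow_handler` and `run_with_timeout` are illustrative, and the anyio analogue would be a `with anyio.fail_after(seconds):` block inside the task):

```python
import asyncio

async def slow_handler(message: str) -> str:
    await asyncio.sleep(10)  # pretend this handler hangs
    return message

async def run_with_timeout(handler, message, seconds: float) -> str:
    # Wrapping the awaited handler is one natural place for the timeout:
    # each task in the group gets its own deadline.
    try:
        return await asyncio.wait_for(handler(message), timeout=seconds)
    except asyncio.TimeoutError:
        return f"timed out: {message}"

print(asyncio.run(run_with_timeout(slow_handler, "event-1", 0.1)))
# timed out: event-1
```

The task group would then `start_soon` the wrapper rather than the handler itself, so one hung handler cannot hold the whole group open.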

nomhoi commented 1 year ago

> I thought about that and I thought it was ok to run them concurrently, even for the book's project.

I do not intend to change it to run concurrently.

instanceofmel commented 6 months ago

The first problem you'll get when running multiple event handlers concurrently is that they all use the same unit of work object and therefore share the session. Committing in event A would also commit the changes of event B, even when event B is not finished yet. A possible solution is passing a unit-of-work factory to the messagebus, which then passes a new unit of work object to each event handler. To still be able to make assertions on the unit of work inside unit tests, the fake unit of work could be a singleton.
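A minimal sketch of the factory idea, with hypothetical names (`FakeUnitOfWork`, `MessageBus`) not taken from the project:

```python
import asyncio
from typing import Callable

class FakeUnitOfWork:
    def __init__(self):
        self.committed = False
    async def __aenter__(self):
        return self
    async def __aexit__(self, *exc):
        return False
    async def commit(self):
        self.committed = True

class MessageBus:
    # The bus receives a unit-of-work *factory* instead of a shared instance.
    def __init__(self, uow_factory: Callable[[], FakeUnitOfWork]):
        self.uow_factory = uow_factory

    async def handle_event(self, handler, event):
        # Fresh UoW per handler: committing here cannot flush
        # another handler's half-finished changes.
        async with self.uow_factory() as uow:
            await handler(uow, event)

async def demo():
    created = []
    def factory():
        uow = FakeUnitOfWork()
        created.append(uow)  # keep references so tests can assert on them
        return uow
    bus = MessageBus(factory)
    async def handler(uow, event):
        await uow.commit()
    await bus.handle_event(handler, "event-A")
    await bus.handle_event(handler, "event-B")
    return created

uows = asyncio.run(demo())
print(len(uows), all(u.committed for u in uows))  # 2 True
```

Each handler commits its own UoW, so the two commits are isolated; in tests, the factory can instead always return one singleton fake to keep assertions simple.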

dbaber commented 4 months ago

> The first problem you'll get when running multiple event handlers concurrently is that they all use the same unit of work object and therefore share the session. Committing in event A would cause to commit changes of event B, even when event B is not finished yet. A possible solution for this is passing an unit of work factory to the messagebus, which then passes a new unit of work object to each event handler. To still be able to make assertions based on the unit of work inside of unit tests, the fake unit of work could be a singleton.

Do you still run into the same concurrency/threading issues with the UoW and the MessageBus event queue if you were to, say, use gunicorn with multiple uvicorn worker processes as outlined here?

https://fastapi.tiangolo.com/deployment/server-workers/

Are you forced to run a single worker process in this FastAPI version without refactoring?