mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

FastAPI publisher doesn't keep the connection alive #558

Closed kolkre closed 1 year ago

kolkre commented 1 year ago

Hello, we're using FastAPI for our backend and aio_pika for our RPC microservices client/server, but we ran into the issue which occurs on the publisher side. Our connection isn't persistent, we are passing asyncio.get_event_loop() as a loop parameter for the connect function, but seems like it's not doing anything with it.

async def connect(self) -> None:
        self.connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/", loop=self.loop)
        logger.info("Connecting with RabbitMQ server %s...", self.url)

        self.channel = await self.connection.channel()
        self.queue = await self.channel.declare_queue(exclusive=True)

        await self.queue.consume(self.on_message, no_ack=True)

Any advice what to do in that situation?

kolkre commented 1 year ago

I've tried to enable debug logs in our app and we got the result: DEBUG:aiormq.connection:Closing connection <Connection: "amqp://guest:******@localhost:5672" at 0x1e2239ae8e0> cause: <class 'asyncio.exceptions.CancelledError'>

So seems like the aio_pika is dropping the connection by itself because of some asyncio exception (maybe made by FastAPI or Uvicorn)?

Lancetnik commented 1 year ago

@spectacularfailure can you show the code, represents your error exactly? I didn't see any publisher in your code example Did you have problem with a consumer or publisher?

kolkre commented 1 year ago

Oh right, my bad! It's an publisher

That's our whole class for the publisher:

class RPCClient:
    """
    RPCClient is a class that will be used to connect with RabbitMQ RPC server and execute it's functions
    """

    def __init__(self, url: str, timeout: int) -> None:
        self.url = url
        self.timeout = timeout
        self.futures: Dict[str, asyncio.Future] = {}

    async def connect(self, loop) -> None:
        self.loop = loop
        self.connection = await aio_pika.connect_robust(self.url, loop=loop)
        logger.info("Connecting with %s...", self.url)

        self.channel = await self.connection.channel()

        self.queue = await self.channel.declare_queue(exclusive=True)

        await self.queue.consume(self.on_message)

    async def on_message(self, message: AbstractIncomingMessage) -> None:
        if message.correlation_id is None or message.correlation_id not in self.futures:
            raise RPCException(
                f"Received message with no correlation ID or unknown correlation ID: {message.correlation_id} (is None: {message.correlation_id is None})"
            )

        future: asyncio.Future = self.futures.pop(message.correlation_id)
        future.set_result(message)

    async def call(self, queue: str, function: str, arguments: Dict[str, Any]):
        logger.info("Making a RPC call to %s (%s)...", queue, function)

        if self.connection.is_closed:
            await self.connect(self.loop)

        async with self.connection:
            request = RPCRequest(function=function, arguments=arguments)

            correlation_id = str(uuid.uuid4())

            future = self.loop.create_future()
            self.futures[correlation_id] = future

            request_message = Message(
                body=request.json().encode(),
                correlation_id=correlation_id,
                reply_to=self.queue.name,
                delivery_mode=DeliveryMode.PERSISTENT,
            )

            await self.channel.default_exchange.publish(
                request_message, routing_key=queue
            )

            response: AbstractIncomingMessage
            try:
                response = await asyncio.wait_for(future, timeout=self.timeout)

            except asyncio.TimeoutError:
                raise RPCException(
                    f"RPC call {queue}:{function} timed out after {self.timeout} seconds"
                )

            await response.ack()
            rpc_response = RPCResponse.parse_raw(response.body)

            if rpc_response.type is RPCMessageType.ERROR:
                raise RuntimeError(rpc_response.result)

            return rpc_response.result
Lancetnik commented 1 year ago

Well, looks like you trying to implement what I did Please, take a look at this project It's my own aio-pika wrapper, that perfectly integrated with FastAPI. So, you can use it directly or go deeper to code to find aio-pika realisation you want. For now, I need a some time to find your code problem.

Lancetnik commented 1 year ago

@spectacularfailure are you calling connect method in FastAPI lifespan at startup?

kolkre commented 1 year ago

@spectacularfailure are you calling connect method in FastAPI lifespan at startup?

Yes

@asynccontextmanager
async def lifespan(app: FastAPI):
    await rpc.connect(asyncio.get_event_loop())
    yield

app = FastAPI(lifespan=lifespan)

@app.get("/")
async def read_root():
    return await rpc.call("service", "general:generate_ticket", {}

Also thanks for you help and I'll check out your project!

Lancetnik commented 1 year ago

Let's try to remove self.loop at all and do not pass asyncio.get_event_loop(), instead self.loop.create_future() create asyncio.Future() like an object. I had the same problems with different loop, but, I can't remeber what problem it was.

aio-pika finds the required loop by itseld, so removing loop parameter helped in my case.

kolkre commented 1 year ago

Could you show me what should I change and where to use self.loop.create_future()? I think I didn't understand it correctly

Lancetnik commented 1 year ago

@spectacularfailure you close the connection using async with self.connection: in __aexit__ method So, aio-pika brokes yout RPCClient global connection state after method out of scope. Also, it looks like a some kind of lock: you lock the all event loop by awaiting callback future and callback handler can't process message cus the loop is blocked (the message will delivered to callback later, when RMQ procces it and send to consumer). By the wat it shouldn't be a problem.

Lancetnik commented 1 year ago

The code without any loop

class RPCClient:
    """
    RPCClient is a class that will be used to connect with RabbitMQ RPC server and execute it's functions
    """

    def __init__(self, url: str, timeout: int) -> None:
        self.url = url
        self.timeout = timeout
        self.futures: Dict[str, asyncio.Future] = {}

    async def connect(self) -> None:
        self.connection = await aio_pika.connect_robust(self.url)
        logger.info("Connecting with %s...", self.url)

        self.channel = await self.connection.channel()

        self.queue = await self.channel.declare_queue(exclusive=True)

        await self.queue.consume(self.on_message)

    async def on_message(self, message: AbstractIncomingMessage) -> None:
        if message.correlation_id is None or message.correlation_id not in self.futures:
            raise RPCException(
                f"Received message with no correlation ID or unknown correlation ID: {message.correlation_id} (is None: {message.correlation_id is None})"
            )

        future: asyncio.Future = self.futures.pop(message.correlation_id)
        future.set_result(message)

    async def call(self, queue: str, function: str, arguments: Dict[str, Any]):
        logger.info("Making a RPC call to %s (%s)...", queue, function)

        assert not self.connection.is_closed

        request = RPCRequest(function=function, arguments=arguments)

        correlation_id = str(uuid.uuid4())

        future = asyncio.Future()
        self.futures[correlation_id] = future

        request_message = Message(
            body=request.json().encode(),
            correlation_id=correlation_id,
            reply_to=self.queue.name,
            delivery_mode=DeliveryMode.PERSISTENT,
        )

        await self.channel.default_exchange.publish(
            request_message, routing_key=queue
        )

        response: AbstractIncomingMessage
        try:
            response = await asyncio.wait_for(future, timeout=self.timeout)

        except asyncio.TimeoutError:
            raise RPCException(
                f"RPC call {queue}:{function} timed out after {self.timeout} seconds"
            )

        await response.ack()
        rpc_response = RPCResponse.parse_raw(response.body)

        if rpc_response.type is RPCMessageType.ERROR:
            raise RuntimeError(rpc_response.result)

        return rpc_response.result
kolkre commented 1 year ago

I tested it out and you were right! Thank you very much I was fighting with this thing for the few days.

Lancetnik commented 1 year ago

No problem, @spectacularfailure But, please, check the Propan: it still use the better RPC mechanism then yours

https://www.rabbitmq.com/direct-reply-to.html

kolkre commented 1 year ago

Sure! We'll check it out and consider using it :)