mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Publish hangs when connection is closed #556

Open krieghan opened 1 year ago

krieghan commented 1 year ago

I'm upgrading from 6.6.0 to 9.1.3 (yes, I'm afraid it's been a while). I noticed a test that I wrote ages ago for the case when connections close unexpectedly:

  async def test_close_publish_connection(self):
      connection = await get_connection()
      channel = await connection.channel()
      await connection.close()
      with self.assertRaises(aiormq.exceptions.ChannelInvalidStateError):
          await channel.default_exchange.publish(
              aio_pika.Message(
                  body=b'',
                  headers={}),
              routing_key='test_queue')

"publish" used to raise ChannelInvalidStateError. Now, the call to publish seems to hang somewhere in here (aio_pika/robust_channel.py):

  async def get_underlay_channel(self) -> aiormq.abc.AbstractChannel:
      await self._connection.ready()  # <- the call hangs here
      return await super().get_underlay_channel()

So it's waiting on connection to be ready, but it never will be. This is kind of a bizarre test to have in my application - I can't quite remember why I added it - but I am kind of concerned about the behavior of publish calls when the underlying connection closes.
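A minimal sketch of the hang described above, using toy stand-in classes rather than aio-pika's real ones. `ToyRobustConnection`, `ToyChannel`, and `demo` are hypothetical names. The only thing taken from the traceback is the shape of the problem: publish first awaits a readiness signal, and after `close()` nothing ever sets it again. The `asyncio.wait_for` guard is there only so the demonstration terminates.

```python
import asyncio


class ToyRobustConnection:
    """Toy stand-in (NOT aio-pika's class): readiness is modeled as an Event."""

    def __init__(self) -> None:
        self._ready = asyncio.Event()

    async def connect(self) -> None:
        self._ready.set()

    async def close(self) -> None:
        # After an explicit close, nothing in this model sets the event again.
        self._ready.clear()

    async def ready(self) -> None:
        await self._ready.wait()


class ToyChannel:
    """Toy stand-in for a channel that waits for its connection before publishing."""

    def __init__(self, connection: ToyRobustConnection) -> None:
        self._connection = connection

    async def publish(self, body: bytes) -> str:
        # Same shape as get_underlay_channel(): wait for readiness first.
        await self._connection.ready()
        return "published"


async def demo() -> str:
    connection = ToyRobustConnection()
    await connection.connect()
    channel = ToyChannel(connection)
    await connection.close()
    try:
        # Without the timeout this await would never return.
        return await asyncio.wait_for(channel.publish(b""), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"
```

Running `asyncio.run(demo())` in this toy model ends in the timeout branch, which matches the behavior reported for 9.1.3: the caller waits instead of getting an exception.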

mosquito commented 1 year ago

The test is really strange. I think throwing an exception in this case could be supported on the aio-pika side, but so far I am still confused by your test ))))

krieghan commented 1 year ago

I think the main case that I'm concerned with in this test is you get a channel, the channel's underlying connection is closed, and then we try to publish to the channel. This seems like something that could happen? In 6.6.0 it threw an exception, but now it waits forever for the connection to become available again.

mosquito commented 1 year ago

The way it works now is that there is a RobustConnection that contains a transport, which is a separable and independent entity that can be recreated if the real connection to the broker is closed. The transport contains an underlay connection, which is the real connection, and it is exactly this connection that is closed in the event of a network failure or whatever.

So the .close method in your example is not closing an underlay connection, it's closing the robust connection instance.

If you want to close the underlay connection you have to get connection.transport.connection and close this.
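A small sketch of that step, written duck-typed so it does not depend on a running broker. The attribute path `connection.transport.connection` comes from the comment above. The helper name `close_underlay_connection` is hypothetical, and the guard for a missing transport is an assumption about what a not-yet-connected or already-closed connection looks like.

```python
import asyncio
from typing import Any


async def close_underlay_connection(connection: Any) -> bool:
    """Close the underlay connection behind a robust connection.

    Returns True if an underlay connection was found and closed,
    False if the connection has no transport (assumed to mean "not connected").
    """
    transport = getattr(connection, "transport", None)
    if transport is None:
        return False
    # Closes the real broker connection, not the robust wrapper itself.
    await transport.connection.close()
    return True
```

In a test, this would take the place of `await connection.close()` to simulate the broker connection dropping while the robust connection object stays alive.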

I understand your confusion, the library has changed internally several times since 6.x.x.

Anyway, you are right: we have to clearly signal the channels to close. It looks like a bug, but I don't understand its severity yet.

To be honest, I was pretty sure that a similar test was in the aio-pika test suite, but I couldn't find it quickly, so I'll have to write it.

krieghan commented 1 year ago

Thanks for the explanation, and the quick response

zoola969 commented 5 months ago

I think it may cause denial of service if direct publishing is used. Imagine you have a 100 rps app and the RMQ server is down for 1 min: your app will hang, the event loop will be flooded with a countless number of pending futures, and the app will probably be killed by the OOM killer.

I would like to have adjustable publishing behavior that throws an exception if the connection has been closed for n seconds.
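Until something like that exists in the library, one possible application-side workaround is a per-call deadline around publish. This is a sketch only: `publish_with_deadline` and `PublishTimeout` are hypothetical names, the exchange is duck-typed as anything with an awaitable `publish(message, routing_key=...)`, and it bounds each call rather than tracking how long the connection has been down. Whether cancelling a pending publish leaves aio-pika in a clean state has not been checked here.

```python
import asyncio
from typing import Any


class PublishTimeout(Exception):
    """Hypothetical error raised when a publish does not finish in time."""


async def publish_with_deadline(
    exchange: Any, message: Any, routing_key: str, timeout: float
) -> Any:
    """Await exchange.publish(...) but give up after `timeout` seconds."""
    try:
        return await asyncio.wait_for(
            exchange.publish(message, routing_key=routing_key),
            timeout=timeout,
        )
    except asyncio.TimeoutError as exc:
        # The pending publish is cancelled by wait_for before we get here,
        # so it does not pile up in the event loop.
        raise PublishTimeout(
            f"publish to {routing_key!r} did not complete within {timeout}s"
        ) from exc
```

With a wrapper like this, a 100 rps publisher facing a one-minute outage would see failures after `timeout` seconds per call instead of accumulating an unbounded backlog of waiting coroutines.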