mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Consumer gets stuck on ack #608

Open kimdre opened 6 months ago

kimdre commented 6 months ago

I'm trying to run a consumer in a docker swarm cluster. The app consumes multiple events from a queue, writes them in bulk to a database, and then acknowledges them all afterwards.

For some reason, however, the consumer often gets stuck while acknowledging the messages, even though the connection stays open. This happens especially when the app consumes a lot of messages at once, and it also consumes far fewer messages than it could.

A simple restart doesn't fix the consumer either: it still won't consume any messages (except for 1 to 4 messages, after which it gets stuck again). To fix it, I first have to delete the queue with all the messages in it; only then can I restart the consumer app.

When I run this exact setup in a normal docker stack on my computer without swarm enabled, it works fine for some reason.

app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.117] [utils.py.save_events:54] INFO: Committing 17 events to database.
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.131] [utils.py.save_events:57] DEBUG: Committed events to database successfully.
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.132] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00d\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.132] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00e\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.133] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00f\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.133] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00g\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.133] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00h\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.134] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00i\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.134] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00j\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.135] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00k\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.135] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00l\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.135] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00m\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.136] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00n\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.136] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00o\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.137] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00p\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.137] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00q\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.137] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00r\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.138] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00s\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.138] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00t\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:44.855] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x08\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:31:14.857] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x08\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:31:44.857] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x08\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:32:14.859] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x08\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:32:44.860] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x08\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:33:14.861] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x08\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:33:44.863] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x08\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:33:44.863] [connection.py.__reader:668] WARNING: Server connection <Connection: "amqp://user:******@rabbitmq:5672/app" at 0x7f082d6ed4f0> was stuck. No frames were received in 183 seconds.
app_processor.1.lybdbcd90psc@worker-1    | [2023-12-20 23:53:02.623] [connection.py.__writer:751] DEBUG: Writer exited for <Connection: "amqp://user:******@rabbitmq:5672/app" at 0x7efd0b70d540>
app_processor.1.lybdbcd90psc@worker-1    | [2023-12-20 23:53:02.623] [connection.py._on_reader_done:536] DEBUG: Reader exited for <Connection: "amqp://user:******@rabbitmq:5672/app" at 0x7efd0b70d540>
app_processor.1.lybdbcd90psc@worker-1    | [2023-12-20 23:53:02.624] [connection.py._on_close:783] DEBUG: Closing connection <Connection: "amqp://user:******@rabbitmq:5672/app" at 0x7efd0b70d540> cause: CancelledError()
app_processor.1.lybdbcd90psc@worker-1    | [2023-12-20 23:53:02.624] [robust_connection.py._on_connection_close:83] INFO: Connection to amqp://user:******@rabbitmq:5672/app closed. Reconnecting after 5 seconds.

The code looks like this:

if await save_events(messages):
    for message in message_list_chunk:
        await message.ack()
else:
    self.failed_messages.extend(messages)
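Since the whole chunk is committed to the database in one transaction anyway, one possible workaround (a minimal sketch, not the reporter's actual code; `ack_batch`, `chunked`, and the 30-second timeout are illustrative) is to send a single ack frame with `multiple=True` on the last message instead of one ack per message, and to put a timeout around it so the app fails fast instead of hanging silently:

```python
import asyncio
from itertools import islice


def chunked(items, size):
    """Split a list into chunks of at most `size` items."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


async def ack_batch(messages, timeout=30.0):
    """Acknowledge a whole batch with a single frame.

    aio-pika's IncomingMessage.ack(multiple=True) acknowledges every
    unacked delivery tag on the channel up to and including this one,
    so one frame covers the whole chunk. wait_for turns a silent stall
    into an asyncio.TimeoutError the application can react to, instead
    of waiting ~183 s for the heartbeat watchdog seen in the logs.
    """
    await asyncio.wait_for(messages[-1].ack(multiple=True), timeout)
```

This doesn't cure a broken network path, but it reduces the number of outbound frames and surfaces the stall as an explicit timeout rather than a hung coroutine.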
kimdre commented 5 months ago

Update: The issue only occurs in a docker swarm cluster. Using a normal docker or a normal rabbitmq instance works fine.

Elli-Rid commented 5 months ago

Hey @kimdre, did you notice whether messages start to build up after the consumer(s) get stuck? Also, is there a specific message size at which the issue persistently occurs?

kimdre commented 5 months ago

Hi, yes, the publisher still worked, so new messages were building up in the queue while the consumers didn't do anything despite being connected. The size/amount was pretty random; unfortunately, I could not find a common pattern there.

Elli-Rid commented 3 months ago

I've seen something similar that was caused by an MTU size mismatch along the networking route: once a message larger than 8500 bytes arrived, the RabbitMQ connection got stuck forever. The solution was to configure a proper peering connection, as it was purely an infrastructure thing (AWS). These docs from AWS describe well what was happening: https://docs.aws.amazon.com/vpc/latest/tgw/transit-gateway-quotas.html#mtu-quotas

The MTU of a network connection is the size, in bytes, of the largest permissible packet that can be passed over the connection. The larger the MTU of a connection, the more data that can be passed in a single packet. A transit gateway supports an MTU of 8500 bytes for traffic between VPCs, AWS Direct Connect, Transit Gateway Connect, and peering attachments. Traffic over VPN connections can have an MTU of 1500 bytes.

When migrating from VPC peering to use a transit gateway, an MTU size mismatch between VPC peering and the transit gateway might result in some asymmetric traffic packets dropping. Update both VPCs at the same time to avoid jumbo packets dropping due to a size mismatch.

Packets with a size larger than 8500 bytes that arrive at the transit gateway are dropped.
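If you suspect the same kind of MTU mismatch, a quick sanity check is to compute the largest ICMP payload that fits a given MTU and probe the path with the Don't-Fragment bit set (a sketch; the `rabbitmq` hostname is taken from the logs above, and whether jumbo frames apply depends on your network):

```python
def max_ping_payload(mtu: int) -> int:
    """Largest ICMP payload that fits in one IPv4 packet for a given MTU:
    the MTU minus the 20-byte IPv4 header minus the 8-byte ICMP header."""
    return mtu - 20 - 8


# Probe the path with Don't-Fragment set (Linux ping). If the path
# silently drops large packets, the big probe hangs or reports loss
# while the small one succeeds:
#   ping -M do -s 8472 rabbitmq   # fits an 8500-byte MTU exactly
#   ping -M do -s 1472 rabbitmq   # fits a standard 1500-byte MTU
```

A stall that appears only above one of these thresholds points at the network path rather than at aio-pika.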