mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

RuntimeError: Set changed size during iteration (in aio-pika 9.3.0) #589

Open dvarrazzo opened 8 months ago

dvarrazzo commented 8 months ago

Hello,

I received Sentry issues from a live server, which, if I understand correctly, happened in a moment in which a misconfigured firewall closed the door in front of an aio-pika client, and the k8s health check was trying to connect.

The error is RuntimeError: Set changed size during iteration on this line:

https://github.com/mosquito/aio-pika/blob/0e5db7fe33eb07daa9458631376b5ce3b2422cad/aio_pika/robust_connection.py#L98

Unfortunately I can't work more on it in order to provide anything reproducible. However maybe you want to make a copy of this set before iterating it?

--- aio_pika/robust_connection.py.bak   2023-10-16 20:55:57.867441312 +0200
+++ aio_pika/robust_connection.py       2023-10-16 20:56:12.163364473 +0200
@@ -95,7 +95,7 @@
             raise RuntimeError("No active transport for connection %r", self)

         try:
-            for channel in self.__channels:
+            for channel in list(self.__channels):
                 try:
                     await channel.restore()
                 except Exception:

aio-pika version: 9.3.0, Python version: 3.10.12. I found #388 but it claims to having been fixed in >=8, so this might be a different instance.

Full stack trace ``` CancelledError: null File "apps_mq/mq.py", line 265, in on_message await message.ack() File "aio_pika/message.py", line 476, in ack await self.channel.basic_ack( File "aiormq/channel.py", line 556, in basic_ack await drain_future CancelledError: null File "aio_pika/queue.py", line 37, in consumer return await create_task(callback, message) CancelledError: null File "aiormq/abc.py", line 44, in __inner return await self.task RuntimeError: Set changed size during iteration File "apps_mq/mq.py", line 279, in check_health await asyncio.gather(test_sender(), test_receiver()) File "asyncio/tasks.py", line 650, in _wrap_awaitable return (yield from awaitable.__await__()) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "asyncio/tasks.py", line 650, in _wrap_awaitable return (yield from awaitable.__await__()) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "apps_mq/mq.py", line 268, in test_receiver await queue.consume(on_message) File "aio_pika/robust_queue.py", line 120, in consume consumer_tag = await super().consume( File "aio_pika/queue.py", line 223, in consume consume_result = await channel.basic_consume( File "aiormq/channel.py", line 526, in basic_consume return await self.rpc( File "aiormq/base.py", line 164, in wrap return await self.create_task(func(self, *args, **kwargs)) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "apps_mq/mq.py", line 268, in test_receiver await queue.consume(on_message) File "aio_pika/robust_queue.py", line 120, in consume consumer_tag = await super().consume( File "aio_pika/queue.py", line 223, in consume consume_result = await channel.basic_consume( File "aiormq/channel.py", line 526, in basic_consume return await self.rpc( File "aiormq/base.py", line 164, in wrap return await self.create_task(func(self, *args, **kwargs)) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "apps_mq/mq.py", line 268, in test_receiver await queue.consume(on_message) File "aio_pika/robust_queue.py", line 120, in consume consumer_tag = await super().consume( File "aio_pika/queue.py", line 223, in consume consume_result = await channel.basic_consume( File "aiormq/channel.py", line 526, in basic_consume return await self.rpc( File "aiormq/base.py", line 164, in wrap return await self.create_task(func(self, *args, **kwargs)) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "apps_mq/mq.py", line 268, in test_receiver await queue.consume(on_message) File "aio_pika/robust_queue.py", line 120, in consume consumer_tag = await super().consume( File "aio_pika/queue.py", line 223, in consume consume_result = await channel.basic_consume( File "aiormq/channel.py", line 526, in basic_consume return await self.rpc( File "aiormq/base.py", line 164, in wrap return await self.create_task(func(self, *args, **kwargs)) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "apps_mq/mq.py", line 268, in test_receiver await queue.consume(on_message) File "aio_pika/robust_queue.py", line 120, in consume consumer_tag = await super().consume( File "aio_pika/queue.py", line 223, in consume consume_result = await channel.basic_consume( File "aiormq/channel.py", line 526, in basic_consume return await self.rpc( File "aiormq/base.py", line 164, in wrap return await self.create_task(func(self, *args, **kwargs)) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "aio_pika/channel.py", line 172, in _open await self._on_open() File "aio_pika/robust_channel.py", line 126, in _on_open await channel.basic_qos( File "aiormq/channel.py", line 709, in basic_qos return await self.rpc( File "aiormq/base.py", line 164, in wrap return await self.create_task(func(self, *args, **kwargs)) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "***/routers/tools.py", line 70, in rabbitmq_check await mq.mq.check_health() File "apps_mq/mq.py", line 281, in check_health await queue.delete(if_unused=False, if_empty=False) File "aio_pika/queue.py", line 346, in delete return await channel.queue_delete( File "aiormq/channel.py", line 879, in queue_delete return await self.rpc( File "aiormq/base.py", line 164, in wrap return await self.create_task(func(self, *args, **kwargs)) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "apps_mq/mq.py", line 268, in test_receiver await queue.consume(on_message) File "aio_pika/robust_queue.py", line 120, in consume consumer_tag = await super().consume( File "aio_pika/queue.py", line 223, in consume consume_result = await channel.basic_consume( File "aiormq/channel.py", line 526, in basic_consume return await self.rpc( File "aiormq/base.py", line 164, in wrap return await self.create_task(func(self, *args, **kwargs)) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "asyncio/tasks.py", line 650, in _wrap_awaitable return (yield from awaitable.__await__()) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "asyncio/tasks.py", line 650, in _wrap_awaitable return (yield from awaitable.__await__()) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "asyncio/tasks.py", line 650, in _wrap_awaitable return (yield from awaitable.__await__()) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "asyncio/tasks.py", line 650, in _wrap_awaitable return (yield from awaitable.__await__()) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "asyncio/tasks.py", line 650, in _wrap_awaitable return (yield from awaitable.__await__()) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "asyncio/tasks.py", line 650, in _wrap_awaitable return (yield from awaitable.__await__()) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "asyncio/tasks.py", line 650, in _wrap_awaitable return (yield from awaitable.__await__()) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "asyncio/tasks.py", line 650, in _wrap_awaitable return (yield from awaitable.__await__()) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "asyncio/tasks.py", line 650, in _wrap_awaitable return (yield from awaitable.__await__()) File "aiormq/abc.py", line 46, in __inner raise self._exception from e File "aio_pika/robust_connection.py", line 98, in _on_connected for channel in self.__channels: File "_weakrefset.py", line 65, in __iter__ for itemref in self.data: ```
mosquito commented 8 months ago

Don’t you want to make a PR?

dvarrazzo commented 8 months ago

As I said, I can't reproduce it, and I don't know the project well enough to know if that's the best thing to do. If you think that my changeset is right, sure, I can make a PR.