Our application constantly opens and closes channels on a robust connection.
We detected a memory leak, which can be reproduced with the following snippet:

    import argparse
    import asyncio

    import aio_pika

    CHANNELS_COUNT = 1000


    async def main(amqp_uri):
        connection = await aio_pika.connect_robust(amqp_uri)
        # Number of done-callbacks on the connection's closing future before the loop
        print(len(connection.transport.connection.closing._callbacks))
        for _ in range(CHANNELS_COUNT):
            async with connection.channel():
                await asyncio.sleep(0)
        # The count grows with every channel that was opened and closed
        print(len(connection.transport.connection.closing._callbacks))


    if __name__ == '__main__':
        parser = argparse.ArgumentParser()
        parser.add_argument("--amqp_uri")
        args = parser.parse_args()
        asyncio.run(main(args.amqp_uri))

I'm not an expert in aio_pika, but it seems the leak can be fixed in abc.UnderlayConnection: the callbacks should be removed with .remove_done_callback when the channel is closed, something like:

    async def close(self, exc: Optional[ExceptionType] = None) -> Any:
        if self.close_callback.finished.is_set():
            return
        result: Any = await self.channel.close(exc)
        await self.close_callback.wait()
        # Detach the callback only after it has fired; removing it before
        # close() could leave close_callback.wait() never completing.
        self.channel.closing.remove_done_callback(self.close_callback)
        self.channel.connection.closing.remove_done_callback(self.close_callback)
        return result

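
To illustrate the mechanism without a broker, here is a minimal sketch using only plain asyncio. It assumes the connection's closing object behaves like an ordinary `asyncio.Future`. The names `count_fired` and `closing` are made up for this example and are not part of aio_pika. Every simulated "channel" registers a done-callback on one long-lived future, and the callbacks stay attached until they are explicitly removed with `remove_done_callback`.

```python
import asyncio


async def count_fired(remove: bool, channels: int = 1000) -> int:
    """Return how many callbacks were still attached when the future resolved."""
    loop = asyncio.get_running_loop()
    # Hypothetical stand-in for the long-lived "closing" future of a connection.
    closing = loop.create_future()
    fired = 0

    def make_callback():
        def on_close(fut: asyncio.Future) -> None:
            nonlocal fired
            fired += 1
        return on_close

    for _ in range(channels):
        callback = make_callback()
        # What opening a channel does: hook into the connection's closing future.
        closing.add_done_callback(callback)
        if remove:
            # What closing a channel should do: unhook again.
            closing.remove_done_callback(callback)

    closing.set_result(None)
    await asyncio.sleep(0)  # give the event loop a chance to run the callbacks
    return fired


if __name__ == "__main__":
    print("without removal:", asyncio.run(count_fired(remove=False)))
    print("with removal:   ", asyncio.run(count_fired(remove=True)))
```

Without removal, every callback registered during the loop is still referenced by the future when it resolves; with removal, none are left. That matches the growth seen in the reproduction snippet.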