mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

RuntimeError: <RobustConnection: "amqp://guest:******@192.168.99.110/" 5 channels> connection closed #305

Open heckad opened 4 years ago

heckad commented 4 years ago
import asyncio
import gc
import weakref

import objgraph

import aio_pika

# Turn off the cyclic garbage collector so objects are only freed by
# reference counting; presumably this keeps channels that are caught in
# reference cycles alive so they stay visible in the WeakSet used below.
gc.disable()

async def f(rabbitmq_connection, weakset):
    """Open one channel, track it weakly, and leave its context manager.

    The channel obtained from ``rabbitmq_connection.channel()`` is added to
    ``weakset`` inside the ``async with`` block. Progress is printed before
    the channel is recorded and again after the block has been exited.
    """
    async with rabbitmq_connection.channel() as channel:
        print('new created')

        # Only a weak reference is stored (the caller passes a WeakSet), so
        # membership alone does not keep the channel alive.
        weakset.add(channel)

    print('exetied')

async def main():
    """Create and leave five channels, then report which are still alive.

    A robust connection to the local broker is opened, ``f`` is awaited five
    times inside the connection's ``async with`` block, and after a
    ten-second pause the number of channels still present in the WeakSet is
    printed. One surviving channel is then handed to objgraph.
    """
    rabbitmq_connection: aio_pika.Connection = await aio_pika.connect_robust(
        'amqp://guest:guest@localhost/'
    )

    # Holds the channels weakly; an entry remains only while something else
    # still references that channel.
    weakset = weakref.WeakSet()

    async with rabbitmq_connection:
        for i in range(5):
            await f(rabbitmq_connection, weakset)

    # Pause after the connection's context has been exited, then count the
    # channels that are still referenced somewhere.
    await asyncio.sleep(10)
    print(len(weakset))

    # Inspect what still refers to one surviving channel. ``pop()`` raises
    # KeyError when the set is empty, i.e. when no channel survived.
    objgraph.show_backrefs(weakset.pop())

# Run the reproduction only when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(main())
    print('Complete')

References after sleep

objgraph-461115jw

zaquest commented 4 years ago

Seems like it's trying to reconnect, even if the connection was closed by user

import os
import asyncio
import logging
import aio_pika as aiopika

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Broker address handed to connect_robust(), read from the AMQP_BROKER
# environment variable (None when the variable is not set).
AMQP_BROKER = os.getenv('AMQP_BROKER')

async def test_aio_pika():
    """Open a robust connection to ``AMQP_BROKER`` and close it right away."""
    robust = await aiopika.connect_robust(AMQP_BROKER)
    await robust.close()

def main():
    """Run the connect/close scenario, idle for ten seconds, then shut down.

    The ten-second sleep keeps the event loop running after the connection
    has been closed. Async generators are finalized and the loop is closed
    even if one of the steps raises.
    """
    loop = asyncio.get_event_loop()
    run = loop.run_until_complete
    try:
        run(test_aio_pika())
        run(asyncio.sleep(10))
    finally:
        run(loop.shutdown_asyncgens())
        loop.close()

# Run the reproduction only when this file is executed as a script.
if __name__ == '__main__':
    # Root logger at DEBUG so that debug and info records are emitted.
    logging.basicConfig(level=logging.DEBUG)
    main()

outputs

DEBUG:asyncio:Using selector: EpollSelector
DEBUG:aiormq.connection:Can not read bytes from server:
Traceback (most recent call last):
  File "/home/zaquest/.virtualenvs/copreso/lib/python3.6/site-packages/aiormq/connection.py", line 377, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/home/zaquest/.virtualenvs/copreso/lib/python3.6/site-packages/aiormq/connection.py", line 329, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/home/zaquest/.pyenv/versions/3.6.6/lib/python3.6/asyncio/streams.py", line 672, in readexactly
    raise IncompleteReadError(incomplete, n)
asyncio.streams.IncompleteReadError: 0 bytes read on a total of 1 expected bytes
DEBUG:aio_pika.connection:Closing AMQP connection None
INFO:aio_pika.robust_connection:Connection to amqp://*** closed. Reconnecting after 5 seconds.

>>> # sleeps here, continues after 5 seconds, and finishes after 10 seconds

ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<RobustConnection.reconnect() done, defined at /home/zaquest/.virtualenvs/copreso/lib/python3.6/site-packages/aio_pika/robust_connection.py:149> exception=RuntimeError('<RobustConnection: "amqp://***" 0 channels> connection closed',)>
Traceback (most recent call last):
  File "/home/zaquest/.virtualenvs/copreso/lib/python3.6/site-packages/aio_pika/robust_connection.py", line 150, in reconnect
    await self.connect()
  File "/home/zaquest/.virtualenvs/copreso/lib/python3.6/site-packages/aio_pika/robust_connection.py", line 105, in connect
    raise RuntimeError("{!r} connection closed".format(self))
RuntimeError: <RobustConnection: "amqp://***" 0 channels> connection closed
zaquest commented 4 years ago

This seems to solve the issue for me

import os
import asyncio
import logging
import aio_pika as aiopika

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Broker address handed to connect_robust(), read from the AMQP_BROKER
# environment variable (None when the variable is not set).
AMQP_BROKER = os.getenv('AMQP_BROKER')

class Connection(aiopika.RobustConnection):
    """RobustConnection subclass intended to skip reconnecting after a
    close that was requested by the user.
    """

    def _on_connection_close(self, connection, closing, *args, **kwargs):
        """React to a closed connection; reconnect unless ``_closed`` is set.

        Does nothing while ``self.reconnecting`` is true. Otherwise it clears
        ``self.connected``, drops the stored connection, forwards
        ``connection`` and ``closing`` to the close handler that follows
        ``RobustConnection`` in the MRO, and then either logs that no
        reconnect will happen or schedules ``self.reconnect()`` after
        ``self.reconnect_interval`` seconds. Extra positional and keyword
        arguments are accepted but not forwarded.
        """
        if self.reconnecting:
            return

        self.connected.clear()
        self.connection = None

        # pylint: disable=bad-super-call
        # RobustConnection's own handler is skipped on purpose: the call is
        # dispatched to the class after it in the MRO. Pylint's E1003
        # warning about a possible infinite loop does not apply here.
        # http://pylint-messages.wikidot.com/messages:e1003
        # https://stackoverflow.com/a/18208725/1211428
        grandparent = super(aiopika.RobustConnection, self)
        grandparent._on_connection_close(connection, closing)

        if self._closed:
            logger.info(
                'Connection to %s closed manually. Will not reconnect.',
                self
            )
            return

        logger.info(
            'Connection to %s closed. Reconnecting after %r seconds.',
            self, self.reconnect_interval
        )

        def start_reconnect():
            # Invoked by the loop once the delay has elapsed.
            return self.loop.create_task(self.reconnect())

        self.loop.call_later(self.reconnect_interval, start_reconnect)

async def test_aio_pika():
    """Connect using the patched ``Connection`` class and close right away."""
    options = {'connection_class': Connection}
    patched = await aiopika.connect_robust(AMQP_BROKER, **options)
    await patched.close()

def main():
    """Run the connect/close scenario, idle for ten seconds, then shut down.

    The ten-second sleep keeps the event loop running after the connection
    has been closed. Async generators are finalized and the loop is closed
    even if one of the steps raises.
    """
    loop = asyncio.get_event_loop()
    run = loop.run_until_complete
    try:
        run(test_aio_pika())
        run(asyncio.sleep(10))
    finally:
        run(loop.shutdown_asyncgens())
        loop.close()

# Run the reproduction only when this file is executed as a script.
if __name__ == '__main__':
    # Root logger at DEBUG so that debug and info records are emitted.
    logging.basicConfig(level=logging.DEBUG)
    main()
heckad commented 4 years ago

The reason for this behaviour is the way the callbacks are created. For example, in robust_channel on line 77, a callback is added that closes over self:

self.add_close_callback(self._on_channel_close)

This adds a bound method, which holds a reference to self. That creates a circular reference: self -> _done_callbacks -> bound_method<_on_channel_close> -> self

To solve this problem, the callbacks need to be made static and the senders passed to them explicitly.

mosquito commented 4 years ago

@heckad do you want to create PR for fixing it?

heckad commented 4 years ago

Yes, I do

zaquest commented 4 years ago

Am I in the wrong thread?) Is my issue related?

zaquest commented 4 years ago

Also, the workaround above doesn't seem to actually work and just masks the issue. Now I get other errors.

heckad commented 4 years ago

It's all because callbacks start working when they are no longer needed

vashchukmaksim commented 4 years ago

I faced the same error with the RPC pattern example from the docs (and created an SO question before finding this issue). When I run it exactly as in the docs (two separate processes), I get a DeliveryError (but according to the logs the connection was closed before that):

rabbit_1            | 2020-03-20 11:37:38.333 [info] <0.629.0> accepting AMQP connection <0.629.0> (192.168.32.4:57258 -> 192.168.32.3:5672)
rabbit_1            | 2020-03-20 11:37:38.354 [info] <0.629.0> connection <0.629.0> (192.168.32.4:57258 -> 192.168.32.3:5672): user 'dev_user' authenticated and granted access to vhost 'tgvhost'
rabbit_1            | 2020-03-20 11:37:38.371 [info] <0.651.0> accepting AMQP connection <0.651.0> (192.168.32.5:57200 -> 192.168.32.3:5672)
rabbit_1            | 2020-03-20 11:37:38.376 [info] <0.651.0> connection <0.651.0> (192.168.32.5:57200 -> 192.168.32.3:5672): user 'dev_user' authenticated and granted access to vhost 'tgvhost'
rabbit_1            | 2020-03-20 11:37:38.380 [info] <0.629.0> closing AMQP connection <0.629.0> (192.168.32.4:57258 -> 192.168.32.3:5672, vhost: 'tgvhost', user: 'dev_user'
Traceback (most recent call last):
rpc_host_1          |   File "asgi.py", line 33, in <module>
rpc_host_1          |     loop.run_until_complete(main())
rpc_host_1          |   File "/usr/local/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
rpc_host_1          |     return future.result()
rpc_host_1          |   File "asgi.py", line 20, in main
rpc_host_1          |     print(await rpc.proxy.multiply(x=100, y=i))
rpc_host_1          |   File "/usr/local/lib/python3.6/site-packages/aio_pika/patterns/rpc.py", line 341, in call
rpc_host_1          |     return await future
rpc_host_1          | aiormq.exceptions.DeliveryError: (IncomingMessage:{'app_id': None,
rpc_host_1          |  'body_size': 28,
rpc_host_1          |  'cluster_id': '',
rpc_host_1          |  'consumer_tag': None,
rpc_host_1          |  'content_encoding': '',
rpc_host_1          |  'content_type': '',
rpc_host_1          |  'correlation_id': '140221124514424',
rpc_host_1          |  'delivery_mode': 1,
rpc_host_1          |  'delivery_tag': None,
rpc_host_1          |  'exchange': '',
rpc_host_1          |  'expiration': None,
rpc_host_1          |  'headers': {'From': b'amq_7d6l66fdlzz3oh57lhmr3nq2u4'},
rpc_host_1          |  'message_id': 'c219f18e5dc6c38d4dbc8df046c3dc3d',
rpc_host_1          |  'priority': 5,
rpc_host_1          |  'redelivered': None,
rpc_host_1          |  'reply_to': 'amq_7d6l66fdlzz3oh57lhmr3nq2u4',
rpc_host_1          |  'routing_key': 'multiply',
rpc_host_1          |  'timestamp': datetime.datetime(2020, 3, 20, 11, 37, 38),
rpc_host_1          |  'type': 'call',
rpc_host_1          |  'user_id': None}, None)

If I run rpc_host as it is but then call 'multiply' manually (e.g. from a Quart HTTP view function), I get the exact same error:

Task exception was never retrieved
quart_service_01_1  | future: <Task finished coro=<RobustConnection.reconnect() done, defined at /usr/local/lib/python3.6/site-packages/aio_pika/robust_connection.py:149> exception=RuntimeError('<RobustConnection: "amqp://dev_user:******@rabbit:5672/tgvhost" 1 channels> connection closed',)>
quart_service_01_1  | Traceback (most recent call last):
quart_service_01_1  |   File "/usr/local/lib/python3.6/asyncio/tasks.py", line 180, in _step
quart_service_01_1  |     result = coro.send(None)
quart_service_01_1  |   File "/usr/local/lib/python3.6/site-packages/aio_pika/robust_connection.py", line 150, in reconnect
quart_service_01_1  |     await self.connect()
quart_service_01_1  |   File "/usr/local/lib/python3.6/site-packages/aio_pika/robust_connection.py", line 105, in connect
quart_service_01_1  |     raise RuntimeError("{!r} connection closed".format(self))
quart_service_01_1  | RuntimeError: <RobustConnection: "amqp://dev_user:******@rabbit:5672/tgvhost" 1 channels> connection closed

I tried to copy-paste RPC implementation from RabbitMQ Tutorial section and it works fine.

Is it related?

mosquito commented 4 years ago

@vashchukmaksim try to run examples it works fine.

vashchukmaksim commented 4 years ago

@mosquito Yes, I already checked it and commented on that PR. All works perfectly fine, thank you!