mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

RobustConnection doesn't reconnect if RabbitMQ is down #202

Closed decaz closed 4 years ago

decaz commented 5 years ago

If the connection is closed after it has been established, it tries to reconnect. But if it cannot connect at the very beginning, it never tries to reconnect.

mosquito commented 5 years ago

@decaz since 5.0.0b2 the RobustConnection has been covered by integration tests. The coverage is still poor, but the test infrastructure is ready to use. Could you provide a test that reproduces this issue?

decaz commented 5 years ago

@mosquito I hope this looks like a working version.

test_amqp_robust.py:

...
class TestCase(AMQPTestCase):
    ...
    async def create_connection(self, start_proxy=True, cleanup=True):
        ...
        if start_proxy:
            await self.proxy.start()
        ...

    async def test_robust_connect(self):
        client = await self.create_connection(start_proxy=False)
        self.assertIsInstance(client, RobustConnection)

        await self.proxy.start()

        logging.info("Waiting for reconnect")
        await asyncio.sleep(5, loop=self.loop)

        logging.info("Waiting connection")
        await client.ready()

decaz commented 5 years ago

@mosquito I have the following consumer and publisher check scripts:

consumer.py:

import asyncio
import aio_pika

async def main():
    connection = await aio_pika.connect_robust('amqp://guest:guest@127.0.0.1/')
    channel = await connection.channel()
    queue = await channel.declare_queue('test_queue', auto_delete=True)
    async for message in queue:
        with message.process():
            print(message.body)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

publisher.py:

import asyncio
import aio_pika

async def main():
    connection = await aio_pika.connect_robust('amqp://guest:guest@127.0.0.1/')
    channel = await connection.channel()
    i = 0
    while True:
        msg = aio_pika.Message(body=f'Message {i}'.encode())
        await channel.default_exchange.publish(msg, routing_key='test_queue')
        await asyncio.sleep(1)
        i += 1

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

I see two issues with the robust connection.

1) When I stop the RabbitMQ service and then run either of these scripts, I get the error ConnectionError: [Errno 111] Connect call failed ('127.0.0.1', 5672) and the process exits with code 1 without any attempt to reconnect.

2) When the RabbitMQ service and both scripts are running and I then stop the RabbitMQ service, the consumer.py process keeps trying to reconnect, but the publisher.py process stops with the following exception:

Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /usr/lib/python3.6/asyncio/tasks.py:530> exception=<ConnectionClosed: The AMQP connection was closed (320) CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'>>
Traceback (most recent call last):
  File "/home/decaz/.virtualenvs/aio-pika5/lib/python3.6/site-packages/aiormq/base.py", line 22, in __await__
    return (yield from self.task.__await__(*args, **kwargs))
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/decaz/.virtualenvs/aio-pika5/lib/python3.6/site-packages/aiormq/base.py", line 24, in __await__
    raise self.exception from e
aiormq.exceptions.ConnectionClosed: (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
Traceback (most recent call last):
  File "publisher.py", line 18, in <module>
    loop.run_until_complete(main())
  File "/usr/lib/python3.6/asyncio/base_events.py", line 468, in run_until_complete
    return future.result()
  File "publisher.py", line 11, in main
    await channel.default_exchange.publish(msg, routing_key='test_queue')
  File "/home/decaz/workspace/aio-pika5/aio_pika/exchange.py", line 202, in publish
    loop=self.loop, timeout=timeout
  File "/usr/lib/python3.6/asyncio/tasks.py", line 339, in wait_for
    return (yield from fut)
  File "/home/decaz/.virtualenvs/aio-pika5/lib/python3.6/site-packages/aiormq/channel.py", line 380, in basic_publish
    async with self.lock:
  File "/home/decaz/.virtualenvs/aio-pika5/lib/python3.6/site-packages/aiormq/channel.py", line 71, in lock
    raise RuntimeError('%r closed' % self)
RuntimeError: <Channel: "1"> closed

pappachino commented 5 years ago

For the scenario where RabbitMQ is not running and the initial connection attempt fails, would it make sense to have pika-like connection_attempts and retry_delay connection parameters?

Or is there one already that I am missing?
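
A minimal sketch of what such parameters could look like when implemented on the caller's side. The helper name connect_with_retry and its attempts, delay and retry_on parameters are hypothetical and are not part of aio-pika; connect is assumed to be any zero-argument callable that returns an awaitable.

```python
import asyncio


async def connect_with_retry(connect, attempts=5, delay=5.0,
                             retry_on=(ConnectionError,)):
    """Await connect() until it succeeds or `attempts` tries have failed.

    Hypothetical helper, not an aio-pika API. `connect` could be e.g.
    lambda: aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/").
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return await connect()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(delay)  # wait before the next try
```

Passing a larger retry_on tuple lets the caller decide which failures count as "broker not ready yet" and which should abort immediately.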

d21d3q commented 5 years ago

Here is my workaround for this issue:

connection = None

while True:
    try:
        connection = await aio_pika.connect_robust(
            "amqp://guest:guest@broker/"
        )
        break
    except ConnectionError:
        print('waiting for connection')
        await asyncio.sleep(5)

Moreover, while playing around with "robustness" I discovered that if you are lucky enough to start the broker during the first connection attempt, it crashes with IncompatibleProtocolError. I guess RabbitMQ accepts the (network) connection but is not awake enough to speak yet :)

Traceback (most recent call last):
  File "/home/d21d3q/workspace/homework/balena-homework/venv/lib/python3.7/site-packages/aiormq/connection.py", line 231, in connect
    res = await self.__receive_frame()
  File "/home/d21d3q/workspace/homework/balena-homework/venv/lib/python3.7/site-packages/aiormq/connection.py", line 313, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/lib64/python3.7/asyncio/streams.py", line 677, in readexactly
    raise IncompleteReadError(incomplete, n)
asyncio.streams.IncompleteReadError: 0 bytes read on a total of 1 expected bytes

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "ampq.py", line 37, in <module>
    loop.run_until_complete(main(loop))
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "ampq.py", line 8, in main
    "amqp://guest:guest@localhost/", loop=loop
  File "/home/d21d3q/workspace/homework/balena-homework/venv/lib/python3.7/site-packages/aio_pika/robust_connection.py", line 231, in connect_robust
    ssl_options=ssl_options, **kwargs
  File "/home/d21d3q/workspace/homework/balena-homework/venv/lib/python3.7/site-packages/aio_pika/connection.py", line 299, in connect
    await connection.connect()
  File "/home/d21d3q/workspace/homework/balena-homework/venv/lib/python3.7/site-packages/aio_pika/robust_connection.py", line 81, in connect
    return await super().connect(timeout=timeout)
  File "/home/d21d3q/workspace/homework/balena-homework/venv/lib/python3.7/site-packages/aio_pika/connection.py", line 118, in connect
    timeout=timeout
  File "/usr/lib64/python3.7/asyncio/tasks.py", line 414, in wait_for
    return await fut
  File "/home/d21d3q/workspace/homework/balena-homework/venv/lib/python3.7/site-packages/aiormq/connection.py", line 505, in connect
    await connection.connect()
  File "/home/d21d3q/workspace/homework/balena-homework/venv/lib/python3.7/site-packages/aiormq/base.py", line 171, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/d21d3q/workspace/homework/balena-homework/venv/lib/python3.7/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
  File "/home/d21d3q/workspace/homework/balena-homework/venv/lib/python3.7/site-packages/aiormq/connection.py", line 235, in connect
    raise exc.IncompatibleProtocolError(*e.args) from e
aiormq.exceptions.IncompatibleProtocolError: 0 bytes read on a total of 1 expected bytes
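
The "0 bytes read on a total of 1 expected bytes" text in the traceback above comes from asyncio itself. It can be reproduced with the standard library alone: readexactly(1) raises IncompleteReadError when the peer closes the stream before sending a single byte. That fits the guess that the broker accepted the TCP connection and then dropped it, although the sketch below only imitates that situation with feed_eof() and does not involve RabbitMQ or aiormq.

```python
import asyncio


async def read_first_byte_after_eof():
    # Simulate a peer that closed the connection before sending anything.
    reader = asyncio.StreamReader()
    reader.feed_eof()
    try:
        await reader.readexactly(1)
    except asyncio.IncompleteReadError as exc:
        return exc


exc = asyncio.run(read_first_byte_after_eof())
print(exc)  # 0 bytes read on a total of 1 expected bytes
print(exc.partial, exc.expected)  # b'' 1
```
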

decaz commented 4 years ago

There are no problems with version 6.3.0. @d21d3q please look at https://github.com/mosquito/aiormq/issues/32.

cyril94440 commented 5 months ago

We still encounter this issue, with the following error message:

Connection attempt to "amqps://datascience_prod:******@********:5671/prod" failed: 0 bytes read on a total of 1 expected bytes. Reconnecting after 5 seconds.

It is never able to reconnect, whereas restarting the service connects successfully.

Any idea?

mosquito commented 5 months ago

@cyril94440 is RabbitMQ down in this case? Is it really relevant to this issue? Please provide your RabbitMQ and aio-pika versions.

cyril94440 commented 5 months ago

@mosquito RabbitMQ was down for a few seconds (even though it is hosted on AWS Amazon MQ). My Node.js service reconnected successfully, but our Python services using this lib got stuck.

We are using RabbitMQ 3.11.28 and aio-pika 9.4.0.

mosquito commented 5 months ago

Some related bugs were fixed in 9.4.1. See: https://github.com/mosquito/aio-pika/releases/tag/9.4.1

cyril94440 commented 5 months ago

Thanks @mosquito, I'll let you know if we encounter the issue again after updating.