mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

In aio-pika>=5 reconnect completely broken (RobustConnection) #231

Open DriverX opened 5 years ago

DriverX commented 5 years ago

Try it yourself.

The reconnect logic in RobustConnection has been completely broken since aio-pika >= 5. (It was broken in aio-pika 4 as well, but not as dramatically.)

import asyncio
import argparse
import signal
from typing import Dict, Any
import logging
import aio_pika
import itertools

logger = logging.getLogger(__name__)

def parse_args() -> Dict[str, Any]:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-r', '--rabbit',
        help='Rabbit address, ex: host:port',
        default='amqp://rabbitmq',
    )

    commands = parser.add_subparsers(dest='subcmd')
    consumer = commands.add_parser('consumer')
    consumer.add_argument(
        '-q', '--queue',
    )

    publisher = commands.add_parser('publisher')
    publisher.add_argument(
        '-q', '--queue',
    )

    return vars(parser.parse_args())

async def publisher_task(conf: Dict) -> None:
    logger.info('Run publisher')

    queue_name = conf['queue']
    count = itertools.count()
    conn = await aio_pika.connect_robust(conf['rabbit'])
    async with conn:
        channel = await conn.channel(publisher_confirms=True)
        # queue = await channel.declare_queue(
        #     queue_name, passive=True
        # )

        while True:
            message = 'message: {}'.format(next(count))
            logger.info('Send message: %r', message)
            await channel.default_exchange.publish(
                aio_pika.Message(
                    body=message.encode()
                ),
                routing_key=queue_name,
            )
            # await asyncio.sleep(0.1)

async def consumer_task(conf: Dict) -> None:
    logger.info('Run consumer')

    queue_name = conf['queue']
    conn = await aio_pika.connect_robust(conf['rabbit'])

    async with conn:
        channel = await conn.channel()
        await channel.set_qos(prefetch_count=10)

        queue = await channel.declare_queue(
            queue_name, passive=True
        )

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                if aio_pika.version_info > (5,):
                    async with message.process():
                        logger.info('Message - %r', message.body)
                else:
                    with message.process():
                        logger.info('Message - %r', message.body)

async def run(conf: Dict) -> None:
    subcmd = conf['subcmd']
    if subcmd == 'publisher':
        await publisher_task(conf)
    elif subcmd == 'consumer':
        await consumer_task(conf)
    else:
        raise ValueError(f'unknown subcommand {subcmd!r}')

def main():
    args = parse_args()

    logging.basicConfig(
        level=logging.DEBUG,
        format=(
            '[%(asctime)s.%(msecs)03d] [%(process)d] [%(levelname)1.1s] '
            '[%(name)s]:\t%(message)s'
        ),
        datefmt='%Y.%m.%d %H:%M:%S'
    )

    logger.debug('parsed args - %r', args)

    if not args.get('subcmd'):
        raise ValueError('subcommand not specified')

    loop = asyncio.get_event_loop()
    loop.set_debug(False)
    run_task = loop.create_task(run(args))

    def stop(_):
        nonlocal loop
        loop.stop()

    run_task.add_done_callback(stop)

    loop.add_signal_handler(signal.SIGTERM, run_task.cancel)
    loop.add_signal_handler(signal.SIGINT, run_task.cancel)
    try:
        loop.run_forever()
        if not run_task.cancelled():
            run_task.result()
    finally:
        loop.close()

if __name__ == '__main__':
    main()

Start the RabbitMQ Docker container

docker run -d -h $(hostname) --name rabbitmq -p 15679:15672 rabbitmq:3.7-management

Create test_queue in the RabbitMQ management UI
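
(If you prefer to skip the UI step, the queue can also be declared with a short one-off script; this is a sketch assuming the same broker URL as above, not part of the original report.)

# One-off queue declaration, equivalent to the UI step above.
import asyncio
import aio_pika

async def declare() -> None:
    conn = await aio_pika.connect('amqp://rabbitmq')
    async with conn:
        channel = await conn.channel()
        await channel.declare_queue('test_queue')

asyncio.get_event_loop().run_until_complete(declare())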

Start the consumer in Docker

docker run -ti --rm -v "$(pwd)/dev:/test" --link rabbitmq python:3.6 /bin/bash
pip install aio-pika
python /test/rabbit_test.py consumer -q test_queue

Start the publisher in Docker

docker run -ti --rm -v "$(pwd)/dev:/test" --link rabbitmq python:3.6 /bin/bash
pip install aio-pika
python /test/rabbit_test.py publisher -q test_queue

Restart the RabbitMQ container

docker restart rabbitmq

As a result, both the consumer and publisher processes exit.

Consumer log

[2019.06.20 08:07:51.916] [13] [I] [__main__]:  Message - b'message: 5234'
[2019.06.20 08:07:51.920] [13] [I] [__main__]:  Message - b'message: 5235'
[2019.06.20 08:07:51.922] [13] [I] [__main__]:  Message - b'message: 5236'
[2019.06.20 08:07:51.926] [13] [I] [__main__]:  Message - b'message: 5237'
[2019.06.20 08:07:51.928] [13] [D] [aiormq.connection]: Reader task exited because:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
    yield from self._wait_for_data('readexactly')
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
    yield from self._waiter
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/tools.py", line 67, in __await__
    return (yield from self().__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 125, in drain
    return await self.writer.drain()
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 329, in drain
    raise exc
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.930] [13] [D] [aio_pika.connection]:   Closing AMQP connection <Connection: "amqp://rabbitmq">
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/test/rabbit_test.py", line 131, in <module>
    main()
  File "/test/rabbit_test.py", line 125, in main
    run_task.result()
  File "/test/rabbit_test.py", line 88, in run
    await consumer_task(conf)
  File "/test/rabbit_test.py", line 80, in consumer_task
    logger.info('Message - %r', message.body)
  File "/usr/local/lib/python3.6/site-packages/aio_pika/queue.py", line 400, in __aexit__
    await self.close()
  File "/usr/local/lib/python3.6/site-packages/aio_pika/tools.py", line 59, in awaiter
    return await future
  File "/usr/local/lib/python3.6/site-packages/aio_pika/queue.py", line 357, in close
    await self._amqp_queue.cancel(self._consumer_tag)
  File "/usr/local/lib/python3.6/site-packages/aio_pika/robust_queue.py", line 110, in cancel
    result = await super().cancel(consumer_tag, timeout, nowait)
  File "/usr/local/lib/python3.6/site-packages/aio_pika/queue.py", line 232, in cancel
    timeout=timeout, loop=self.loop
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 339, in wait_for
    return (yield from fut)
  File "/usr/local/lib/python3.6/site-packages/aiormq/channel.py", line 308, in basic_cancel
    nowait=nowait,
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 171, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/test/rabbit_test.py", line 77, in consumer_task
    logger.info('Message - %r', message.body)
  File "/usr/local/lib/python3.6/site-packages/aio_pika/message.py", line 630, in __aexit__
    await self.message.ack()
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
    yield from self._wait_for_data('readexactly')
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
    yield from self._waiter
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/tools.py", line 67, in __await__
    return (yield from self().__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 125, in drain
    return await self.writer.drain()
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 329, in drain
    raise exc
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.981] [13] [E] [asyncio]:   Task was destroyed but it is pending!
task: <Task pending coro=<reject_all() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/base.py:65> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x7fafdd184ac8>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:688]>
root@b34f3c105852:/#

Publisher log

[2019.06.20 08:07:51.925] [12] [I] [__main__]:  Send message: 'message: 5238'
[2019.06.20 08:07:51.926] [12] [D] [aio_pika.exchange]: Publishing message with routing key 'test_queue' via exchange <Exchange(): auto_delete=None, durable=None, arguments={})>: Message:{'app_id': None,
 'body_size': 13,
 'content_encoding': None,
 'content_type': None,
 'correlation_id': None,
 'delivery_mode': 1,
 'expiration': None,
 'headers': {},
 'message_id': None,
 'priority': 0,
 'reply_to': None,
 'timestamp': None,
 'type': 'None',
 'user_id': None}
[2019.06.20 08:07:51.929] [12] [D] [aiormq.connection]: Reader task exited because:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
    yield from self._wait_for_data('readexactly')
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
    yield from self._waiter
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.932] [12] [D] [aio_pika.connection]:   Closing AMQP connection <Connection: "amqp://rabbitmq">
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/test/rabbit_test.py", line 131, in <module>
    main()
  File "/test/rabbit_test.py", line 125, in main
    run_task.result()
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/test/rabbit_test.py", line 86, in run
    await publisher_task(conf)
  File "/test/rabbit_test.py", line 54, in publisher_task
    routing_key=queue_name,
  File "/usr/local/lib/python3.6/site-packages/aio_pika/exchange.py", line 202, in publish
    loop=self.loop, timeout=timeout
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 339, in wait_for
    return (yield from fut)
  File "/usr/local/lib/python3.6/site-packages/aiormq/channel.py", line 438, in basic_publish
    return await confirmation
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
    yield from self._wait_for_data('readexactly')
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
    yield from self._waiter
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.968] [12] [E] [asyncio]:   Task was destroyed but it is pending!
task: <Task pending coro=<Connection.__reader() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/base.py:168> wait_for=<Task finished coro=<__reader() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/connection.py:346> result=None> cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.6/site-packages/aiormq/base.py:51, <TaskWakeupMethWrapper object at 0x7f197a45eb88>()]>
[2019.06.20 08:07:51.968] [12] [E] [asyncio]:   Task was destroyed but it is pending!
task: <Task pending coro=<reject_all() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/base.py:65> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x7f197a45ef48>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:688]>
root@1f4b2385667f:/#
decaz commented 5 years ago

Duplicate of #202.

DriverX commented 5 years ago

I updated the issue with detailed reproduction steps.

DriverX commented 5 years ago

Duplicate of #202.

This is not a duplicate.

Artimi commented 5 years ago

I tried the reproduction code with aiormq==2.7.2, as recommended here: https://github.com/mosquito/aio-pika/issues/112#issuecomment-519597738. It now fails with a different exception. Consumer:

Traceback (most recent call last):
  File "/home/artimi/.virtualenvs/rabbit-messaging/lib/python3.6/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "example/messaging_consumer.py", line 50, in <module>
    asyncio.get_event_loop().run_until_complete(main())
  File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "example/messaging_consumer.py", line 39, in main
    async for message in consumer:
  File "/home/artimi/wood/rabbit-messaging/rabbit_messaging/consumer.py", line 193, in __anext__
    return await self.get()
  File "/home/artimi/wood/rabbit-messaging/rabbit_messaging/consumer.py", line 128, in get
    message = await self._get()
  File "/home/artimi/wood/rabbit-messaging/rabbit_messaging/consumer.py", line 118, in _get
    message = await asyncio.shield(self._get_future)
  File "/home/artimi/.virtualenvs/rabbit-messaging/lib/python3.6/site-packages/aiormq/channel.py", line 298, in basic_get
    await self.rpc(spec.Basic.Get(queue=queue, no_ack=no_ack))
  File "/home/artimi/.virtualenvs/rabbit-messaging/lib/python3.6/site-packages/aiormq/base.py", line 171, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/artimi/.virtualenvs/rabbit-messaging/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/usr/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/artimi/.virtualenvs/rabbit-messaging/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/usr/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/artimi/.virtualenvs/rabbit-messaging/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
aiormq.exceptions.ConnectionClosed: (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")

Producer:

Traceback (most recent call last):
  File "/home/sebekpet/.virtualenvs/rabbit-messaging/lib/python3.6/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "example/messaging_producer.py", line 48, in <module>
    asyncio.get_event_loop().run_until_complete(main())
  File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "/usr/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/sebekpet/.virtualenvs/rabbit-messaging/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "example/messaging_producer.py", line 35, in main
    await producer.publish('common.TestSchema.abc', data)
  File "/home/sebekpet/wood/rabbit-messaging/rabbit_messaging/producer.py", line 117, in publish
    await publish_coroutine
  File "/home/sebekpet/.virtualenvs/rabbit-messaging/lib/python3.6/site-packages/aio_pika/exchange.py", line 202, in publish
    loop=self.loop, timeout=timeout
  File "/usr/lib/python3.6/asyncio/tasks.py", line 339, in wait_for
    return (yield from fut)
  File "/home/sebekpet/.virtualenvs/rabbit-messaging/lib/python3.6/site-packages/aiormq/channel.py", line 439, in basic_publish
    return await confirmation
aiormq.exceptions.ConnectionClosed: (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")

I see that you added the help wanted tag to this, @mosquito. How could I help with this?

Artimi commented 4 years ago

With aio-pika==6.1.3 I am able to achieve a robust connection by catching the connection exceptions raised from publish and by using long-lived consumers (created with consume, not with get). I tested it against a RabbitMQ that is connected at the start and gets restarted during producing/consuming. This is how my wrapped publish method looks now:

async def _publish(
    self,
    schema: str,
    message: aio_pika.Message,
    routing_key: str,
) -> Optional[aiormq.types.ConfirmationFrameType]:
    '''
    Publish coroutine in a separate method so we can catch and ignore connection exceptions.
    '''
    iterator = itertools.count() if self._retry_count is None else range(self._retry_count)
    for attempt in iterator:
        try:
            return await self._exchanges[schema].publish(message, routing_key)
        except aio_pika.exceptions.CONNECTION_EXCEPTIONS as exc:
            if attempt + 1 == self._retry_count:
                self._logger.error(
                    'Message with schema = {}, routing key = {}, data = {} could not be sent after {} retries. Skipping.',
                    schema,
                    routing_key,
                    message.body,
                    self._retry_count,
                )
                raise
            else:
                self._logger.warning('Connection closed while publishing with exception = {}. Will retry.', exc)
                await asyncio.sleep(self.RETRY_WAIT)
    return None
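
For the consumer side mentioned above, here is a minimal sketch of a long-lived consumer registered with consume (illustrative names only, assuming an AMQP URL and queue name; not the exact production code):

import asyncio
import aio_pika

async def consume_forever(amqp_url: str, queue_name: str) -> None:
    # connect_robust() is intended to re-register long-lived consumers after
    # a reconnect, unlike one-shot basic_get calls.
    conn = await aio_pika.connect_robust(amqp_url)
    channel = await conn.channel()
    await channel.set_qos(prefetch_count=10)
    queue = await channel.declare_queue(queue_name, durable=True)

    async def on_message(message: aio_pika.IncomingMessage) -> None:
        async with message.process():
            print(message.body)

    await queue.consume(on_message)
    # Keep the coroutine alive; messages are handled by the callback.
    await asyncio.Future()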
mosquito commented 4 years ago

So, today I tried to reproduce it. The consumer successfully reconnected after the rabbitmq container restart. Is this still relevant?

BTW: running rabbitmq in a docker image is a bad idea. The Docker container might change its internal IP address; please use port forwarding or a TCP proxy to reproduce this behaviour.

monomonedula commented 3 years ago

@mosquito yes, it is.

So, today I tried to reproduce it. The consumer successfully reconnected after the rabbitmq container restart. Is this still relevant?

BTW: running rabbitmq in a docker image is a bad idea. The Docker container might change its internal IP address; please use port forwarding or a TCP proxy to reproduce this behaviour.

mosquito commented 2 years ago

In aio-pika>=7 the reconnect is completely fixed (RobustConnection), as far as I can see.

gaby commented 6 months ago

This is still happening in v9.4.0 @mosquito

It can be reproduced with https://aio-pika.readthedocs.io/en/latest/patterns.html

mosquito commented 6 months ago

@gaby what exactly is happening?

gaby commented 6 months ago

@gaby what exactly is happening?

When using connect_robust(), if the connection to RabbitMQ is lost (not closed), the logs show the consumer trying to reconnect every 5 seconds. Once RabbitMQ becomes reachable again, the logs stop and no new messages show up (which I would assume means it reconnected). But the consumer no longer consumes from RabbitMQ: in the management UI, RabbitMQ shows 0 consumers for that queue.

The log message I see during reconnect is this line (it stops when RabbitMQ comes back online, but the consumer no longer gets messages): https://github.com/mosquito/aio-pika/blob/master/aio_pika/robust_connection.py#L83

It's the same behavior posted in https://github.com/mosquito/aio-pika/issues/231. I'm using aio-pika 9.4.0 with Python 3.10.

mosquito commented 6 months ago

@gaby How can I reproduce the “connection lost” part?

Darsstar commented 6 months ago

Please try 9.4.1 (which includes #622), which removed a deadlock. Zero consumers on the queue is consistent with you running into that deadlock.

gaby commented 6 months ago

@mosquito @Darsstar The issue still happens in v9.4.1.

I can post a full example in 1-2 hrs, but basically just restarting the RabbitMQ container causes this. It's a connection-lost event, since the broker is not gracefully shut down.

gaby commented 6 months ago

@mosquito @Darsstar

worker.py

import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import Master, NackMessage, RejectMessage

async def worker(*, task_id: int) -> None:
    if task_id % 2 == 0:
        raise RejectMessage(requeue=False)
    if task_id % 2 == 1:
        raise NackMessage(requeue=False)
    print(task_id)

async def main() -> None:
    connection = await connect_robust(
        "amqp://guest:guest@127.0.0.1/?name=aio-pika%20worker",
    )

    # Creating channel
    channel = await connection.channel()

    # Initializing Master with channel
    master = Master(channel)
    await master.create_worker("my_task_name", worker, auto_delete=False, durable=True)

    try:
        await asyncio.Future()
    finally:
        await connection.close()

if __name__ == "__main__":
    asyncio.run(main())

RabbitMQ:

mkdir data
docker run -d --hostname rabbit --name rabbit -p 5672:5672 -p 15672:15672 -v $PWD/data:/var/lib/rabbitmq rabbitmq:3-management

Log into the UI and click on Queues and Streams -> my_task_name; you will see 1 consumer. Now run this:

docker kill rabbit; docker start rabbit; docker logs -f rabbit

You will see the worker's logs print this repeatedly, then stop:

Connection attempt to "amqp://guest:******@127.0.0.1/?name=aio-pika%20worker" failed: Server connection reset: ConnectionResetError(104, 'Connection reset by peer'). Reconnecting after 5 seconds.

You will also see the following in the RabbitMQ logs:

2024-03-27 01:49:40.521168+00:00 [info] <0.693.0> accepting AMQP connection <0.693.0> (172.17.0.1:42788 -> 172.17.0.2:5672)
2024-03-27 01:49:40.522729+00:00 [info] <0.693.0> connection <0.693.0> (172.17.0.1:42788 -> 172.17.0.2:5672) has a client-provided name: aio-pika worker
2024-03-27 01:49:40.523354+00:00 [info] <0.693.0> connection <0.693.0> (172.17.0.1:42788 -> 172.17.0.2:5672 - aio-pika worker): user 'guest' authenticated and granted access to vhost '/'

But if you now click on Queues and Streams -> my_task_name, there are 0 consumers.

Note: I'm using auto_delete=False, durable=True, so the queue gets recreated; otherwise I don't get any queue at all, since the worker does not declare that queue again either.


Edit:

Here are the logs when running in debug mode and triggering "docker kill/start/logs":

DEBUG:aiormq.connection:Reader exited for <Connection: "amqp://guest:******@127.0.0.1:5672/?name=aio-pika%20worker" at 0x704c0d04fce0>
DEBUG:aiormq.connection:Cancelling cause reader exited abnormally
DEBUG:aiormq.connection:Sending <Connection.Close object at 0x704c0d086e80> to <Connection: "amqp://guest:******@127.0.0.1:5672/?name=aio-pika%20worker" at 0x704c0d04fce0>
DEBUG:aiormq.connection:Writer on connection amqp://guest:******@127.0.0.1:5672/?name=aio-pika%20worker closed
DEBUG:aiormq.connection:Writer exited for <Connection: "amqp://guest:******@127.0.0.1:5672/?name=aio-pika%20worker" at 0x704c0d04fce0>
DEBUG:aiormq.connection:Closing connection <Connection: "amqp://guest:******@127.0.0.1:5672/?name=aio-pika%20worker" at 0x704c0d04fce0> cause: <AMQPConnectionError: ('Server connection unexpectedly closed. Read 0 bytes but 1 bytes expected',)>
DEBUG:aio_pika.channel:Start reopening channel <aio_pika.robust_channel.RobustChannel object at 0x704c0d075ea0>
INFO:aio_pika.robust_connection:Connection to amqp://guest:******@127.0.0.1/?name=aio-pika%20worker closed. Reconnecting after 5 seconds.
DEBUG:aio_pika.robust_connection:Connection attempt for <RobustConnection: "amqp://guest:******@127.0.0.1/?name=aio-pika%20worker" 1 channels>
DEBUG:aiormq.connection:Connecting to: amqp://guest:******@127.0.0.1:5672/?name=aio-pika%20worker
INFO:aio_pika.robust_connection:Connection to amqp://guest:******@127.0.0.1/?name=aio-pika%20worker closed. Reconnecting after 5 seconds.
WARNING:aio_pika.robust_connection:Connection attempt to "amqp://guest:******@127.0.0.1/?name=aio-pika%20worker" failed: Server connection reset: ConnectionResetError(104, 'Connection reset by peer'). Reconnecting after 5 seconds.
DEBUG:aio_pika.robust_connection:Waiting for connection close event for <RobustConnection: "amqp://guest:******@127.0.0.1/?name=aio-pika%20worker" 1 channels>
DEBUG:aio_pika.robust_connection:Connection attempt for <RobustConnection: "amqp://guest:******@127.0.0.1/?name=aio-pika%20worker" 1 channels>
DEBUG:aiormq.connection:Connecting to: amqp://guest:******@127.0.0.1:5672/?name=aio-pika%20worker
INFO:aio_pika.robust_connection:Connection to amqp://guest:******@127.0.0.1/?name=aio-pika%20worker closed. Reconnecting after 5 seconds.
WARNING:aio_pika.robust_connection:Connection attempt to "amqp://guest:******@127.0.0.1/?name=aio-pika%20worker" failed: Server connection reset: ConnectionResetError(104, 'Connection reset by peer'). Reconnecting after 5 seconds.
DEBUG:aio_pika.robust_connection:Waiting for connection close event for <RobustConnection: "amqp://guest:******@127.0.0.1/?name=aio-pika%20worker" 1 channels>
DEBUG:aio_pika.robust_connection:Connection attempt for <RobustConnection: "amqp://guest:******@127.0.0.1/?name=aio-pika%20worker" 1 channels>
DEBUG:aiormq.connection:Connecting to: amqp://guest:******@127.0.0.1:5672/?name=aio-pika%20worker
DEBUG:aio_pika.channel:Start reopening channel <aio_pika.robust_channel.RobustChannel object at 0x704c0d075ea0>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\x06\x00\x14\x00\n\x010\xce', should_close=False, drain_future=None)
DEBUG:aiormq.connection:Received frame <Channel.OpenOk object at 0x704c0d0ac400> in channel #1 weight=16 on <Connection: "amqp://guest:******@127.0.0.1:5672/?name=aio-pika%20worker" at 0x704c0d087600>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\x05\x00U\x00\n\x00\xce', should_close=False, drain_future=None)
DEBUG:aiormq.connection:Received frame <Confirm.SelectOk object at 0x704c0d0bc790> in channel #1 weight=12 on <Connection: "amqp://guest:******@127.0.0.1:5672/?name=aio-pika%20worker" at 0x704c0d087600>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\x0b\x00<\x00\n\x00\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
DEBUG:aiormq.connection:Received frame <Basic.QosOk object at 0x704c0d0bc7f0> in channel #1 weight=12 on <Connection: "amqp://guest:******@127.0.0.1:5672/?name=aio-pika%20worker" at 0x704c0d087600>
DEBUG:aio_pika.robust_connection:Connection made on <RobustConnection: "amqp://guest:******@127.0.0.1/?name=aio-pika%20worker" 1 channels>
DEBUG:aio_pika.robust_connection:Waiting for connection close event for <RobustConnection: "amqp://guest:******@127.0.0.1/?name=aio-pika%20worker" 1 channels>

There are no logs after that. It seems the connection and channel both get re-established, but the aio_pika.queue "Declare queue" and "Start to consuming queue" log messages never appear.

gaby commented 6 months ago

@mosquito @Darsstar What I'm getting from this issue is that the connection and channel are both re-created and established, but the queue is not redeclared and the consumer is not re-created.

Is there a way in aio-pika to add a callback to connect_robust() that performs those actions, until a fix is in place?
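
One possible interim workaround is only a sketch, not a confirmed fix: it assumes RobustConnection's reconnect_callbacks collection and a hypothetical setup() helper that re-declares the queue and consumer.

import asyncio
import aio_pika

async def handle_message(message: aio_pika.abc.AbstractIncomingMessage) -> None:
    async with message.process():
        print(message.body)

async def setup(connection) -> None:
    # Hypothetical helper: re-declare the queue and re-attach the consumer.
    channel = await connection.channel()
    queue = await channel.declare_queue(
        "my_task_name", auto_delete=False, durable=True
    )
    await queue.consume(handle_message)

async def main() -> None:
    connection = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/")
    await setup(connection)
    # Re-run the setup after every reconnect; the exact callback signature may
    # vary between versions, so treat this as a sketch.
    connection.reconnect_callbacks.add(
        lambda conn: asyncio.ensure_future(setup(conn))
    )
    await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())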

Darsstar commented 6 months ago

There is a slight possibility that what comes next is only true in combination with #615, since I reproduced your issue on that branch.

You should store the result of master.create_worker(), since WeakSets are involved. The Worker instance has a strong reference to the RobustQueue, which keeps alive the WeakRef that the RobustConnection holds.

With the changes in this snippet it started working.

    client = await master.create_worker("my_task_name", worker, auto_delete=False, durable=True)

    try:
        await asyncio.Future()
    finally:
        await client.close()
        await connection.close()

edit: see https://github.com/mosquito/aio-pika/issues/594

gaby commented 6 months ago

@Darsstar That fixed the issue! It now redeclares the queue and starts consuming again.

The documentation needs to be updated if WeakRefs are going to be involved.