mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Queue.cancel() does not work in consume mode #253

Open Kaveshnikov opened 4 years ago

Kaveshnikov commented 4 years ago

Hi. I tried aio-pika with aiohttp and noticed that queue.cancel() hangs indefinitely. Receiving and sending messages work normally. channel.close() in consume mode hangs too. Both methods work normally in client mode.

aio-pika==6.1.2 aiormq==2.7.5 pamqp==2.3.0 aiohttp==3.6.0 RabbitMQ 3.7.18

Code example:

async def on_message(message: aio_pika.IncomingMessage):
    async with message.process():
        print(message.body.decode())

async def consume(app: web.Application):
    connection: aio_pika.RobustConnection = app['mq']
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)
    queue: aio_pika.RobustQueue = await channel.declare_queue('hello', durable=True)
    consumer_tag = await queue.consume(on_message)

    yield
    # it hangs here
    await queue.cancel(consumer_tag, nowait=True)
    await channel.close()

async def init_mq(app: web.Application) -> None:
    connection: aio_pika.RobustConnection = await aio_pika.connect(
        host=HOST,
        port=PORT,
        login=LOGIN,
        password=PASSWORD,
        timeout=CONNECTION_TIMEOUT,
    )
    app['mq'] = connection

    yield
    await app['mq'].close()

def create_app() -> web.Application:
    app: web.Application = web.Application()
    app.cleanup_ctx.append(init_mq)
    app.cleanup_ctx.append(consume)

    return app

if __name__ == '__main__':
    app: web.Application = create_app()
    web.run_app(app, host='localhost', port=8080)

Kaveshnikov commented 4 years ago

@mosquito

mosquito commented 4 years ago

Could you provide RabbitMQ traffic dump?

Hint command: tcpdump -w traff.pcap -pni any "port 5672"

Kaveshnikov commented 4 years ago

This is a dump from the application from start to close

ghost commented 4 years ago

Hey there!

Although my code is slightly different, I believe it's the same issue:

import asyncio

import aio_pika
from aiohttp import web

async def listen_to_rmq(app):
    fixture_id = "1"
    conn = await aio_pika.connect_robust("amqp://guest:guest@localhost/")

    ch = await conn.channel()

    ex = await ch.declare_exchange(fixture_id)
    q = await ch.declare_queue(name=fixture_id, exclusive=True)
    await q.bind(ex)

    try:
        async with q.iterator() as q_iter:
            async for msg in q_iter:
                async with msg.process():
                    print(msg.body.decode())
    except asyncio.CancelledError:
        pass
    finally:
        await q.unbind(ex, fixture_id)
        await q.delete()
        await ex.delete()
        await ch.close()

async def start_background_tasks(app):
    app['rmq_listener'] = asyncio.create_task(listen_to_rmq(app))

async def cleanup_background_tasks(app):
    app['rmq_listener'].cancel()
    await app['rmq_listener']

if __name__ == "__main__":
    app = web.Application()
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    web.run_app(app)

moznuy commented 4 years ago

I think I found the problem; I have run into a similar issue before. It is related to how aiohttp handles shutdown (and asyncio.run() as well, because aiohttp's web.run_app is analogous). From aiohttp's web.run_app:

    except (GracefulExit, KeyboardInterrupt):  # pragma: no cover
        pass
    finally:
        _cancel_all_tasks(loop)
        if sys.version_info >= (3, 6):  # don't use PY_36 to pass mypy
            loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

The problem is that the reader task is cancelled before the RPC response to queue.cancel() is received. You can see this by adding

logging.basicConfig(level=logging.DEBUG)

to the examples above:

 9180.18 -    aiormq.connection -    DEBUG - Reader task cancelled:
Traceback (most recent call last):
  File "/home/lamar/.envs/testPika/lib/python3.8/site-packages/aiormq/connection.py", line 380, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/home/lamar/.envs/testPika/lib/python3.8/site-packages/aiormq/connection.py", line 332, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/lib/python3.8/asyncio/streams.py", line 723, in readexactly
    await self._wait_for_data('readexactly')
  File "/usr/lib/python3.8/asyncio/streams.py", line 517, in _wait_for_data
    await self._waiter
asyncio.exceptions.CancelledError

So await queue.cancel() waits forever. My understanding of _cancel_all_tasks(loop) is limited, but as far as I can tell:

  1. All tasks are marked as cancelled.
  2. The aiohttp _run_app task is scheduled after aiormq.connection.__reader.
  3. The reader task runs and returns.
  4. The _run_app task waits for our shutdown code (queue.cancel()), but it is stuck forever because nothing is left to resolve the future.

I think this issue is related to aio-libs/aiohttp#3593
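
The sequence in the list above can be reproduced without RabbitMQ. The following is a minimal sketch in plain asyncio, not aiormq's actual code: `reader`, `rpc_call` and `demo` are hypothetical stand-ins for the reader task, for an RPC such as queue.cancel(), and for the shutdown path. The 0.2 s timeout exists only so the demonstration terminates instead of hanging.

```python
import asyncio


async def reader(responses: asyncio.Queue, pending: dict) -> None:
    """Stand-in for the connection reader: resolves RPC futures as replies arrive."""
    while True:
        tag = await responses.get()
        pending.pop(tag).set_result("ok")


async def rpc_call(tag: str, pending: dict) -> str:
    """Stand-in for an RPC such as queue.cancel(): waits for the reader's reply."""
    future = asyncio.get_running_loop().create_future()
    pending[tag] = future
    return await future


async def demo(cancel_reader_first: bool) -> str:
    responses: asyncio.Queue = asyncio.Queue()
    pending: dict = {}
    reader_task = asyncio.create_task(reader(responses, pending))
    await asyncio.sleep(0)  # let the reader start waiting for replies

    if cancel_reader_first:
        # Roughly what cancelling all tasks does: the reader dies first.
        reader_task.cancel()
        await asyncio.gather(reader_task, return_exceptions=True)

    call = asyncio.create_task(rpc_call("ctag", pending))
    await asyncio.sleep(0)  # let rpc_call register its future
    responses.put_nowait("ctag")  # the broker's reply arrives
    try:
        # Without the timeout this await would never return in the broken case.
        return await asyncio.wait_for(call, timeout=0.2)
    except asyncio.TimeoutError:
        return "hung"
    finally:
        reader_task.cancel()


if __name__ == "__main__":
    print(asyncio.run(demo(cancel_reader_first=False)))
    print(asyncio.run(demo(cancel_reader_first=True)))
```

With the reader alive the call returns "ok"; with the reader cancelled first, the reply sits unread and the call only ends because of the timeout.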

moznuy commented 4 years ago

I'm not very familiar with the inner workings of either aio-pika or aiormq, but after taking a peek at [aiormq.connection.Connection.__reader()](https://github.com/mosquito/aiormq/blob/63a8b0d6d50eb93f9ee8c851fcfda01e3d5f122b/aiormq/connection.py#L374), maybe something like this could work:

async def __reader(all_rpc_futures_are: asyncio.Future):
    cancellation_in_progress = False

    try:
        while True:  # not self.reader.at_eof():
            # After a cancellation request, exit only once all RPCs are answered
            if cancellation_in_progress and all_rpc_futures_are.done():
                break
            try:
                await asyncio.sleep(0.1)  # Work
            except asyncio.CancelledError:
                print('Socket Received Cancellation')
                cancellation_in_progress = True
    finally:
        print('Socket close')

# Somewhere in connection.close() workflow
# (rpc_futures is a placeholder for all pending RPC futures)
await asyncio.gather(*rpc_futures)
all_rpc_futures_are.set_result(None)
# Or
gathering_future = asyncio.gather(*rpc_futures)
# A done callback receives the finished future as its only argument
gathering_future.add_done_callback(lambda _: all_rpc_futures_are.set_result(None))

Some points that might be important:

  1. I don't know whether __reader() can be cancelled from anywhere other than the Connection.close() workflow or _cancel_all_tasks() in aiohttp (and asyncio.run()).
  2. This solution must not loop forever once cancellation_in_progress == True. Maybe one could add a timeout to Connection.close(timeout=None) and some logic to call set_exception on all_rpc_futures_are.
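
The timeout idea in point 2 could look roughly like this. It is only a sketch under assumptions: `drain_rpc_futures` is a hypothetical helper, not part of aiormq, and it assumes the pending RPCs are plain asyncio.Future objects (calling set_exception on a Task would raise).

```python
import asyncio
from typing import Iterable, Optional


async def drain_rpc_futures(
    futures: Iterable[asyncio.Future], timeout: Optional[float] = None
) -> int:
    """Wait for pending RPC futures; fail the ones still pending after timeout.

    Returns the number of futures that had to be failed.
    """
    futures = list(futures)
    if not futures:
        return 0  # asyncio.wait() rejects an empty collection
    _, still_pending = await asyncio.wait(futures, timeout=timeout)
    for future in still_pending:
        # Wake up whoever awaits this RPC instead of leaving them stuck.
        future.set_exception(asyncio.TimeoutError("no RPC reply before close"))
    return len(still_pending)


async def demo() -> tuple:
    loop = asyncio.get_running_loop()
    answered, unanswered = loop.create_future(), loop.create_future()
    # Simulate one reply arriving in time; the other never arrives.
    loop.call_later(0.01, answered.set_result, "cancel-ok")
    failed = await drain_rpc_futures([answered, unanswered], timeout=0.1)
    return failed, answered.result(), type(unanswered.exception()).__name__


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The reader loop would then be free to exit after the helper returns, since every waiter has either a result or an exception.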

multun commented 4 years ago

@moznuy I think #3805 fixed the issue

moznuy commented 4 years ago

@multun Yes, I am quite happy that aiohttp now waits for cleanup tasks before cancelling all tasks. But with some slight changes to the example above:

import asyncio
import logging

import aio_pika

logging.basicConfig(format='%(relativeCreated)8.2f - %(name)20s - %(levelname)8s - %(message)s', level=logging.DEBUG)

async def on_message(msg):
    async with msg.process():
        logging.info("Received: %s", msg.body.decode())
        await asyncio.sleep(0.1)

async def main():
    fixture_id = "test_queue"
    conn: aio_pika.Connection = await aio_pika.connect("amqp://guest:guest@localhost/")

    ch: aio_pika.Channel = await conn.channel()
    await ch.set_qos(prefetch_count=3)

    q = await ch.declare_queue(name=fixture_id, durable=True)
    tag = await q.consume(callback=on_message)
    try:
        await asyncio.sleep(1000)
    finally:
        logging.debug("Before cancel")
        await q.cancel(tag)
        logging.debug("After cancel")
        await conn.close()

if __name__ == "__main__":
    asyncio.run(main())

And it still hangs on await q.cancel(), even without aiohttp:

   56.75 -              asyncio -    DEBUG - Using selector: EpollSelector
   62.51 -  aio_pika.connection -    DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost/">
   62.68 -  aio_pika.connection -    DEBUG - Channel created: <Channel "None#Not initialized channel">
   64.48 -       aio_pika.queue -    DEBUG - Declaring queue: <Queue(test_queue): auto_delete=False, durable=True, exclusive=False, arguments=None>
   65.18 -       aio_pika.queue -    DEBUG - Start to consuming queue: <Queue(test_queue): auto_delete=False, durable=True, exclusive=False, arguments=None>
   66.43 -                 root -     INFO - Received: Hello 65308
   66.49 -                 root -     INFO - Received: Hello 65309
   66.53 -                 root -     INFO - Received: Hello 65310
  170.34 -                 root -     INFO - Received: Hello 65311
  171.25 -                 root -     INFO - Received: Hello 65312
  171.46 -                 root -     INFO - Received: Hello 65313
  273.44 -                 root -     INFO - Received: Hello 65314
  275.26 -                 root -     INFO - Received: Hello 65315
  275.51 -                 root -     INFO - Received: Hello 65316
  377.35 -                 root -     INFO - Received: Hello 65317
  379.21 -                 root -     INFO - Received: Hello 65318
  379.81 -                 root -     INFO - Received: Hello 65319
  481.31 -                 root -     INFO - Received: Hello 65320
  483.42 -                 root -     INFO - Received: Hello 65321
  484.16 -                 root -     INFO - Received: Hello 65322
  583.59 -                 root -     INFO - Received: Hello 65323
  584.77 -                 root -     INFO - Received: Hello 65324
  585.51 -                 root -     INFO - Received: Hello 65325
  686.03 -                 root -     INFO - Received: Hello 65326
  686.11 -                 root -     INFO - Received: Hello 65327
  686.56 -                 root -     INFO - Received: Hello 65328
  722.92 -    aiormq.connection -    DEBUG - Reader task cancelled:
Traceback (most recent call last):
  File "/usr/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 599, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 567, in run_forever
    self._run_once()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1819, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.8/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/lamar/.envs/api_server3/lib/python3.8/site-packages/aiormq/connection.py", line 377, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/home/lamar/.envs/api_server3/lib/python3.8/site-packages/aiormq/connection.py", line 329, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/lib/python3.8/asyncio/streams.py", line 723, in readexactly
    await self._wait_for_data('readexactly')
  File "/usr/lib/python3.8/asyncio/streams.py", line 517, in _wait_for_data
    await self._waiter
asyncio.exceptions.CancelledError
  723.99 -                 root -    DEBUG - Before cancel

...And nothing....

This is because asyncio.run works similarly to aiohttp's run_app before the #3805 fix.

One small note: I tried a bunch of examples before settling on this one because I wasn't getting consistent results. Sometimes it was 50/50, sometimes the program stopped cleanly, and sometimes adding or removing a single line, logging.debug("Before cancel"), flipped the behaviour completely, from 0% to 100%.
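
One possible way around this with asyncio.run() is to stop on a signal handler rather than on KeyboardInterrupt, so that cleanup runs while the reader task is still alive. The sketch below is an assumption on my side, not something tested against aio-pika: `run_until_stopped` and `cleanup` are hypothetical names, the cleanup body is a placeholder, and loop.add_signal_handler is only available on Unix.

```python
import asyncio
import signal


async def run_until_stopped(stop: asyncio.Event, cleanup) -> None:
    """Wait for the stop event, then run cleanup while other tasks are still alive."""
    try:
        await stop.wait()
    finally:
        await cleanup()


async def main() -> None:
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    # Unix only: Ctrl+C now sets the event instead of raising KeyboardInterrupt,
    # so asyncio.run() does not start cancelling tasks before cleanup finishes.
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)

    async def cleanup() -> None:
        # Placeholder: in the example above this would be
        # `await q.cancel(tag)` followed by `await conn.close()`.
        print("cleanup finished")

    await run_until_stopped(stop, cleanup)


if __name__ == "__main__":
    asyncio.run(main())
```

Since main() returns normally after the cleanup, asyncio.run() has nothing left to cancel that the cleanup depends on.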