mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

How to exit QueryIterator? #358

alex-oleshkevich opened this issue 3 years ago

alex-oleshkevich commented 3 years ago

I know this is not the first time it has been posted here, but I still could not find a working solution. My problem is that my application runs in CLI mode and doesn't react to ^C properly. Here is an example:

import asyncio

import aio_pika

async def main():
    async def get_connection():
        return await aio_pika.connect_robust('amqp://guest:guest@localhost')

    pool = aio_pika.pool.Pool(get_connection)
    async with pool.acquire() as connection:
        async with connection:
            channel = await connection.channel()
            queue = await channel.declare_queue('my_queue_name', durable=True)
            async with queue.iterator() as stream:
                async for message in stream:
                    print(message)

if __name__ == '__main__':
    asyncio.run(main())

Up to this point, everything works well. Messages are received and printed. Then, when I send CTRL+C (SIGINT) to the process, my program does not close. Messages are no longer received, but the program keeps running.

I have to press ^C again and only then the process gets killed with this exception trace:

Click to see the trace:

```
^C^CTraceback (most recent call last):
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 566, in run_until_complete
    self.run_forever()
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 534, in run_forever
    self._run_once()
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 1735, in _run_once
    event_list = self._selector.select(timeout)
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "amqp_test.py", line 24, in <module>
    asyncio.run(main())
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/runners.py", line 46, in run
    _cancel_all_tasks(loop)
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/runners.py", line 62, in _cancel_all_tasks
    tasks.gather(*to_cancel, loop=loop, return_exceptions=True))
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 566, in run_until_complete
    self.run_forever()
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 534, in run_forever
    self._run_once()
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 1735, in _run_once
    event_list = self._selector.select(timeout)
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt

Exception ignored in:
Traceback (most recent call last):
  File "amqp_test.py", line 18, in main
  File "/home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aio_pika/connection.py", line 212, in __aexit__
  File "/home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aio_pika/robust_connection.py", line 190, in close
  File "/home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aio_pika/connection.py", line 33, in close
  File "/home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/base.py", line 149, in close
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 398, in create_task
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 475, in _check_closed
RuntimeError: Event loop is closed
sys:1: RuntimeWarning: coroutine 'Base.__closer' was never awaited
sys:1: RuntimeWarning: coroutine 'QueueIterator.close' was never awaited
Task was destroyed but it is pending!
task: wait_for= wait_for=()]> cb=[FutureStore.__on_task_done..remover() at /home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/base.py:51, <1 more>, ()]> cb=[shield.._inner_done_callback() at /home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/tasks.py:803]>
Task was destroyed but it is pending!
task: wait_for=()]> cb=[FutureStore.__on_task_done..remover() at /home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/base.py:51, <1 more>, ()]>
Exception ignored in:
Traceback (most recent call last):
  File "/home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/channel.py", line 137, in rpc
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/tasks.py", line 414, in wait_for
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/queues.py", line 161, in get
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 683, in call_soon
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 475, in _check_closed
RuntimeError: Event loop is closed
```

Expected behavior:

The program should exit on the first ^C.

astrojuanlu commented 3 years ago

I came here to report this same issue.

I have tried to add signal handlers with a shutdown coroutine, as advised here:

https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
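Roughly, that pattern looks like this (a minimal sketch of the article's approach; the shutdown coroutine just cancels the outstanding tasks and stops the loop):

import asyncio
import signal

async def shutdown(sig, loop):
    # cancel every task except the one running this shutdown coroutine
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()

loop = asyncio.get_event_loop()
for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
    loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(shutdown(s, loop)))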

However, it still doesn't help, because apparently cancelling the ongoing tasks triggers some more tasks, and the process still hangs. Like @alex-oleshkevich, I also need to press Ctrl+C again.

It would be nice to document how to properly do this with minimal code, for example starting from https://github.com/mosquito/aio-pika/blob/master/docs/source/examples/simple_consumer.py.

xref: https://github.com/mosquito/aio-pika/issues/160

> Shutting down in asyncio is really not easy.

Indeed...

You mentioned https://aiomisc.readthedocs.io/en/latest/entrypoint.html there, but it's still not clear how that helps handling graceful shutdown.

astrojuanlu commented 3 years ago

Found this promising article by @rob-blackbourn https://rob-blackbourn.medium.com/a-python-asyncio-cancellation-pattern-a808db861b84

astrojuanlu commented 3 years ago

I still don't know how to properly exit QueueIterator, but at least I fixed the "double Ctrl+C" issue by using loop.run_until_complete instead of asyncio.run, even without passing the loop to connect or connect_robust. The reason is the cancellation of tasks:

https://github.com/python/cpython/blob/v3.8.5/Lib/asyncio/runners.py#L46

There is one aio-pika task that is getting stuck and does not react well to cancellation. After creating a custom signal handler that cancel()s the remaining tasks one by one on SIGINT, I suspect the problem lies in aiormq.Connection.__heartbeat_task.
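A sketch of what such a per-task cancellation handler looks like (not my exact code, just the idea):

import asyncio
import signal

def cancel_remaining_tasks():
    # cancel the still-pending tasks one by one, printing them so the stuck one is visible
    for task in asyncio.all_tasks():
        if task is not asyncio.current_task():
            print('cancelling', task)
            task.cancel()

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, cancel_remaining_tasks)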

mosquito commented 3 years ago

aiormq's Base class contains a task container, and its .close() method guarantees cancellation of all related tasks, as well as all tasks on child instances. E.g. Connection creates Channel objects, and when the connection instance is closed, all related channels will be closed too.
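In other words (a minimal illustration, not code from the docs):

connection = await aio_pika.connect_robust('amqp://guest:guest@localhost/')
channel = await connection.channel()

# closing the connection cancels the tasks of its child objects,
# so the channel gets closed as part of connection.close()
await connection.close()
print(channel.is_closed)  # True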

@astrojuanlu why do you blame aiormq.Connection.__heartbeat_task?

astrojuanlu commented 3 years ago

@mosquito I suspect it because, in my trial-and-error experiments, it's the only task I couldn't cancel. Calling .cancel() on it and then awaiting it would hang the program. There might be better ways to debug which task is blocked, but I don't know them.

On the other hand, inspired by the article I shared above, I wrote a client using plain aiormq and leveraging a cancellation asyncio.Event: https://gist.github.com/astrojuanlu/17c2f05298d3549c55ac45b1c85cffb6 which leaves me wondering what would be the advantage of aio-pika...
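The gist boils down to roughly this (a simplified sketch; on_message is just a placeholder callback):

import asyncio
import signal

import aiormq

async def on_message(message):
    print(message.body)

async def main():
    cancel_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, cancel_event.set)

    connection = await aiormq.connect('amqp://guest:guest@localhost/')
    channel = await connection.channel()
    declare_ok = await channel.queue_declare('my_queue', durable=True)
    await channel.basic_consume(declare_ok.queue, on_message, no_ack=True)

    await cancel_event.wait()   # block here until a signal arrives
    await connection.close()    # then close the connection explicitly

asyncio.run(main())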

astrojuanlu commented 3 years ago

(Clarification: I wrote a new client but did not fully fix the issue, because calling loop.close() at the end displays all the unfinished tasks, and I am not able to properly cancel them.)

mosquito commented 3 years ago

aio-pika is a high-level wrapper around aiormq and contains helpers for automatic reconnects and some other useful tricks. You can use aiormq as well, but then you have to handle reconnects etc. yourself.

HMaker commented 3 years ago

Is there no way to have a clean exit in these cases?

Bennyelg commented 3 years ago

+1

alex-oleshkevich commented 3 years ago

I can confirm that the issue does not exist when you run using loop.run_forever(), but it always happens when using asyncio.run.
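That is, running the coroutine with an explicit loop along these lines (rough sketch) instead of asyncio.run(main()):

loop = asyncio.get_event_loop()
loop.create_task(main())
try:
    # KeyboardInterrupt propagates out of run_forever(), so the program exits on the first ^C
    loop.run_forever()
finally:
    loop.close()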

Bennyelg commented 3 years ago

This is the only thing working for me now. A poor solution, but it will do until then.

from contextlib import contextmanager

# loop, queue_provider and storage_provider are defined elsewhere in my code

@contextmanager
def safe_shutdown():
    try:
        yield
    except RuntimeError:
        # swallows "Event loop stopped before Future completed"
        print("Im empty queue now, Leaving ---> C ya next time.")

async def consume_events(...):
    # ... consuming ...
    if await queue_provider.is_empty():
        # Extra code implemented to overcome this bug:
        # https://github.com/mosquito/aio-pika/issues/358
        loop.call_soon_threadsafe(loop.stop)

with safe_shutdown():
    loop.run_until_complete(consume_events(storage_provider))
georgejdanforth commented 3 years ago

I think I've got a working solution for this. The idea is to mostly follow the procedure outlined in this post shared by @astrojuanlu, but use a closure which iterates on the QueueIterator, and then close the QueueIterator in the outer scope when a shutdown signal is received.

Basically it looks like this:

import asyncio
from signal import SIGHUP, SIGINT, SIGTERM

# LOOP_TIMEOUT and the handle() coroutine are defined elsewhere

async def run(loop, stop):
    # ...connection setup
    async with conn:
        # ...channel & queue setup
        async with queue.iterator() as queue_iter:
            # IMPORTANT SECTION
            async def consume():
                async for message in queue_iter:
                    async with message.process():
                        loop.create_task(handle(message))
            loop.create_task(consume())
            while not stop.is_set():
                await asyncio.sleep(LOOP_TIMEOUT)
            await queue_iter.close()

async def shutdown(sig, loop, stop):
    stop.set()
    # Allow time for the run loop to do another iteration and close the queue
    await asyncio.sleep(LOOP_TIMEOUT)

    current_task = asyncio.current_task()
    tasks = [task for task in asyncio.all_tasks() if task is not current_task]
    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()

if __name__ == '__main__':
    stop = asyncio.Event()
    loop = asyncio.get_event_loop()

    def shutdown_handler(sig):
        return loop.create_task(shutdown(sig, loop, stop))

    for sig in (SIGHUP, SIGINT, SIGTERM):
        loop.add_signal_handler(sig, shutdown_handler, sig)

    try:
        loop.create_task(run(loop, stop))
        loop.run_forever()
    finally:
        loop.close()
HMaker commented 3 years ago

This is the solution to avoid all those exceptions on a clean exit: just suppress the CancelledError raised from the async queue iterator.

import asyncio
import aio_pika

async def main():
    async def get_connection():
        return await aio_pika.connect_robust('amqp://guest:guest@localhost')

    pool = aio_pika.pool.Pool(get_connection)
    async with pool.acquire() as connection:
        async with connection:
            channel = await connection.channel()
            queue = await channel.declare_queue('my_queue_name', durable=True)
            async with queue.iterator() as stream:
                try:  # <- must be enclosing the iteration
                    async for message in stream:
                        print(message)
                except asyncio.CancelledError:
                    pass  # clean exit

asyncio.run(main())

There will still be a single exception, due to a broken socket read at cancellation time, but it will perform a clean exit of queues, channels and connections.

HMaker commented 3 years ago

I think that's a shorter solution for what @georgejdanforth tried.

ironhacker commented 3 years ago

For me, the complexity of the solutions required to cleanly exit from QueueIterator nullifies its benefits. I find a pattern utilizing an exit event and a callback for the messages much cleaner.


import signal
import asyncio
import functools
import aio_pika

async def on_message(msg):                                                                          
    print(msg)

async def main():

    kill_event = asyncio.Event()                                                                    

    async def shutdown():                                                            
        kill_event.set()                                                                            

    # schedule the shutdown() coroutine when SIGINT arrives
    asyncio.get_running_loop().add_signal_handler(
        signal.SIGINT, functools.partial(asyncio.create_task, shutdown())
    )

    connection = await aio_pika.connect_robust('amqp://guest:guest@localhost')
    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue('my_queue')
        await queue.consume(on_message)
        await kill_event.wait()
    print('exiting') 

asyncio.run(main())
maxims94 commented 3 years ago

I think that aio-pika has not been designed for low-level programming (reacting to signals, etc.); aiormq seems better suited for this task. Here's my solution:

import asyncio
import aiormq
import aiormq.abc
from signal import SIGINT, SIGTERM

async def on_message(message: aiormq.abc.DeliveredMessage):
  print(" [x] Received message %r" % (message,))
  print("     Message body is: %r" % (message.body,))

def handler():
  print("Received shutdown signal")
  loop.remove_signal_handler(SIGTERM)
  loop.add_signal_handler(SIGINT, lambda: None)
  loop.stop()

async def main():
  loop = asyncio.get_running_loop()
  for sig in (SIGINT,SIGTERM):
    loop.add_signal_handler(sig, handler)

  connection = await aiormq.connect("amqp://guest:guest@localhost/")
  channel = await connection.channel()
  await channel.basic_qos(prefetch_count=1)
  declare_ok = await channel.queue_declare('task_queue', durable=True)
  await channel.basic_consume(declare_ok.queue, on_message, no_ack=True)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# we enter a never-ending loop that waits for data and runs
# callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()

Works like a charm and is as clean as it is going to get. I don't know about you, but I prefer robust code, even if that means using a lower-level interface and writing more code.

wallyhall commented 2 years ago

> I still don't know how to properly exit QueueIterator, but at least I fixed the "double Ctrl+C" issue by using loop.run_until_complete instead of asyncio.run, even without passing the loop to connect or connect_robust. The reason is the cancellation of tasks:
>
> https://github.com/python/cpython/blob/v3.8.5/Lib/asyncio/runners.py#L46
>
> There is one aio-pika task that is getting stuck and does not react well to cancellation. After creating a custom signal handler that cancel()s the remaining tasks one by one on SIGINT, I suspect the problem lies in aiormq.Connection.__heartbeat_task.

Interestingly, I've hit a similar problem. If I forcibly cancel asyncio.all_tasks() after (say) 5 seconds, I see this error reported by aio_pika:

Closing channel <Channel: "1"> because RPC call <pamqp.specification.Basic.Cancel object at 0x7f917a979070> cancelled

After shoving a bunch of breakpoints all over the place, it looks (and I could be very wrong) like it's the QueueIterator's __aexit__() which calls a cancel with no timeout. (It hangs indefinitely waiting for RabbitMQ to acknowledge the shutdown, I guess?)

class QueueIterator:
    @shield
    async def close(self):
        if not self._consumer_tag:
            return

        await self._amqp_queue.cancel(self._consumer_tag)

Am I barking up the wrong tree?
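One possible workaround (a rough, untested sketch with an arbitrary 5-second limit, queue_iter being the QueueIterator instance) would be to bound that close with a timeout:

try:
    # don't wait forever for RabbitMQ to acknowledge the Basic.Cancel
    await asyncio.wait_for(queue_iter.close(), timeout=5)
except asyncio.TimeoutError:
    pass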

rob-blackbourn commented 2 years ago

I've been doing this: https://github.com/rob-blackbourn/example-aio-pkia-cancellation-1

Darsstar commented 2 years ago

QueueIterator.__anext__ will never throw a StopAsyncIteration; does that mean the intended usage is to always provide a timeout to queue.iterator()?

wallyhall commented 1 year ago

> QueueIterator.__anext__ will never throw a StopAsyncIteration; does that mean the intended usage is to always provide a timeout to queue.iterator()?

I think the above is the key. +1 to @Darsstar

I've investigated this further and believe it may be related to https://github.com/mosquito/aio-pika/issues/379.

If my understanding of the problem is correct, there are several scenarios where the QueueIterator does not (or may not) raise a StopAsyncIteration when the channel or connection dies, or when asyncio cancels all pending tasks (perhaps due to a KeyboardInterrupt from a SIGINT or SIGTERM).

We've now wrapped aio_pika in our own service layer, but hopefully the following snippets make sense in a broader context:


TL;DR: we set an asyncio.Event via a Channel._close_callbacks, which raises a StopAsyncIteration from the QueueIterator's __anext__. There is also some logic to (try to) ensure RobustConnection can still reconnect without stopping iteration. As far as our testing of various scenarios has gone, it works.

We have tested:


A helper function (which we've implemented as a generic Python class with an __call__ method to help satisfy type checking in Pylance) waits on both __anext__ of the iterator and an asyncio.Event - which we use as an interrupt to indicate that the QueueIterator should stop:

import asyncio
from typing import Any, Callable, Coroutine, Generic, TypeVar, cast

import aio_pika.abc

T = TypeVar('T')

class ClosableAsyncIteratorHelper(Generic[T]):
    """
    This is a callable class, which serves solely to provide a Generic helper function
    for calling an async iterator (e.g. when __anext__ is used, but the background
    connection may have been closed meanwhile, and you're otherwise left hanging
    forever).
    """
    async def __call__(
            self,
            stop_event: asyncio.Event,
            async_iterator: Callable[[], Coroutine[Any, Any, T]]
            ) -> T:
        # don't proceed if the stop event has already fired
        # (from a previous "successful" iteration which
        # occurred simultaneously with a `stop_event`)
        if stop_event.is_set():
            raise StopAsyncIteration()

        # if a stop event is detected, raise a StopAsyncIteration
        stop_event_wait_task = asyncio.ensure_future(stop_event.wait())
        async_iterator_task = asyncio.ensure_future(async_iterator())

        done, pending = await asyncio.wait(
            [stop_event_wait_task, async_iterator_task],
            return_when=asyncio.FIRST_COMPLETED
        )

        # cleanly end any (if any) outstanding tasks.
        # there shouldn't ever be more than 1 (but may
        # be none).
        for task in pending:
            task.cancel()

        # we may get both the `async_iterator` and `stop_event_wait`
        # tasks returning together.
        #
        # in that case, we emit the remaining item and raise a
        # StopAsyncIteration the next time around (see above).
        #
        # this is relevant for aio_pika, which may have an in-flight
        # message which needs processing - before we handle the request
        # to stop.
        for task in done:
            if task is async_iterator_task:
                return cast(T, task.result())

        # this line can only be reached if the `stop_event` has been set
        # without a simultaneous `async_iterator`
        raise StopAsyncIteration()

# for aio_pika, a channel or connection closure doesn't break message iterators ... rather they just
# hang around indefinitely.  This is used to ensure they explicitly stop iterating.
_iterate_closable_message_iterator = ClosableAsyncIteratorHelper[aio_pika.abc.AbstractIncomingMessage]()

We've tied that into the QueueIterator thusly:

class OurOwnQueueIterator(QueueIterator):
    def __init__(self, ...):    ## <-- this is NOT how we do it, and almost certainly won't work for you directly - but hopefully gives the gist of what we're doing.  We've heavily abstracted `aio-pika` for our use case.
        # provide a means for cleanly exiting QueueIterators when the channel closes.
        self._channel_closed = asyncio.Event()
        self._amqp_queue.close_callbacks.add(self._pika_channel_closed_callback)  # type: ignore
        ## CHECK THE ABOVE LINE WORKS - we've tested it against the aio-pika `Connection.close_callbacks`, and this code works.  YMMV doing it against a `Queue`.

    async def _pika_channel_closed_callback(self, channel: aio_pika.abc.AbstractChannel, exception: Exception):
        """
        This is intended to catch a number of specific scenarios, working around some difficulties with aio_pika.

         - If we're using an AbstractRobustChannel, everything will automatically attempt to reconnect _unless_ the
           channel or connection has been explicitly close()'d, or a critical aio_pika asyncio task has been
           cancelled (as may be the case if an unhandled SIGTERM has occurred).

         - If we're using an AbstractChannel, the channel or connection will have gone away permanently.

        Under specific conditions, we wish to exit any QueueIterators to hand execution back to the caller
        (as the "iterating" messages has now finished.  We don't raise a close/disconnect exception).

         - `not isinstance(channel, aio_pika.abc.AbstractRobustChannel)` is the simplest scenario.  Just always stop.
         - `isinstance(exception, aio_pika.exceptions.ChannelClosed)` will be true if the
           `aio_pika.channel` was closed.
         - `isinstance(exception, asyncio.CancelledError)` will be true if the `aio_pika.connection` was closed or an
           unhandled KeyboardInterrupt has bubbled up.
        """
        if not isinstance(channel, aio_pika.abc.AbstractRobustChannel) or \
           isinstance(exception, aio_pika.exceptions.ChannelClosed) or \
           isinstance(exception, asyncio.CancelledError):
            # if the channel has been explicitly closed, we're not a robust channel or we're shutting down due to an asyncio cancellation...
            self._channel_closed.set()

    async def __anext__(self) -> aio_pika.abc.AbstractIncomingMessage:
        return await _iterate_closable_message_iterator(self._channel_closed, super().__anext__)

Thus we set an asyncio.Event via a Channel._close_callbacks, which triggers a StopAsyncIteration.

nosahama commented 1 year ago

I am facing the same issue. I managed to circumvent it by not using async for msg in iterator(), but rather getting the iterator and retrieving messages by calling b = anext(iterator) and msg = await b; then, when I receive an asyncio.CancelledError, I close the iterator and the underlying connection.

I've load-tested it and it's pretty stable. I would say that async for msg in iterator provides sequential iteration in an async environment; you could also use asyncio.create_task(process_message(msg)) for proper task concurrency.
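Roughly, that approach looks like this (a sketch; process_message is a placeholder, and the builtin anext() needs Python 3.10+):

async with queue.iterator() as iterator:
    try:
        while True:
            b = anext(iterator)        # awaitable for the next message
            msg = await b
            asyncio.create_task(process_message(msg))  # handle messages concurrently
    except asyncio.CancelledError:
        await iterator.close()         # stop consuming ...
        await connection.close()       # ... and close the underlying connection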

mosquito commented 1 year ago

@nosahama when you have to process messages asynchronously, please use the .consume method; it is blazingly fast.
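For example (a minimal sketch, reusing a queue and an on_message callback like the ones in the earlier examples):

async def on_message(message: aio_pika.abc.AbstractIncomingMessage) -> None:
    async with message.process():
        print(message.body)

# messages are dispatched to on_message as they arrive, no iterator involved
consumer_tag = await queue.consume(on_message)

# later, during shutdown, stop the consumer explicitly
await queue.cancel(consumer_tag)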