mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

QueueIterator.__repr__ causes AttributeError looking for _consumer_tag #454

Open TBBle opened 2 years ago

TBBle commented 2 years ago

When closing an already-cancelled QueueIterator, the following exception occurs:

[2022-04-12T10:57:40.157Z] self = <[AttributeError("'QueueIterator' object has no attribute '_consumer_tag'") raised in repr()] QueueIterator object at 0x7faf18cc9310>
[2022-04-12T10:57:40.157Z] 
[2022-04-12T10:57:40.157Z]     def __repr__(self) -> str:
[2022-04-12T10:57:40.157Z]         return (
[2022-04-12T10:57:40.157Z]             f"<{self.__class__.__name__}: "
[2022-04-12T10:57:40.157Z]             f"queue={self._amqp_queue.name!r} "
[2022-04-12T10:57:40.157Z] >           f"ctag={self._consumer_tag!r}>"
[2022-04-12T10:57:40.157Z]         )
[2022-04-12T10:57:40.157Z] E       AttributeError: 'QueueIterator' object has no attribute '_consumer_tag'

This is because QueueIterator.__repr__ unconditionally accesses self._consumer_tag, and async def close formats self with %r in its log messages both before checking whether that attribute exists and after confirming that it does not:

    @task
    async def close(self, *_: Any) -> Any:
        log.debug("Cancelling queue iterator %r", self)

        if not hasattr(self, "_consumer_tag"):
            log.debug("Queue iterator %r already cancelled", self)
            return

The first %r (the failure I'm seeing here) may fail, but the second %r will always fail, because by that point we've established that there is no _consumer_tag attribute.
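One way to make the repr safe in both states is to read the attribute with a default. The following is only a minimal sketch, not aio-pika's actual fix: SafeReprIterator, FakeQueue and the cancel() method are hypothetical stand-ins, and it assumes the "already cancelled" state simply means the _consumer_tag attribute is absent.

```python
class FakeQueue:
    """Hypothetical stand-in for the AMQP queue object; only `name` is used."""

    def __init__(self, name: str) -> None:
        self.name = name


class SafeReprIterator:
    """Hypothetical stand-in for QueueIterator, not aio-pika's real class."""

    def __init__(self, queue: FakeQueue, consumer_tag: str) -> None:
        self._amqp_queue = queue
        self._consumer_tag = consumer_tag

    def cancel(self) -> None:
        # Assumption: "already cancelled" means the attribute no longer exists.
        del self._consumer_tag

    def __repr__(self) -> str:
        # getattr with a default never raises, so repr() works in both states.
        ctag = getattr(self, "_consumer_tag", None)
        return (
            f"<{self.__class__.__name__}: "
            f"queue={self._amqp_queue.name!r} "
            f"ctag={ctag!r}>"
        )


it = SafeReprIterator(FakeQueue("jobs"), "ctag-1")
before = repr(it)  # attribute present
it.cancel()
after = repr(it)   # attribute absent, repr still succeeds
```

With something like this, both log.debug calls in close could format self regardless of whether the iterator has already been cancelled.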

I'm not sure how long this has been broken: I'm upgrading aio-pika from a much older version, and our codebase had raised the log level for aio-pika to WARNING, so the log.debug messages were never being formatted before the upgrade.
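To illustrate why the log level can hide this: the standard logging module only applies the % formatting (and therefore only calls __repr__) when a record is actually handled. A small sketch, where CountingRepr, FormattingHandler and the logger name "lazy_repr_demo" are made up for the demonstration:

```python
import logging


class CountingRepr:
    """Hypothetical object that counts how often its repr is requested."""

    calls = 0

    def __repr__(self) -> str:
        CountingRepr.calls += 1
        return "<CountingRepr>"


class FormattingHandler(logging.Handler):
    """Handler that formats each record, which evaluates msg % args."""

    def emit(self, record: logging.LogRecord) -> None:
        self.format(record)


log = logging.getLogger("lazy_repr_demo")
log.propagate = False  # keep the demo isolated from the root logger
log.addHandler(FormattingHandler())

obj = CountingRepr()

# At WARNING, the debug call is dropped before any record is created.
log.setLevel(logging.WARNING)
log.debug("Queue iterator %r closed", obj)
calls_at_warning = CountingRepr.calls

# At DEBUG, the record reaches the handler and the %r is evaluated.
log.setLevel(logging.DEBUG)
log.debug("Queue iterator %r closed", obj)
calls_at_debug = CountingRepr.calls
```

So a __repr__ that raises can go unnoticed for as long as the logger's effective level filters out the debug calls.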

For reference, the context is an asyncio.Task of the form

            self.connection = await aio_pika.connect_robust(rmq_url)
            async with self.connection.channel() as channel:
                await channel.set_qos(prefetch_count=1)
                queue = await channel.declare_queue(self.rmq_queue_name, durable=True)

                async for message in queue:
                    yield message

and the cancellation is arriving while we're in the async for, waiting for a new message, per the below traceback.

C:\Program Files\Python39\lib\asyncio\base_events.py:642: in run_until_complete
    return future.result()
tests\integration\conftest.py:347: in webservice_client
    yield client
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\test_utils.py:423: in __aexit__
    await self.close()
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\test_utils.py:398: in close
    await self._server.close()
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\test_utils.py:194: in close
    await self.runner.cleanup()
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\web_runner.py:294: in cleanup
    await self._cleanup_server()
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\web_runner.py:381: in _cleanup_server
    await self._app.cleanup()
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\web_app.py:432: in cleanup
    await self.on_cleanup.send(self)
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiosignal\__init__.py:36: in send
    await receiver(*args, **kwargs)  # type: ignore
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\web_app.py:555: in _on_cleanup
    raise errors[0]
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aiohttp\web_app.py:546: in _on_cleanup
    await it.__anext__()
src\webservice\app.py:190: in initialize_and_cleanup_status_listener
    await status_listener
src\webservice\status_update_api.py:33: in message_dispatcher
    async for message in app["rmq"].message_queue():
..\..\lib\shared\src\ps\shared\rabbitmq.py:74: in message_queue
    async for message in queue:
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aio_pika\queue.py:483: in __anext__
    await asyncio.wait_for(
C:\Program Files\Python39\lib\asyncio\tasks.py:479: in wait_for
    return fut.result()
C:\Users\paulh\AppData\Local\pypoetry\Cache\virtualenvs\webservice-3ZX3vMwV-py3.9\lib\site-packages\aio_pika\queue.py:411: in close
    log.debug("Queue iterator %r closed", self)
C:\Program Files\Python39\lib\logging\__init__.py:1434: in debug
    self._log(DEBUG, msg, args, **kwargs)
C:\Program Files\Python39\lib\logging\__init__.py:1589: in _log
    self.handle(record)
C:\Program Files\Python39\lib\logging\__init__.py:1599: in handle
    self.callHandlers(record)
C:\Program Files\Python39\lib\logging\__init__.py:1661: in callHandlers
    hdlr.handle(record)
C:\Program Files\Python39\lib\logging\__init__.py:952: in handle
    self.emit(record)
C:\Program Files\Python39\lib\logging\__init__.py:1091: in emit
    self.handleError(record)
C:\Program Files\Python39\lib\logging\__init__.py:1083: in emit
    msg = self.format(record)
C:\Program Files\Python39\lib\logging\__init__.py:927: in format
    return fmt.format(record)
C:\Program Files\Python39\lib\logging\__init__.py:663: in format
    record.message = record.getMessage()
C:\Program Files\Python39\lib\logging\__init__.py:367: in getMessage
    msg = msg % self.args
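
The traceback above shows the cancellation surfacing inside __anext__ (at the asyncio.wait_for call) and close then being entered from there. The general mechanism can be sketched with plain asyncio, without aio-pika or a broker. StubIterator, consume and main are hypothetical names, and the "cleanup" step only stands in for whatever close does:

```python
import asyncio
from typing import List


class StubIterator:
    """Hypothetical stand-in for a queue iterator; no message ever arrives."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def __aiter__(self) -> "StubIterator":
        return self

    async def __anext__(self) -> bytes:
        try:
            # Wait for a message; this event is never set in the sketch.
            await asyncio.Event().wait()
        except asyncio.CancelledError:
            # Cancellation is delivered here, at the pending await,
            # so this is where iterator cleanup gets its chance to run.
            self.events.append("cleanup")
            raise
        return b""  # not reached in this sketch


async def consume(iterator: StubIterator) -> None:
    async for _ in iterator:
        pass


async def main() -> List[str]:
    iterator = StubIterator()
    task = asyncio.create_task(consume(iterator))
    await asyncio.sleep(0)  # let the task reach the await inside __anext__
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        iterator.events.append("cancelled")
    return iterator.events


events = asyncio.run(main())
```

Any exception raised during that cleanup step, such as the AttributeError from __repr__ here, replaces the CancelledError that the awaiting code would otherwise see.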