mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Channel reopened twice when connection is reopened under adverse network conditions #449

Closed haudren-woven closed 2 years ago

haudren-woven commented 2 years ago

I just noticed that under the following circumstances:

The restoration process fails with aiormq.exceptions.DuplicateConsumerTag. As I investigated further, it seems that the issue is that the channel is restored twice:

  1. By the connection itself: https://github.com/mosquito/aio-pika/blob/77b91ed34f28a4b01b36e4f87112efd6023bbc42/aio_pika/robust_connection.py#L138
  2. By the callback on channel closing: https://github.com/mosquito/aio-pika/blob/77b91ed34f28a4b01b36e4f87112efd6023bbc42/aio_pika/robust_channel.py#L101

And the barrier in the callback is not triggered, since these are the values of the variables:

However, connection.reconnecting is True, so an easy fix is to add it to this list of exclusions, and I can submit a simple PR in that direction. That said, I wonder whether this is the best way to fix the problem: it seems dangerous to have the reopening logic called from two places without locking. Please let me know what you prefer!
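To make the locking concern concrete, here is a minimal sketch of one way to serialize two recovery paths. It is a toy model, not aio-pika's code: `ToyChannel`, `DuplicateTag`, the `epoch` counter and the `ctag-1` tag are all hypothetical names, and only `asyncio.Lock` is a real API. The idea is that each caller passes the epoch it observed when it noticed the failure, so whichever caller arrives second sees a newer epoch and skips the restore instead of repeating it.

```python
import asyncio


class DuplicateTag(Exception):
    """Stand-in for a broker rejecting an already-registered consumer tag."""


class ToyChannel:
    """Hypothetical channel model; not aio-pika's RobustChannel."""

    def __init__(self):
        self.consumers = set()
        self.epoch = 0  # bumped after every successful reopen
        self._reopen_lock = asyncio.Lock()

    async def _restore(self):
        await asyncio.sleep(0)  # yield control, as real network I/O would
        if "ctag-1" in self.consumers:
            raise DuplicateTag("ctag-1")
        self.consumers.add("ctag-1")

    async def reopen(self, seen_epoch):
        # Serialize callers. A caller holding a stale epoch returns early
        # because someone else already restored the channel.
        async with self._reopen_lock:
            if self.epoch != seen_epoch:
                return False
            self.consumers.clear()  # consumers of the old channel are gone
            await self._restore()
            self.epoch += 1
            return True


async def main():
    channel = ToyChannel()
    seen = channel.epoch
    # Two recovery paths react to the same outage concurrently.
    return await asyncio.gather(channel.reopen(seen), channel.reopen(seen))


results = asyncio.run(main())
```

In this sketch only one of the two calls performs the restore; the other returns `False` without registering the consumer again, so no duplicate tag is raised.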

See below for the full stack trace:

[queue.py:91] Declaring queue: <RobustQueue(amq_0xf1ef88dd268274645525cd933f1e54f2): auto_delete=True, durable=False, exclusive=False, arguments=None
[connection.py:542] Prepare to send ChannelFrame(channel_number=2, frames=[<Exchange.Declare object at 0x7f82d5293b48>], drain_future=None)
[connection.py:549] Sending frame <Exchange.Declare object at 0x7f82d5293b48> in channel #2 on <Connection: "amqp://guest:******@localhost:5672/" at 0x7f82d5537b88>
[connection.py:486] Received frame <Exchange.DeclareOk object at 0x7f83376c5ba8> in channel #2 weight=12 on <Connection: "amqp://guest:******@localhost:5672/" at 0x7f82d55

[queue.py:91] Declaring queue: <RobustQueue(amq_0xf1ef88dd268274645525cd933f1e54f2): auto_delete=True, durable=False, exclusive=False, arguments=None
[connection.py:542] Prepare to send ChannelFrame(channel_number=2, frames=[<Queue.Declare object at 0x7f82d5d700c0>], drain_future=None)
[connection.py:549] Sending frame <Queue.Declare object at 0x7f82d5d700c0> in channel #2 on <Connection: "amqp://guest:******@localhost:5672/" at 0x7f82d5537b88>
[connection.py:486] Received frame <Queue.DeclareOk object at 0x7f82d5d484f8> in channel #2 weight=59 on <Connection: "amqp://guest:******@localhost:5672/" at 0x7f82d5537b

[queue.py:143] Binding queue <RobustQueue(amq_0xf1ef88dd268274645525cd933f1e54f2): auto_delete=True, durable=False, exclusive=False, arguments=None: exchange=<Exchange(exchange
le=True, arguments={})>, routing_key='amq_0xf1ef88dd268274645525cd933f1e54f2', arguments=None
[connection.py:542] Prepare to send ChannelFrame(channel_number=2, frames=[<Queue.Declare object at 0x7f82d5297318>], drain_future=None)
[connection.py:549] Sending frame <Queue.Declare object at 0x7f82d5297318> in channel #2 on <Connection: "amqp://guest:******@localhost:5672/" at 0x7f82d5537b88>
[connection.py:486] Received frame <Queue.DeclareOk object at 0x7f82d552a0e8> in channel #2 weight=59 on <Connection: "amqp://guest:******@localhost:5672/" at 0x7f82d5537b

[queue.py:143] Binding queue <RobustQueue(amq_0xf1ef88dd268274645525cd933f1e54f2): auto_delete=True, durable=False, exclusive=False, arguments=None: exchange=<Exchange(exchange
le=True, arguments={})>, routing_key='amq_0xf1ef88dd268274645525cd933f1e54f2', arguments=None
[connection.py:542] Prepare to send ChannelFrame(channel_number=2, frames=[<Queue.Bind object at 0x7f82e4054d48>], drain_future=None)
[connection.py:549] Sending frame <Queue.Bind object at 0x7f82e4054d48> in channel #2 on <Connection: "amqp://guest:******@localhost:5672/" at 0x7f82d5537b88>
[connection.py:486] Received frame <Queue.BindOk object at 0x7f82d52bf278> in channel #2 weight=12 on <Connection: "amqp://guest:******@localhost:5672/" at 0x7f82d5537b88>
[queue.py:224] Start to consuming queue: <RobustQueue(amq_0xf1ef88dd268274645525cd933f1e54f2): auto_delete=True, durable=False, exclusive=False, arguments=None
[connection.py:542] Prepare to send ChannelFrame(channel_number=2, frames=[<Queue.Bind object at 0x7f82d529ad48>], drain_future=None)
[connection.py:549] Sending frame <Queue.Bind object at 0x7f82d529ad48> in channel #2 on <Connection: "amqp://guest:******@localhost:5672/" at 0x7f82d5537b88>
[connection.py:486] Received frame <Queue.BindOk object at 0x7f82d52bf780> in channel #2 weight=12 on <Connection: "amqp://guest:******@localhost:5672/" at 0x7f82d5537b88>
[queue.py:224] Start to consuming queue: <RobustQueue(amq_0xf1ef88dd268274645525cd933f1e54f2): auto_delete=True, durable=False, exclusive=False, arguments=None
[base_events.py:1285] Task exception was never retrieved
future: <Task finished coro=<RobustChannel.reopen() done, defined at python3.6/site-packages/aio_pika/robust_channel.py:68> exception=<DuplicateConsumerTag: The consumer tag specified already exists for this channel: 2>>
Traceback (most recent call last):
  File "python3.6/site-packages/aio_pika/robust_channel.py", line 71, in reopen
    await self.restore()
  File "python3.6/site-packages/aio_pika/robust_channel.py", line 90, in restore
    await queue.restore(self)
  File "python3.6/site-packages/aio_pika/robust_queue.py", line 68, in restore
    await self.consume(consumer_tag=consumer_tag, **kwargs)
  File "python3.6/site-packages/aio_pika/robust_queue.py", line 130, in consume
    arguments=arguments,
  File "python3.6/site-packages/aio_pika/queue.py", line 238, in consume
    timeout=timeout,
  File "python3.6/site-packages/aiormq/channel.py", line 478, in basic_consume
    raise DuplicateConsumerTag(self.number)
aiormq.exceptions.DuplicateConsumerTag: 2

mosquito commented 2 years ago

Duplicate of #446

haudren-woven commented 2 years ago

I mentioned it in my original text, but this issue is not an exact duplicate of #446, although it's really close. The new check introduced by #446 checks:

So the check evaluates to True, and reopen is called despite being in the middle of a reconnection.

Thank you for your understanding.

dizballanze commented 2 years ago

I think that should be fixed in 7.1.1

haudren-woven commented 2 years ago

If I am not mistaken, 7.1.1 only includes #446, and as I mentioned above it does not address my issue directly, since the connection attempt fails with ConnectionError. But I must say that I'm a little confused as to why the RobustChannel has its own auto-reconnection logic in addition to the RobustConnection's. It seems like it would be more straightforward to put all the logic in one or the other, but I am a relative beginner with AMQP, so I don't know what use cases this can serve.

mosquito commented 2 years ago

Should be fixed in #450

mosquito commented 2 years ago

@haudren-woven the reason for splitting the reconnection logic is that channels might be closed from the broker side without the connection closing.
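As a rough illustration of that split, the sketch below models a channel close callback that recovers locally only when the connection itself is still healthy, and otherwise leaves recovery to the connection. This is a toy model under stated assumptions: `ToyConnection`, `ToyChannel`, `on_close` and `is_closed` are hypothetical names, and `reconnecting` merely mirrors the flag mentioned earlier in this thread. It is not the code from #450.

```python
from dataclasses import dataclass


@dataclass
class ToyConnection:
    """Hypothetical connection state; not aio-pika's RobustConnection."""
    is_closed: bool = False
    reconnecting: bool = False


@dataclass
class ToyChannel:
    """Hypothetical channel with its own close callback."""
    connection: ToyConnection
    reopen_count: int = 0

    def on_close(self):
        """Called whenever this channel is closed, for any reason."""
        conn = self.connection
        if conn.is_closed or conn.reconnecting:
            # Connection-level outage: the connection restores its channels.
            return "left to connection"
        # The broker closed only this channel: recover it locally.
        self.reopen_count += 1
        return "reopened by channel"


healthy = ToyChannel(ToyConnection())
outage = ToyChannel(ToyConnection(reconnecting=True))
outcomes = [healthy.on_close(), outage.on_close()]
```

Here the first channel reopens itself because its connection is still up, while the second does nothing and waits for the reconnecting connection to restore it.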

haudren-woven commented 2 years ago

Thank you! I'll test it later but the fix and your explanation both make sense.

haudren-woven commented 2 years ago

Tested today, this seems to have addressed the issue! Thank you very much :bow: