mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

rabbitmq server restart #433

Open wangjianweiwei opened 2 years ago

wangjianweiwei commented 2 years ago

Steps to reproduce: shut down the RabbitMQ server (aio-pika will keep retrying the connection), then restart the RabbitMQ server a few minutes later.

client code

class PikaManager(AsyncAioPikaManager):
    """AsyncAioPikaManager subclass that publishes through its own connection.

    The publishing connection, channel, exchange and the reopen lock are
    created by ``init_connect`` and remain ``None`` until it has been awaited.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base class and clear the publish state."""
        super().__init__(*args, **kwargs)
        self.publish_connect = None  # type: RobustConnection
        self.publish_channel = None  # type: RobustChannel
        self.publish_exchange = None  # type: RobustExchange
        self.reopen_lock = None  # type: Lock

    @retry(attempts=forever)
    async def _publish(self, data):
        """Pickle ``data`` and publish it as a persistent message.

        If the publish channel reports itself closed, it is reopened and the
        exchange is looked up again before publishing. Any failure is printed,
        handed to ``capture_exception`` and re-raised; the ``retry`` decorator
        presumably re-invokes the coroutine afterwards (decorator not shown).

        Args:
            data: Picklable payload to send.
        """
        try:
            # ======================= Print here
            print("try", self.publish_channel.is_closed, self.publish_channel._channel, id(self.publish_channel._channel))
            if self.publish_channel.is_closed:
                async with self.reopen_lock:
                    # Re-check under the lock: another task may have reopened
                    # the channel while this one was waiting.
                    if self.publish_channel.is_closed:
                        await self.publish_channel.reopen()
                        self.publish_exchange = await self.publish_channel.get_exchange(self.channel, ensure=False)

            message = aio_pika.Message(body=pickle.dumps(data), delivery_mode=aio_pika.DeliveryMode.PERSISTENT)
            await self.publish_exchange.publish(message, routing_key='*')
        # NOTE(review): BaseException also matches asyncio.CancelledError on
        # Python 3.8+, so task cancellations are reported too -- confirm that
        # this is intended.
        except BaseException as e:
            print("expect", self.publish_channel.is_closed)
            print(e)
            capture_exception(e)
            # Bare ``raise`` re-raises the active exception unchanged.
            raise

    async def init_connect(self, loop):
        """Open the robust connection, channel and exchange used for publishing.

        Args:
            loop: Accepted for backward compatibility only. It is no longer
                forwarded to ``Lock``: assuming ``Lock`` is ``asyncio.Lock``,
                its ``loop`` parameter was removed in Python 3.10 and passing
                it raises ``TypeError`` there.
        """
        self.publish_connect = await aio_pika.connect_robust(self.url, timeout=10)
        self.publish_channel = await self.publish_connect.channel()
        self.publish_exchange = await self.publish_channel.get_exchange(self.channel, ensure=False)
        self.reopen_lock = Lock()

Another print, added in aiormq.channel.Channel.lock:

    def lock(self) -> asyncio.Lock:
        """Return this channel's lock.

        Raises:
            ChannelInvalidStateError: If the channel is closed.
        """
        # =========================== Print here
        print(self, ":???????", id(self))
        if not self.is_closed:
            return self.__lock

        raise ChannelInvalidStateError("%r closed" % self)

output

try False 1 140623314523296
2 :??????? 140623314174176

In this case, the value returned by channel.is_closed is not accurate, because it is not checked on the same channel object that is actually used. Am I missing something?

mosquito commented 2 years ago

Is it reproducing in version 7.0.1?