mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

Small difference between `aio_pika.connect` and `aio_pika.connect_robust` #517

Open chandr-andr opened 1 year ago

chandr-andr commented 1 year ago

There is a problem with reconnecting. In some services, I don't need to reconnect to RabbitMQ server if it falls connect_robust tries to reconnect, so connect just stops doing anything and writes a log warning.

I tried to rewrite the RobustConnection and Connection classes to stop connection and raise an exception cuz I need to stop the programm, but somewhere in the roots of your and aiormq lib always catches my exception and continues reconnecting.

Could you, please, help me with this problem cuz I don't understand what I have to do?

There is an example - I rewrote the connection method and start raising StopReconnectionException() but there is no effect.

    async def connect(self, timeout: TimeoutType = None) -> None:  # noqa: WPS231
        """
        Tries to connect.

        :param timeout: timeout for connect.

        :raises RuntimeError: if reconnecting.
        """
        if self.is_closed:
            raise RuntimeError(f"{self!r} connection closed")

        if self.reconnecting:
            raise RuntimeError(
                (
                    "Connect method called but connection "
                    f"{self!r} is reconnecting right now."
                ),
                self,
            )

        async with self._reconnect_lock:
            self.transport = None
            self.connected.clear()

            while not self.is_closed:
                close_callback = OneShotCallback(
                    self._on_connection_close,
                )
                try:
                    connection = await self.__connection_attempt(timeout)
                    self.connected.set()

                    connection.closing.add_done_callback(close_callback)
                    self.transport = UnderlayConnection(
                        connection=connection,
                        close_callback=close_callback,
                    )

                    self.fail_fast = False
                    return
                except CONNECTION_EXCEPTIONS:
                   raise StopReconnectionException()