sbtinstruments / aiomqtt

The idiomatic asyncio MQTT client, wrapped around paho-mqtt
https://sbtinstruments.github.io/aiomqtt
BSD 3-Clause "New" or "Revised" License
392 stars 71 forks source link

Impossible to reuse client in case of connection timeout #269

Closed alhimik45 closed 3 months ago

alhimik45 commented 5 months ago

I'm using almost the same code for reconnects as in docs:

async def main():
    client = aiomqtt.Client(broker, port, identifier = client_id, clean_session = True)
    reconnect_interval = 5
    while True:
        try:
            print(f"Connecting to {broker}:{port}")
            async with client:
                await client.subscribe(topic, qos = 1)
                print(f"Subscribed to {topic}")
                async for message in client.messages:
                    pass
        except aiomqtt.MqttError as err:
            print(f"Connection lost: {err};")
            print(f"Reconnecting in {reconnect_interval} seconds ...")
            await asyncio.sleep(reconnect_interval)

asyncio.run(main())

If broker due to some problems cannot respond in given timeout aiomqtt gets Operation timed out error and client cannot be reused after that.

Example log:

Connecting to broker.local:1883
Connection lost: Operation timed out;
Reconnecting in 5 seconds ...
Connecting to broker.local:1883
Connection lost: The client context manager is reusable, but not reentrant;
Reconnecting in 5 seconds ...
Connecting to broker.local:1883
Connection lost: The client context manager is reusable, but not reentrant;

With other error like Connection refused if works fine:

Connecting to broker.local:1883
Connection lost: [Errno 111] Connection refused;
Reconnecting in 5 seconds ...
Connecting to broker.local:1883
Connection lost: [Errno 111] Connection refused;
Reconnecting in 5 seconds ...

I expect that regardless of connection error, client can be reused for reconnection

4s1 commented 5 months ago

Intetersting, here I am using Python 3.11.6 on archlinux. Using the given code snipped (with a customized broker and enhanced error log) gives me

 Connection lost, [code:5] The connection was refused.; Reconnecting in 5 seconds ...
 Connection lost, The client context manager is reusable, but not reentrant; Reconnecting in 5 seconds ...
import asyncio
import aiomqtt

async def main():
    client = aiomqtt.Client("broker1.internal")
    interval = 5  # Seconds
    while True:
        try:
            async with client:
                await client.subscribe("humidity/#")
                async for message in client.messages:
                    print(message.payload)
        except aiomqtt.MqttError as e:
            print(f"Connection lost, {e}; Reconnecting in {interval} seconds ...")
            await asyncio.sleep(interval)

asyncio.run(main())
tsturzl commented 4 months ago

I believe this issue was fixed as of the 1.2.1 release: https://github.com/sbtinstruments/aiomqtt/releases/tag/v1.2.1

There was another ticket that was fixed and closed mentioned in the release notes. I believe that fixes the issue. I'd recommend making sure you're running that version. I was having a similar issue, and upgrading to 1.2.1 fixed it. I assume the 2.0.0 release also addresses the issue.

4s1 commented 4 months ago

Well. I have 2.0.0 installed here and used the example script to connect to a broker, which my firewall prohibits (icmp port closed reply) to emulate a connection issue (without the delay part).

So you can test yourself, this isn't fixed with 2.0.0.

That's why i've created a merge request for current master

tsturzl commented 4 months ago

I could be wrong. I'm not running 2.0.0 yet, but I know that moving to 1.2.1 had fixed this issue for me. Hopefully this is resolved in 2.0.0, it's good to know it's still an issue there, so I can make sure we await the fix in that version before upgrading. I'm not a contributor, I just saw this while looking for another bug and recently stumbled upon a similar issue.

I'm also realizing I may be wrong, I think the fix originally landed in 1.2.0. The original issue is #244 which was fixed by PR #245. I'm not sure if it's the same issue, but it seems very similar.

4s1 commented 4 months ago

Okay, i went through client.py history. A slightly different issue (with same symptoms) was indeed fixed in #244, but this issue was introduced thereafter by b989089

tsoos99dev commented 4 months ago

I've tried with both aiomqtt 2.0.0 and 1.2.1 and ([code:5] The connection was refused) error renders the client unusable by leaving the internal lock in the locked state. It'd be nice if the client was reusable.

async def run(self):
    while True:
        try:
            async with self.client as client:
                await self._process_messages(client)
        except aiomqtt.MqttError as e:
            logger.warning(f"Connection lost ({e}). Reconnecting in {self.reconnect_interval} seconds ...")
            await asyncio.sleep(5.0)
empicano commented 3 months ago

Hi all, thanks for the great discussion! 😎 With 2.0.1, both (1.) a negative CONNACK response and (2.) a timeout while waiting on the CONNACK response should now be correctly handled.

(Note that there's currently another issue related to reconnection that's still open. The workaround is to use client._messages() instead of client.messages.)

Please reopen if there's anything that's still not working, or otherwise left unclear 🙂