empicano / aiomqtt

The idiomatic asyncio MQTT client
https://aiomqtt.bo3hm.com
BSD 3-Clause "New" or "Revised" License
430 stars 77 forks source link

Client misses disconnection event #326

Closed joyfullperson closed 3 months ago

joyfullperson commented 3 months ago

Hello,

I am experiencing an issue regarding the connection management and I would like to have your feedback on it. I have multiple IoT devices deployed and the reconnection mechanism works pretty good under normal circumstances.But I have noticed that when, for some reason, some of them don't push any data trough the publish queue for a long time, and they disconnect from the server (with reason keep-alive), the device doesn't attempt to reconnect. As soon as it has another item queued and it tries to publish again then it immediately tires to reconnect. Had one device that attempted to reconnect a whole hour after it disconnected seemingly for no reason.

The topic the device is subscribed to almost never receives messages.

My app runs on MPU with limited resources (not used for anything else but runs this app) and I have also noticed that an iteration of the client to reconnect takes a really long time (40-60s). Don't know if this is relevant at all though.

I would really appreciate some feedback on the matter, perhaps I am using the library in a wrong way or I miss some aspect of how the python asyncio works and it causes this issue (not really familiar with it). I provide the following code snippet that describes the functionality of what I am doing

async def _sub_to_topic(self):
    await self._client.subscribe(f"topic")
    async for message in self._client.messages:
           # do something with message 

async def _publisher(self):
    while not (self._exit_event.is_set() and self._data_queue.empty()):
        try:
            item = await asyncio.wait_for(self._data_queue.get(), 1)
        except asyncio.TimeoutError:
            continue
        await self._client.publish(**item)
        self._data_queue.task_done()

async def _manage_connection(self):
    sub_to_topic_task = None
    while not self._exit_event.is_set():
        try:
            async with self._client:
                remote_mgmt_task = asyncio.create_task(self.sub_to_topic())
                await self._publisher()
                sub_to_topic_task.cancel()
        except mqtt.MqttError:
            if not (sub_to_topic_task is None):
                sub_to_topic_task.cancel()
            logger.warning(
                f"Connection lost; Reconnecting in {self._retry_interval} seconds.")

async def start(self):
    self._exit_event = asyncio.Event()
    self._tasks = (asyncio.create_task(self._manage_connection()), )

start function is called in main.py module which runs the main loop

empicano commented 3 months ago

Hi there, thanks for opening this issue! 🙂

[...] when, for some reason, some of them don't push any data trough the publish queue for a long time, and they disconnect from the server (with reason keep-alive), the device doesn't attempt to reconnect. As soon as it has another item queued and it tries to publish again then it immediately tires to reconnect.

If I understand you (and the example code) correctly, the client does reconnect, but only after a publish call has failed, is that it?

There's an interesting discussion on reconnection in #287, in essence, reconnection is unfortunately a bit difficult, and what you reference is actually one of the questions that are still open: Should the client reconnect in the background or only when calling a function like reconnect?

If I understand you correctly, you want to know immediately when the client disconnects. We do get a signal for that, namely an exception while waiting for the next message in async for message in client.messages. However, in your code we never await the _sub_to_topic task, so it fails silently in the background. Instead of awaiting only the _publisher task, you could try using asyncio.wait() together with the FIRST_COMPLETED option to propagate the exception to your _manage_connection method.

The long times it takes to reconnect do seem weird, maybe you can find out more about that, so we can see if the problem is aiomqtt or something else?

Let me know if that helps!

joyfullperson commented 3 months ago

Hello there,

Thanks for the detailed and and very informative answers. I think using asyncio.wait() makes more sense in my case indeed!

In the meantime I have "solved" my problem by using asyncio.events (for the disconnection and connection) but it is messy and not really maintainable, so when I'll refactor the codebase I will want to use a better method of doing the reconnection by abstracting it from the tasks themselves.

Should I close this issue and in the future, jump in that discussion to get ideas?

empicano commented 3 months ago

That's great! Yes, let's close this and combine all the ideas around reconnection inside the linked PR 👍 It's super interesting to hear how people use aiomqtt, and how people think reconnection would work best!