flyte / mqtt-io

Expose GPIO modules (Raspberry Pi, Beaglebone, PCF8574, PiFace2, etc.) and digital sensors (LM75, etc.) to an MQTT server for remote control and monitoring.
MIT License

Implement keepalive loop as a coro to fix issue #282 (#315)

Closed: jmoutte closed this 1 year ago

jmoutte commented 1 year ago

Because of the way the code consumes messages, we never capture the _on_disconnect Future from asyncio-mqtt. To avoid rewriting a lot of code, I implemented a keepalive loop that periodically updates the status with a call to publish. That call captures the disconnected future and raises the exception that triggers reconnection.
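A minimal sketch of the workaround described above, in plain asyncio so it runs without a broker. It assumes that publishing on a dropped connection raises an MqttError; `MqttError` here is a hypothetical stand-in for asyncio_mqtt's exception, and `connect`, `make_fake_connect`, the "status" topic and the timing parameters are illustrative names, not mqtt-io's actual code.

```python
import asyncio


class MqttError(Exception):
    """Hypothetical stand-in for asyncio_mqtt.MqttError."""


async def keepalive_loop(publish, topic: str, payload: bytes, interval: float) -> None:
    # WORKAROUND: publish() is the call that surfaces a lost connection, so the
    # status is republished periodically and any MqttError escapes this loop.
    while True:
        await publish(topic, payload)
        await asyncio.sleep(interval)


async def run_with_reconnect(connect, topic: str, payload: bytes,
                             interval: float, reconnect_interval: float,
                             max_disconnects: int) -> int:
    """Reconnect every time the keepalive loop raises MqttError.

    max_disconnects only exists so this sketch terminates; a real
    service would keep looping.
    """
    disconnects = 0
    while disconnects < max_disconnects:
        try:
            publish = await connect()
            await keepalive_loop(publish, topic, payload, interval)
        except MqttError as error:
            disconnects += 1
            print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
            await asyncio.sleep(reconnect_interval)
    return disconnects


def make_fake_connect(publishes_before_drop: int):
    """Hypothetical fake broker: each connection drops after N publishes."""
    sent = []

    async def connect():
        count = 0

        async def publish(topic, payload):
            nonlocal count
            if count >= publishes_before_drop:
                raise MqttError("connection lost")
            count += 1
            sent.append((topic, payload))

        return publish

    return connect, sent


if __name__ == "__main__":
    connect, sent = make_fake_connect(publishes_before_drop=3)
    n = asyncio.run(run_with_reconnect(connect, "status", b"online", 0.01, 0, 2))
    print(n, len(sent))
```

The design choice is that detection is driven by an active operation (publish) rather than by waiting passively for messages, so the reconnection path runs even when no messages arrive.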

This fixes issue #282

BenjiU commented 1 year ago

Hi jmoutte,

thanks for your PR, but I'm not comfortable with this fix yet: are you just sending the "keepalive" message manually? Shouldn't asyncio_mqtt do this internally?

The "Reconnecting" section of the asyncio_mqtt docs says this (which is what you did with your try/except, right?):

Reconnecting

You can reconnect when the connection to the broker is lost by wrapping your code in a try/except-block and listening for MqttErrors.

import asyncio
import asyncio_mqtt as aiomqtt

async def main():
    reconnect_interval = 5  # In seconds
    while True:
        try:
            async with aiomqtt.Client("test.mosquitto.org") as client:
                async with client.messages() as messages:
                    await client.subscribe("humidity/#")
                    async for message in messages:
                        print(message.payload.decode())
        except aiomqtt.MqttError as error:
            print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
            await asyncio.sleep(reconnect_interval)

asyncio.run(main())

But I guess we need to find the right place for the try/except, right? Or am I missing something?

jmoutte commented 1 year ago

Hello BenjiU,

I understand your concerns and recognise that this is not ideal. Keepalive should indeed be implemented by the underlying MQTT client library, and this is available in recent versions of asyncio_mqtt. This PR implements keepalive with a hack, but its main purpose is to detect disconnections and trigger the reconnection code, which keepalive alone would not handle anyway.

I am not an asyncio expert, but after reading the code carefully, I am afraid that the way mqtt_io uses asyncio_mqtt makes it impossible for us to catch a disconnection exception while we are only waiting for messages.

Indeed, the example code uses a generator provided by asyncio_mqtt's .messages() that waits on either an asyncio.Queue or a disconnected future. This is the trick that notifies you of a disconnection while you are waiting for messages. The mqtt_io implementation instead overrides the on_message callback, short-circuiting asyncio_mqtt's internal queues and bypassing the generator. This could work if paho-mqtt's on_disconnect callback were also overridden, but we don't have access to it.
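The "queue OR disconnected future" trick can be sketched in plain asyncio as below. This is a simplified illustration of the idea, not asyncio_mqtt's actual implementation; `messages`, `queue`, `disconnected` and `demo` are hypothetical names, and `ConnectionError` stands in for MqttError.

```python
import asyncio
from typing import AsyncIterator


async def messages(queue: asyncio.Queue, disconnected: asyncio.Future) -> AsyncIterator[str]:
    """Yield queued messages until the `disconnected` future completes.

    Each iteration races queue.get() against the disconnected future, so a
    consumer blocked waiting for messages still learns about a disconnection.
    """
    while True:
        get_task = asyncio.ensure_future(queue.get())
        try:
            done, _ = await asyncio.wait(
                {get_task, disconnected}, return_when=asyncio.FIRST_COMPLETED
            )
        finally:
            # Don't leave a pending queue.get() behind.
            if not get_task.done():
                get_task.cancel()
        if get_task in done:
            yield get_task.result()
        else:
            # Re-raise the disconnection error (or stop if resolved cleanly).
            disconnected.result()
            return


async def demo():
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    disconnected = loop.create_future()
    received = []

    async def consume() -> str:
        try:
            async for msg in messages(queue, disconnected):
                received.append(msg)
        except ConnectionError as exc:
            return f"disconnected: {exc}"
        return "clean exit"

    consumer = asyncio.create_task(consume())
    await queue.put("a")
    await queue.put("b")
    await asyncio.sleep(0.01)  # let the consumer drain the queue and block
    disconnected.set_exception(ConnectionError("broker went away"))
    return received, await consumer


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If messages are instead handled directly in an on_message callback, nothing ever awaits the disconnected future, so its exception is never observed by the consumer. That gap is what the keepalive publish in this PR works around.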

So I haven't found an elegant solution that keeps the override of paho-mqtt's on_message callback while still catching disconnection events with the asynchronous callback implementation you created.

I have now been running this PR for days at my place; it has survived multiple disconnections and works great.

BR,

Julien

BenjiU commented 1 year ago

So you are an asyncio expert :-D Okay, if you can fix the lint warning, I'll merge this as a workaround. Could you also add a comment in the code noting that this is a workaround?