PiBrewing / craftbeerpi4


MQTT Sensor Tasks are not cancelled on Changes in the sensor settings -> old task still running in parallel to a new one #105

Closed avollkopf closed 1 year ago

avollkopf commented 1 year ago

I just ran into an issue with the mqttsensor that I don't really know how to solve. I hope you guys have an idea.

When subscribing, the sensor creates a task in the satellite controller with a callback: self.mqtt_task = self.cbpi.satellite.subcribe(self.Topic, self.on_message)

When sensor settings are changed on the hardware page and saved, on_stop is called, which cancels the initial subscription:

    async def on_stop(self):
        if not self.mqtt_task.done():
            logging.warning("Task not done -> cancelling")
            self.mqtt_task.cancel()
        try:            
            logging.warning("trying to call cancelled task")
            await self.mqtt_task
        except asyncio.CancelledError:
            logging.warning("Task has been Cancelled")
            pass

The problem is that the task is never really cancelled:

2023-03-25:18:22:35,947 WARNING  [__init__.py:130] Task not done -> cancelling
2023-03-25:18:22:35,947 WARNING  [__init__.py:133] trying to call cancelled task
2023-03-25:18:22:35,949 WARNING  [satellite_controller.py:166] Sub Cancelled

Although the satellite controller logs that the task was cancelled inside the controller, the sensor never logs "Task has been Cancelled", so the cancellation does not arrive in the mqttsensor.

After saving the sensor properties, the system has an additional task and the subscription runs twice. Each time you change sensor properties and save them, another task is added while the old tasks keep running :-)

avollkopf commented 1 year ago

... I think I found it in the satellite controller.

Original:

    async def _subcribe(self, topic, method):
        while True:
            try:
                if self.client._connected.done():
                    async with self.client.messages() as messages:
                        await self.client.subscribe(topic)
                        async for message in messages:
                            if message.topic.matches(topic):
                                await method(message.payload.decode())
            except asyncio.CancelledError:
                # Cancel
                self.logger.warning("Sub Cancelled")
            except MqttError as e:
                self.logger.error("Sub MQTT Exception: {}".format(e))
            except Exception as e:
                self.logger.error("Sub Exception: {}".format(e))
            # wait before try to resubscribe
            await asyncio.sleep(5)
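
To illustrate why the loop above survives a cancel, here is a minimal sketch that uses only asyncio and no MQTT client. The names swallowing_worker and demo_swallow are made up for this example, and the long asyncio.sleep is an assumption standing in for waiting on messages. The point is only that a CancelledError caught inside a while True loop, without break or raise, lets the loop carry on:

```python
import asyncio


async def swallowing_worker(log):
    # Same shape as the original loop: CancelledError is caught,
    # nothing leaves the loop, so the coroutine keeps running.
    while True:
        try:
            await asyncio.sleep(3600)  # stands in for waiting on MQTT messages
        except asyncio.CancelledError:
            log.append("cancel swallowed")
        await asyncio.sleep(0.01)  # stands in for the resubscribe delay


async def demo_swallow():
    log = []
    task = asyncio.create_task(swallowing_worker(log))
    await asyncio.sleep(0.01)  # let the worker reach its long wait
    cancel_requested = task.cancel()
    await asyncio.sleep(0.05)  # give the worker time to react
    result = {
        "cancel_requested": cancel_requested,
        "still_running": not task.done(),
        "swallowed": len(log),
    }
    # Demo cleanup only: keep cancelling until a cancel lands outside the try block.
    while not task.done():
        task.cancel()
        await asyncio.sleep(0)
    return result


if __name__ == "__main__":
    print(asyncio.run(demo_swallow()))
```

In this sketch the cancel request is accepted, yet the worker is still running afterwards, which matches the extra subscription seen after every save.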

Changed (break added to the asyncio.CancelledError handler to prevent the routine from restarting):

    async def _subcribe(self, topic, method):
        while True:
            try:
                if self.client._connected.done():
                    async with self.client.messages() as messages:
                        await self.client.subscribe(topic)
                        async for message in messages:
                            if message.topic.matches(topic):
                                await method(message.payload.decode())
            except asyncio.CancelledError:
                # Cancel
                self.logger.warning("Sub Cancelled")
                break
            except MqttError as e:
                self.logger.error("Sub MQTT Exception: {}".format(e))
            except Exception as e:
                self.logger.error("Sub Exception: {}".format(e))
            # wait before try to resubscribe
            await asyncio.sleep(5)

This at least results in the expected behavior.

EDIT: As mentioned, the subscription is no longer restarted, but the task does not seem to be fully cancelled, as this part in the mqttsensor is never executed:


        except asyncio.CancelledError:
            logging.warning("Task has been Cancelled")
            pass

avollkopf commented 1 year ago

Anyhow, I will close the issue, as self.mqtt_task.cancel() returns True, which means that the cancellation request was accepted by the task.
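
For anyone wondering why the except asyncio.CancelledError branch in the sensor never runs after the change: a likely explanation is that break lets the coroutine return normally, so awaiting the task does not raise. The sketch below compares break with a bare raise. The worker, stop and demo functions are hypothetical stand-ins and not the cbpi code, and the long asyncio.sleep is an assumption replacing the MQTT message loop:

```python
import asyncio


async def worker(reraise):
    while True:
        try:
            await asyncio.sleep(3600)  # stands in for waiting on MQTT messages
        except asyncio.CancelledError:
            if reraise:
                raise  # task ends in the cancelled state
            break  # task ends normally, like the changed _subcribe


async def stop(task):
    # Same shape as on_stop in the sensor.
    requested = False
    if not task.done():
        requested = task.cancel()
    try:
        await task
        saw_cancelled_error = False
    except asyncio.CancelledError:
        saw_cancelled_error = True
    return requested, saw_cancelled_error, task.cancelled()


async def demo():
    results = {}
    for reraise in (False, True):
        task = asyncio.create_task(worker(reraise))
        await asyncio.sleep(0.01)  # let the worker reach its wait
        results[reraise] = await stop(task)
    return results


if __name__ == "__main__":
    for reraise, outcome in asyncio.run(demo()).items():
        print("reraise" if reraise else "break", outcome)
```

In both variants cancel() returns True and the worker stops. Only the raise variant makes await task raise CancelledError and task.cancelled() report True, so whether to use break or raise depends on whether the caller needs to see the cancellation.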