AlexxIT / XiaomiGateway3

Home Assistant custom component for control Xiaomi Multimode Gateway (aka Gateway 3), Xiaomi Multimode Gateway 2, Aqara Hub E1 on default firmwares over LAN
https://github.com/AlexxIT/Blog
MIT License
2.47k stars 348 forks source link

Fix `InvalidStateError: invalid state` when enable statistics sensors. #1360

Open caibinqing opened 6 months ago

caibinqing commented 6 months ago

Fix #1325

Error logs:

2024-09-14 19:58:18.227 ERROR (MainThread) [custom_components.xiaomi_gateway3.gate.10.1.0.104] {'msg': 'dispatch_event: mqtt_publish (<custom_components.xiaomi_gateway3.core.mini_mqtt.MQTTMessage object at 0xffff2bc39430>,) {}'}
Traceback (most recent call last):
File "/config/custom_components/xiaomi_gateway3/core/gate/base.py", line 91, in dispatch_event
handler(*args, **kwargs)
File "/config/custom_components/xiaomi_gateway3/core/gate/miot.py", line 15, in miot_on_mqtt_publish
self.miot_process_properties(msg.json["params"], from_cache=False)
File "/config/custom_components/xiaomi_gateway3/core/gate/miot.py", line 60, in miot_process_properties
device.dispatch({device.type: ts})
File "/config/custom_components/xiaomi_gateway3/core/device.py", line 211, in dispatch
handler(data)
asyncio.exceptions.InvalidStateError: invalid state

Reproduction process:

  1. Two or more gateways
  2. A mesh device without battery
  3. Enable statistics sensors
# .../core/gate/miot.py

class MIoTGateway(XGateway):
    def miot_on_mqtt_publish(self, msg: MQTTMessage):
        if msg.topic in ("miio/report", "central/report"):
            ...
        elif msg.topic == "miio/command_ack":
            # check if it is response from `get_properties` command
            ...
                self.miot_process_properties(result, from_cache=True)  # <-- 4. Got response

    def miot_process_properties(self, params: list, from_cache: bool):
        ...
        for did, params in devices.items():
            device = self.devices[did]
            device.on_report(params, self, ts)  # <-- 5. There is a `dispatch` here, will call all listeners, include `set_result`
            if self.stats_domain and device.type in (BLE, MESH):
                device.dispatch({device.type: ts})  # <-- 6. Second `dispatch`, `set_result` but the Future is already done.

    ...

    async def miot_send(self, device: XDevice, payload: dict):
        ...
        # check if we can send command via any second gateway
        gw2 = next((gw for gw in device.gateways if gw != self and gw.available), None)
        if gw2:
            await self.mqtt_publish_multiple(device, payload, gw2)  # <-- 1. Send a `get_properties` command
        else:
            await self.mqtt.publish("miio/command", payload)

    async def mqtt_publish_multiple(
        self, device: XDevice, payload: dict, gw2, delay: float = 1.0
    ):
        fut = asyncio.get_event_loop().create_future()
        device.add_listener(fut.set_result)  # <-- 2. Add the listener
        await self.mqtt.publish("miio/command", payload)
        try:
            async with asyncio.timeout(delay):
                await fut  # <-- 3. Awaiting
        except TimeoutError:
            await gw2.mqtt.publish("miio/command", payload)
        finally:
            device.remove_listener(fut.set_result)  # <-- 7. Maybe too late

Since this Future is only used to detect timeout, it should be safe to add a done check to avoid multiple calls. Or if you have a better fix, feel free to close this.