M-o-a-T / moat-mqtt

An async MQTT broker and client, plus DistKV integration
MIT License
21 stars 9 forks source link

fix reconnect, use ConnectionError ... #13

Closed taliesin closed 9 months ago

taliesin commented 2 years ago

This fixes #10 at least for complete network loss and if the broker is down:

async def mqtt_lamp_worker(client, loop):
    "coroutine to handle incoming mqtt messages concerning lamps"

      logger.info("mqtt lamp worker startup")
      async with client.subscription(CONFIG.mqtt.lamp_topic, QOS_1) as sub:
              async for message in sub:
                      if not loop.is_running():
                              break

                      packet = message.publish_packet
                      logger.debug("%s => %s" % (packet.variable_header.topic_name, str(packet.payload.data)))
                      pattern = get_pattern_value(packet.payload.data.decode())
                      if pattern is not None:
                              await set_lamp(client, pattern) # this publishes the new pattern again
                      else:
                              logger.warn("ignoring unknown pattern")

 async def workers():
      "spawn all required threads as anyio task group"
      loop = asyncio.get_event_loop()

      async with open_mqttclient(uri=CONFIG.mqtt.host,
                                                              config={'keep_alive': 30, 'reconnect_retries': -1, 'reconnect_max_interval': 128},
                                                              ) as client:

              async with anyio.create_task_group() as tg:
                      await tg.spawn(cyclic_switch_handler, client, loop) # another task which also publishes the new pattern
                      await tg.spawn(mqtt_lamp_worker, client, loop)

              # never unless error
              logger.error("workers end due to error")
taliesin commented 2 years ago

... even in a 2 line fix you can screw things up.

smurfix commented 9 months ago

Meh. Thanks for the reminder.