adafruit / Adafruit_CircuitPython_MiniMQTT

MQTT Client Library for CircuitPython

Race condition in subscribe #192

Closed romkey closed 6 months ago

romkey commented 6 months ago

The subscribe() method sends a subscription request to the broker and waits until it receives a SUBACK message. If it receives any message that isn't a SUBACK while waiting, it raises an MMQTTException: "invalid message received as response to SUBSCRIBE: {hex(op)}"

It's valid to receive PUBLISH messages between sending the SUBSCRIBE and receiving the SUBACK. Even if the broker acts synchronously in response to the SUBSCRIBE, it could have been transmitting a PUBLISH at the same time as the client was sending the SUBSCRIBE. This will only happen when the application subscribes to multiple topics, but it can happen easily on a busy MQTT network.

Here's debugging output showing an example of this happening. In this case, multiple subscribe requests are issued inside the on_connect callback.

Connecting to WiFi...
Connected!
946.886: DEBUG - Attempting to connect to MQTT broker (attempt #0)
946.887: DEBUG - Attempting to establish MQTT connection...
946.890: INFO - Establishing an INSECURE connection to 10.0.1.11:1883
947.001: DEBUG - Sending CONNECT to broker...
947.003: DEBUG - Fixed Header: bytearray(b'\x10)\x00')
947.005: DEBUG - Variable Header: bytearray(b'\x04MQTT\x04\xc2\x00<')
947.080: DEBUG - Receiving CONNACK packet from broker
947.092: DEBUG - Got message type: 0x20 pkt: 0x20
Connected to MQTT broker! Listening for topic changes
947.098: DEBUG - SUBSCRIBING to topic zigbee2mqtt/# with QoS 0
947.119: DEBUG - Got message type: 0x90 pkt: 0x90
947.127: DEBUG - SUBSCRIBING to topic givemeasign/# with QoS 0
947.143: DEBUG - Got message type: 0x30 pkt: 0x31
947.158: DEBUG - Receiving PUBLISH
Topic: zigbee2mqtt/bridge/state
Msg: bytearray(b'online')

New message on topic zigbee2mqtt/bridge/state: online
947.163: INFO - MMQT error: invalid message received as response to SUBSCRIBE: 0x30
947.165: DEBUG - Reconnect timeout computed to 2.00
947.166: DEBUG - adding jitter 0.91 to 2.00 seconds
947.167: DEBUG - Attempting to connect to MQTT broker (attempt #1)
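
To help read the "Got message type: ... pkt: ..." lines above, here is a minimal sketch of how the first byte of an MQTT 3.1.1 fixed header splits into a packet type and flag bits. The function name and the returned dictionary are made up for illustration and are not part of MiniMQTT. It assumes the "pkt" value in the log is the raw first byte and "message type" is that byte masked with 0xF0.

```python
# MQTT 3.1.1 control packet types (upper nibble of the first fixed-header byte).
PACKET_TYPES = {
    0x10: "CONNECT",
    0x20: "CONNACK",
    0x30: "PUBLISH",
    0x80: "SUBSCRIBE",
    0x90: "SUBACK",
}


def describe_first_byte(first_byte):
    """Split the first fixed-header byte into packet type and flag bits.

    Hypothetical helper for illustration only.
    """
    packet_type = first_byte & 0xF0
    flags = first_byte & 0x0F
    info = {
        "type": packet_type,
        "name": PACKET_TYPES.get(packet_type, "UNKNOWN"),
        "flags": flags,
    }
    if packet_type == 0x30:
        # PUBLISH flag bits: DUP (bit 3), QoS (bits 2-1), RETAIN (bit 0).
        info["dup"] = bool(flags & 0x08)
        info["qos"] = (flags >> 1) & 0x03
        info["retain"] = bool(flags & 0x01)
    return info


# The three "pkt" values that appear in the log above.
for byte in (0x20, 0x90, 0x31):
    print(hex(byte), describe_first_byte(byte))
```

Read this way, 0x31 is a QoS 0 PUBLISH with the RETAIN bit set, which would fit a retained zigbee2mqtt/bridge/state message being delivered as soon as the first subscription became active.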

The library shouldn't raise an exception in this case. The PUBLISH has already been processed, so the loop just needs to keep waiting for the SUBACK instead of raising.

When this happens during the on_connect callback, the connect() method catches the exception and tries to reconnect. If it happens on every attempt, connect() will fail even though a successful connection was made each time.
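
The failure mode described here can be simulated without any hardware. The sketch below is a deliberately simplified stand-in, not the library's connect() implementation: connect_with_retries, open_session and failing_callback are hypothetical names, and the retry count and error text are assumptions. It only shows why an exception raised inside the connect callback makes every attempt look like a failed connection.

```python
class MMQTTException(Exception):
    """Stand-in for the library's exception type."""


def connect_with_retries(open_session, on_connect, retries=3):
    """Hypothetical retry loop, not the library's connect().

    Any MMQTTException, including one raised inside on_connect,
    counts as a failed attempt.
    """
    sessions_opened = 0
    for attempt in range(retries):
        try:
            open_session()  # stands in for the CONNECT/CONNACK exchange
            sessions_opened += 1
            on_connect()  # user callback, which calls subscribe()
            return sessions_opened
        except MMQTTException as exc:
            print(f"attempt #{attempt} failed: {exc}")
    raise MMQTTException(
        f"gave up after {retries} attempts ({sessions_opened} sessions opened)"
    )


def failing_callback():
    # Simulates subscribe() seeing a PUBLISH before the SUBACK.
    raise MMQTTException(
        "invalid message received as response to SUBSCRIBE: 0x30"
    )


try:
    connect_with_retries(lambda: None, failing_callback)
except MMQTTException as exc:
    print(exc)
```

In this simulation every attempt opens a session successfully, yet the caller still ends up with an exception, which mirrors the behaviour in the log.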

The fix is very simple: don't raise the exception if the non-SUBACK packet is a PUBLISH packet. By this point in the processing, the packet has already been properly handled.
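
As a rough illustration of that change, here is a self-contained sketch of a SUBACK wait loop that tolerates PUBLISH packets. It is not the code from the library or from the PR: wait_for_suback, read_packet_type and tolerate_publish are hypothetical names, and the real loop also has to deal with timeouts and parsing the SUBACK payload, which are omitted here.

```python
class MMQTTException(Exception):
    """Stand-in for the library's exception type."""


PUBLISH = 0x30
SUBACK = 0x90


def wait_for_suback(read_packet_type, tolerate_publish=True):
    """Read packet types until a SUBACK arrives.

    read_packet_type is a hypothetical callable returning the masked
    type (first byte & 0xF0) of the next packet. It is assumed that any
    PUBLISH has already been dispatched to the message callback before
    its type is returned. Returns the number of PUBLISH packets skipped.
    """
    skipped = 0
    while True:
        op = read_packet_type()
        if op == SUBACK:
            return skipped
        if tolerate_publish and op == PUBLISH:
            # Already handled upstream; keep waiting for the SUBACK.
            skipped += 1
            continue
        raise MMQTTException(
            f"invalid message received as response to SUBSCRIBE: {hex(op)}"
        )


# Simulated wire order: a PUBLISH arrives before the SUBACK.
incoming = iter([PUBLISH, SUBACK])
print("skipped PUBLISH packets:", wait_for_suback(lambda: next(incoming)))

# With the tolerance switched off, the same sequence raises.
incoming = iter([PUBLISH, SUBACK])
try:
    wait_for_suback(lambda: next(incoming), tolerate_publish=False)
except MMQTTException as exc:
    print("strict behaviour:", exc)
```

Any other unexpected packet type still raises, so the check stays strict for everything except PUBLISH.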

Example code (based on Adafruit's example):

import os
import time
import board
import busio
from digitalio import DigitalInOut

import adafruit_logging

from adafruit_esp32spi import adafruit_esp32spi
import adafruit_esp32spi.adafruit_esp32spi_socket as socket

import adafruit_minimqtt.adafruit_minimqtt as MQTT

esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)

spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)

def connected(client, userdata, flags, rc):
    print("Connected to MQTT broker! Listening for topic changes")

    # Several back-to-back subscriptions inside the connect callback trigger the race.
    client.subscribe("zigbee2mqtt/#")
    client.subscribe("givemeasign/#")
    client.subscribe("rtlamr/#")

def disconnected(client, userdata, rc):
    print("Disconnected from MQTT Broker!")

def message(client, topic, message):
    print(f"New message on topic {topic}: {message}")

print("Connecting to WiFi...")
esp.connect_AP(os.getenv("wifi_ssid"), os.getenv("wifi_password"))
print("Connected!")

MQTT.set_socket(socket, esp)

mqtt_client = MQTT.MQTT(
    broker=os.getenv("MQTT_BROKER"),
    port=os.getenv("MQTT_PORT") or 1883,  # 1883 is the standard unencrypted MQTT port
    is_ssl=os.getenv("MQTT_SSL") or False,
    client_id=os.getenv("MQTT_CLIENTID"),
    username=os.getenv("MQTT_USERNAME"),
    password=os.getenv("MQTT_PASSWORD"),
)

mqtt_client.on_connect = connected
mqtt_client.on_disconnect = disconnected
mqtt_client.on_message = message

logger = adafruit_logging.Logger("logger", 0)
# addHandler() expects a handler instance, not the class itself.
logger.addHandler(adafruit_logging.StreamHandler())

mqtt_client.enable_logger(adafruit_logging, 0)

mqtt_client.connect()
print("MQTT connect returned")

while True:
    mqtt_client.loop(timeout=0.1)

I've submitted PR #193 with a three-line fix for the problem.

vladak commented 6 months ago

you are not alone: https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT/pull/187#issuecomment-1834640175

romkey commented 6 months ago

Ah, thank you! I saw your PR but didn't read far enough into it to see you'd covered this. I'll close my PR since you've done a lot more than I did. Thanks!