adafruit / Adafruit_CircuitPython_MiniMQTT

MQTT Client Library for CircuitPython

encode/decode remaining length properly for {,UN}SUBSCRIBE/SUBACK #187

Closed vladak closed 6 months ago

vladak commented 7 months ago

This change fixes remaining length encoding for SUBSCRIBE packets, allowing subscribe to succeed when the remaining length is bigger than 127. Tested on CPython with:

#!/usr/bin/env python3

import json
import socket
import ssl
import sys

import adafruit_minimqtt.adafruit_minimqtt as MQTT

import logging

def main():
    logging.basicConfig()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    logger.info("Creating MQTT instance")

    broker = "172.40.0.3"
    port = 1883
    mqtt_client = MQTT.MQTT(
        broker=broker,
        port=port,
        socket_pool=socket,
        ssl_context=ssl.create_default_context(),
        connect_retries=3,
    )

    mqtt_client.enable_logger(logging, log_level=logging.DEBUG)

    logger.info("Connecting to MQTT broker")
    mqtt_client.connect()

    topics = [("broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong",0)]
    logger.info(f"subscribing to {topics}")
    mqtt_client.subscribe(topics)

    i = 0
    while True:
        logger.info(f"iteration {i}")
        ret = mqtt_client.loop(timeout=3)
        if ret is not None:
            logger.info(f"got {ret}")
        i = i + 1

if __name__ == "__main__":
    main()

which produces the following output:

INFO:__main__:Creating MQTT instance
INFO:__main__:Connecting to MQTT broker
DEBUG:log:Attempting to connect to MQTT broker (attempt #0)
DEBUG:log:Attempting to establish MQTT connection...
INFO:log:Establishing an INSECURE connection to 172.40.0.3:1883
DEBUG:log:Sending CONNECT to broker...
DEBUG:log:Fixed Header: bytearray(b'\x10\x14\x00')
DEBUG:log:Variable Header: bytearray(b'\x04MQTT\x04\x02\x00<')
DEBUG:log:Receiving CONNACK packet from broker
DEBUG:log:Got message type: 0x20
DEBUG:log:Resetting reconnect backoff
INFO:__main__:subscribing to [('broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong', 0)]
DEBUG:log:Sending SUBSCRIBE to broker...
DEBUG:log:Fixed Header: bytearray(b'\x82\x80\x01')
DEBUG:log:Variable Header: b'\x00\x01'
DEBUG:log:SUBSCRIBING to topic broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong with QoS 0
DEBUG:log:packet: b'\x00{broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong\x00'
DEBUG:log:Got message type: 0x90
INFO:__main__:iteration 0
DEBUG:log:waiting for messages for 3 seconds
DEBUG:log:Loop timed out after 3 seconds
INFO:__main__:iteration 1
DEBUG:log:waiting for messages for 3 seconds
...
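
For context, the SUBSCRIBE fixed header in the log above (b'\x82\x80\x01') carries a remaining length of 128, which no longer fits in one byte. The following is a minimal sketch of the variable-length encoding described in the MQTT 3.1.1 spec. The name `encode_remaining_length` is a hypothetical helper chosen for illustration; it is not the function used inside MiniMQTT.

```python
def encode_remaining_length(length):
    """Encode an MQTT remaining length as 1 to 4 bytes (illustrative helper)."""
    if not 0 <= length <= 268_435_455:
        raise ValueError("remaining length out of range for MQTT")
    encoded = bytearray()
    while True:
        byte = length % 128      # low 7 bits carry the value
        length //= 128
        if length > 0:
            byte |= 0x80         # continuation bit: more bytes follow
        encoded.append(byte)
        if length == 0:
            return bytes(encoded)


# 2 (packet id) + 2 (topic length) + 123 (topic) + 1 (QoS) = 128
print(encode_remaining_length(128))
# "devices/#" is 9 bytes, so 2 + 2 + 9 + 1 = 14, which fits in one byte
print(encode_remaining_length(14))
```

Any value above 127 sets the continuation bit in the first byte, which is why a fixed two-byte header layout breaks for longer topics.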

FoamyGuy commented 7 months ago

I attempted to test this version on a FunHouse 9.0.0-alpha.5 using the native networking simpletest from this repo.

It seems to have trouble subscribing to the onoff feed. It keeps printing "Connected to Adafruit IO! Listening for topic changes on Foamyguy/feeds/onoff" over and over and never gets to the point in the main loop where it starts publishing new values. The same test script with the current main branch runs successfully: it subscribes, receives new data, and publishes data on the other feed.

I'm attaching two log files from the more verbose logging output. One with the version from this PR and one with current main.

mqtt_main.log mqtt_pr187.log

The particulars of the MQTT spec and connection are beyond my knowledge so I'm not sure what could cause this.

Let me know if there is any additional testing or info I could provide to try to help figure out what could cause this issue.

test script:

# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT

import os
import time
import ssl
import socketpool
import wifi
import adafruit_minimqtt.adafruit_minimqtt as MQTT

# Add settings.toml to your filesystem CIRCUITPY_WIFI_SSID and CIRCUITPY_WIFI_PASSWORD keys
# with your WiFi credentials. DO NOT share that file or commit it into Git or other
# source control.

# Set your Adafruit IO Username, Key and Port in settings.toml
# (visit io.adafruit.com if you need to create an account,
# or if you need your Adafruit IO key.)
aio_username = os.getenv("aio_username")
aio_key = os.getenv("aio_key")

print(f"Connecting to {os.getenv('CIRCUITPY_WIFI_SSID')}")
wifi.radio.connect(
    os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD")
)
print(f"Connected to {os.getenv('CIRCUITPY_WIFI_SSID')}!")
### Feeds ###

# Setup a feed named 'photocell' for publishing to a feed
photocell_feed = aio_username + "/feeds/photocell"

# Setup a feed named 'onoff' for subscribing to changes
onoff_feed = aio_username + "/feeds/onoff"

### Code ###

# Define callback methods which are called when events occur
# pylint: disable=unused-argument, redefined-outer-name
def connected(client, userdata, flags, rc):
    # This function will be called when the client is connected
    # successfully to the broker.
    print(f"Connected to Adafruit IO! Listening for topic changes on {onoff_feed}")
    # Subscribe to all changes on the onoff_feed.
    client.subscribe(onoff_feed)

def disconnected(client, userdata, rc):
    # This method is called when the client is disconnected
    print("Disconnected from Adafruit IO!")

def message(client, topic, message):
    # This method is called when a topic the client is subscribed to
    # has a new message.
    print(f"New message on topic {topic}: {message}")

    # if not client.user_data.get(topic):
    #     client.user_data[topic] = []
    # client.user_data[topic].append(message)
    # 
    # print(client.user_data)

# Create a socket pool
pool = socketpool.SocketPool(wifi.radio)
ssl_context = ssl.create_default_context()

# If you need to use certificate/key pair authentication (e.g. X.509), you can load them in the
# ssl context by uncommenting the lines below and adding the following keys to the "secrets"
# dictionary in your secrets.py file:
# "device_cert_path" - Path to the Device Certificate
# "device_key_path" - Path to the RSA Private Key
# ssl_context.load_cert_chain(
#     certfile=secrets["device_cert_path"], keyfile=secrets["device_key_path"]
# )
messages = {}
# Set up a MiniMQTT Client
mqtt_client = MQTT.MQTT(
    broker="io.adafruit.com",
    port=1883,
    username=aio_username,
    password=aio_key,
    socket_pool=pool,
    ssl_context=ssl_context,
    user_data=messages,
)

# Setup the callback methods above
mqtt_client.on_connect = connected
mqtt_client.on_disconnect = disconnected
mqtt_client.on_message = message

# Connect the client to the MQTT broker.
print("Connecting to Adafruit IO...")
mqtt_client.connect()

photocell_val = 0

messages = {}
while True:
    # Poll the message queue
    mqtt_client.loop()

    # Send a new message
    print(f"Sending photocell value: {photocell_val}...")
    mqtt_client.publish(photocell_feed, photocell_val)
    print("Sent!")
    photocell_val += 1
    time.sleep(5)

vladak commented 7 months ago

Thanks for the testing. I found the extra zero byte in the CONNECT packet strange and did not investigate. As a result I missed the logic for short remaining length. This should work now.

vladak commented 7 months ago

As I've started testing on QtPy with code that basically calls subscribe to a wildcard topic in a loop, I noticed this:

536.291: INFO - subscribing to devices/#                                        
536.291: DEBUG - Sending SUBSCRIBE to broker...                                 
536.293: DEBUG - Fixed Header: bytearray(b'\x82\x0e')                           
536.296: DEBUG - Variable Header: b'\x00\x07'                                   
536.298: DEBUG - SUBSCRIBING to topic devices/# with QoS 0                      
536.300: DEBUG - payload: b'\x00\tdevices/#\x00'                                
536.303: DEBUG - Got message type: 0x30 pkt: 0x30                               
536.307: DEBUG - Receiving PUBLISH                                              
Topic: devices/fusebox/esp32                                                    
Msg: bytearray(b'{"humidity": "29.7", "temperature": "29.5", "pulses": 10749}') 

Traceback (most recent call last):                                              
  File "code.py", line 53, in <module>                                          
  File "code.py", line 45, in main                                              
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 887, in subscribe    
MMQTTException: invalid message received as response to SUBSCRIBE: 0x30         
Code done running.

and upon realizing this is valid behavior according to the spec, added a fix.
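
The MQTT 3.1.1 spec permits the server to start sending PUBLISH packets that match the subscription before it sends the SUBACK. A minimal sketch of a wait loop that tolerates this is shown below. It is an illustration under stated assumptions, not the code added in this PR: `wait_for_suback`, `next_packet_type` and `handle_publish` are hypothetical names, and the packet source is simulated with a plain list.

```python
PUBLISH = 0x30
SUBACK = 0x90


def wait_for_suback(next_packet_type, handle_publish):
    """Consume packets until SUBACK arrives, dispatching any PUBLISH first."""
    while True:
        # The upper nibble of the first header byte is the packet type;
        # the lower nibble holds flags (DUP, QoS, RETAIN for PUBLISH).
        packet_type = next_packet_type() & 0xF0
        if packet_type == SUBACK:
            return
        if packet_type == PUBLISH:
            handle_publish()
            continue
        raise RuntimeError(
            f"invalid message received as response to SUBSCRIBE: {hex(packet_type)}"
        )


# Simulated traffic: two PUBLISH packets (one with RETAIN set), then SUBACK.
incoming = iter([0x30, 0x31, 0x90])
delivered = []
wait_for_suback(lambda: next(incoming), lambda: delivered.append("publish"))
print(len(delivered))
```

With a wildcard topic such as devices/# on a busy broker, this ordering is likely to occur, which matches the traceback above.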

vladak commented 7 months ago

Further testing revealed that UNSUBSCRIBE needs to be fixed as well. Here's the test code, used on a QtPy running CircuitPython 8.2.6 on 2023-09-12:

#!/usr/bin/env python3

import adafruit_logging as logging
import random
import socketpool
import ssl
import sys
import time
import wifi

from secrets import secrets

import adafruit_minimqtt.adafruit_minimqtt as MQTT

def main():
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    logger.info("Connecting to wifi")
    wifi.radio.connect(secrets["SSID"], secrets["password"], timeout=10)
    logger.info(f"Connected to {secrets['SSID']}")
    logger.debug(f"IP: {wifi.radio.ipv4_address}")

    pool = socketpool.SocketPool(wifi.radio)

    host = "172.40.0.3"
    port = 1883
    mqtt_client = MQTT.MQTT(
        broker=host,
        port=port,
        socket_pool=pool,
        ssl_context=ssl.create_default_context(),
        connect_retries=1,
        recv_timeout=5,
    )

    mqtt_client.logger = logger

    logger.debug(f"connecting")
    mqtt_client.connect()

    topic = "devices"
    # topic length should not exceed the maximum given by spec (65535 bytes).
    for i in range(128):
        logger.info(f"### iteration {i}")
        topic += "/" + ''.join(random.choice('abcdefghijklmnopqrstuvwxyz')
                               for _ in range(random.randrange(3, 16)))
        mqtt_client.subscribe(topic)
        mqtt_client.unsubscribe(topic)

        logger.debug("loop")
        mqtt_client.loop(1)

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(0)

It went through all of the iterations, although at times the Mu editor seemed to choke on the long output emitted to the serial console; it always recovered, however.

vladak commented 6 months ago

Added protocol level tests for the SUBSCRIBE packet. Firstly, I captured short and long (with remaining length encoded as 1 byte and 2 bytes, respectively) SUBSCRIBE/SUBACK packets using Mosquitto client and server, then added an assert that verifies that the SUBSCRIBE packet sent by MiniMQTT is the same as the one sent by Mosquitto client and that MiniMQTT processes the reply as sent from the Mosquitto server successfully.

There is of course potential for more testing, like generating SUBSCRIBE packets with remaining length encoded as 3 and 4 bytes (maximum) while perhaps at the same time using the topics specified with tuple and list. Similarly for UNSUBSCRIBE.
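
To get a feel for how large the topic list must be to reach the 3- and 4-byte cases, here is a rough sketch that computes the SUBSCRIBE remaining length and the number of bytes its length field needs. Both function names are hypothetical, and the topic lists are made-up sizes for illustration only.

```python
def subscribe_remaining_length(topics):
    """Remaining length of a SUBSCRIBE packet for a list of (topic, qos) pairs."""
    length = 2  # packet identifier
    for topic, _qos in topics:
        # 2-byte length prefix + UTF-8 topic + 1 byte of requested QoS
        length += 2 + len(topic.encode("utf-8")) + 1
    return length


def length_field_size(remaining_length):
    """Number of bytes the remaining length field occupies (1 to 4)."""
    limits = (127, 16_383, 2_097_151, 268_435_455)
    for size, limit in enumerate(limits, start=1):
        if remaining_length <= limit:
            return size
    raise ValueError("remaining length exceeds the MQTT maximum")


cases = {
    "single short topic": [("devices/#", 0)],
    "single 123-byte topic": [("t" * 123, 0)],
    "200 topics of 100 bytes": [("t" * 100, 0)] * 200,
    "40 topics of 65535 bytes": [("t" * 65535, 0)] * 40,
}
for name, topics in cases.items():
    total = subscribe_remaining_length(topics)
    print(f"{name}: remaining length {total}, {length_field_size(total)} byte(s)")
```

The 4-byte case needs more than 2 MB of topics in one packet, so it is probably only practical to exercise on CPython rather than on a microcontroller.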

vladak commented 6 months ago

Also, the change that covers the PUBLISH-before-SUBACK case can be tested this way. Let me know.

vladak commented 6 months ago

Never mind. I implemented some of the tests described above.

vladak commented 6 months ago

After adding the test case for long lists of topics in SUBSCRIBE, I noticed that the remaining length parsing for SUBACK also assumes 2 bytes, so I added a fix.
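
A SUBACK carries 2 bytes of packet identifier plus one return code per topic, so subscribing to more than 125 topics at once pushes its remaining length past 127. Below is a minimal sketch of the decoding side as described in the MQTT 3.1.1 spec. `decode_remaining_length` and the `read_byte` callable are hypothetical names for illustration, not the MiniMQTT internals.

```python
def decode_remaining_length(read_byte):
    """Decode an MQTT remaining length by reading up to 4 bytes."""
    value = 0
    multiplier = 1
    for _ in range(4):
        byte = read_byte()
        value += (byte & 0x7F) * multiplier
        if byte & 0x80 == 0:     # continuation bit clear: last byte
            return value
        multiplier *= 128
    raise ValueError("malformed remaining length (more than 4 bytes)")


# SUBACK for 128 topics: 2 bytes of packet id + 128 return codes = 130
stream = iter(b"\x82\x01")
print(decode_remaining_length(lambda: next(stream)))
```

Reading byte by byte until the continuation bit is clear avoids assuming any particular width for the length field.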

vladak commented 6 months ago

I believe all these remaining length encoding/decoding problems stem from imprecise wording in the spec. I wrote an e-mail to the two IBM editors of the 3.1.1 spec asking them to fix it; however, one of them no longer seems to work for IBM, as the e-mail bounced, and the other did not reply.

vladak commented 6 months ago

The changes are done unless someone raises a review concern. Next, I'd like to retest the changes on my trusty QtPy, using both a local MQTT broker and the public Mosquitto test server, which sees pretty wild traffic.

vladak commented 6 months ago

Completed all the 128 iterations of the above subscribe/unsubscribe test with randomized topics on the QtPy just fine.

vladak commented 6 months ago

Used this code on the QtPy to follow the wild traffic on test.mosquitto.org:

#!/usr/bin/env python3

import adafruit_logging as logging
import random
import socketpool
import ssl
import sys
import time
import wifi

from secrets import secrets

import adafruit_minimqtt.adafruit_minimqtt as MQTT

def on_message(client, topic, msg):
    #logger = logging.getLogger(__name__)
    #logger.info(f"Got msg on '{topic}' ({len(msg)} bytes)")
    client.user_data[0] += 1
    client.user_data[1] += len(msg)

def main():
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    logger.info("Connecting to wifi")
    wifi.radio.connect(secrets["SSID"], secrets["password"], timeout=10)
    logger.info(f"Connected to {secrets['SSID']}")
    logger.debug(f"IP: {wifi.radio.ipv4_address}")

    pool = socketpool.SocketPool(wifi.radio)

    host = "test.mosquitto.org"
    port = 1883
    stats = [0, 0]
    mqtt_client = MQTT.MQTT(
        broker=host,
        port=port,
        socket_pool=pool,
        ssl_context=ssl.create_default_context(),
        connect_retries=1,
        recv_timeout=5,
        use_binary_mode=True,   # test.mosquitto.org has messages with UnicodeError
        user_data=stats,
    )

    # mqtt_client.logger = logger

    mqtt_client.on_message = on_message

    logger.debug(f"connecting")
    mqtt_client.connect()

    topic = "#"
    logger.debug(f"subscribing")
    mqtt_client.subscribe(topic)

    while True:
        mqtt_client.loop(1)
        logger.info(f"Messages: {stats[0]}, bytes: {stats[1]}")

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(0)

It survived several thousand messages before failing with a MemoryError. This is the traceback from a run with the logger set:

2374.516: DEBUG - Got message type: 0x30 pkt: 0x30
Traceback (most recent call last):
  File "code.py", line 55, in <module>
  File "code.py", line 47, in main
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 862, in subscribe
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 1118, in _wait_for_msg
  File "/lib/adafruit_logging.py", line 382, in debug
  File "/lib/adafruit_logging.py", line 328, in _log
MemoryError: memory allocation failed, allocating 23056 bytes
Code done running.

and this is from a run with the MQTT logger left at its default, where only the callback was used to do the logging:

Traceback (most recent call last):
  File "code.py", line 63, in <module>
  File "code.py", line 58, in main
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 1050, in loop
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 1116, in _wait_for_msg
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 1158, in _sock_exact_recv
MemoryError: memory allocation failed, allocating 60000 bytes
Code done running.

Also, I got the timeout once:

Traceback (most recent call last):
  File "code.py", line 62, in <module>
  File "code.py", line 57, in main
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 1050, in loop
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 1116, in _wait_for_msg
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 1167, in _sock_exact_recv
OSError: [Errno 116] ETIMEDOUT

None of these is related to the changes, I'd say.