adafruit / Adafruit_CircuitPython_MiniMQTT

MQTT Client Library for CircuitPython

Maximum topic length #160

Closed LelandSindt closed 9 months ago

LelandSindt commented 1 year ago

I am working with Adafruit CircuitPython 8.0.4 on 2023-03-15; Raspberry Pi Pico W with rp2040, Board ID:raspberry_pi_pico_w. Library version adafruit-circuitpython-bundle-8.x-mpy-20230315.

When I try to subscribe to multiple "long" topics or a single "very long" topic, I get the following error:

 Traceback (most recent call last):
  File "code.py", line 73, in <module>
  File "adafruit_minimqtt/adafruit_minimqtt.py", line 831, in subscribe
  File "adafruit_minimqtt/adafruit_minimqtt.py", line 1041, in _wait_for_msg
MMQTTException: 

Code done running.

Here is the code I am running...

import wifi
import os
import time
import socketpool
import ssl
import adafruit_minimqtt.adafruit_minimqtt as MQTT

print()
print("Connecting to WiFi")
#  connect to your SSID
wifi.radio.connect(os.getenv('CIRCUITPY_WIFI_SSID'), os.getenv('CIRCUITPY_WIFI_PASSWORD'))
print("Connected to WiFi")
#  prints MAC address to REPL
print("My MAC addr:", [hex(i) for i in wifi.radio.mac_address])
#  prints IP address to REPL
print("My IP address is", wifi.radio.ipv4_address)

def connect(mqtt_client, userdata, flags, rc):
  # This function will be called when the mqtt_client is connected
  # successfully to the broker.
  print("Connected to MQTT Broker!")
  print("Flags: {0}\n RC: {1}".format(flags, rc))

def subscribe(mqtt_client, userdata, topic, granted_qos):
  # This method is called when the mqtt_client subscribes to a new feed.
  print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))

def message(client, topic, message):
  # Method called when a client's subscribed feed has a new value.
  print("New message on topic {0}: {1}".format(topic, message))

# Create a socket pool
pool = socketpool.SocketPool(wifi.radio)

# Set up a MiniMQTT Client
mqtt_client = MQTT.MQTT(
    broker="test.mosquitto.org",
    socket_pool=pool,
    ssl_context=ssl.create_default_context(),
)

# Connect callback handlers to mqtt_client
mqtt_client.on_connect = connect
mqtt_client.on_subscribe = subscribe
mqtt_client.on_message = message

print("Attempting to connect to %s" % mqtt_client.broker)
mqtt_client.connect()

# working topics...
#topics = ("$SYS/broker/subscriptions/count",0)
#topics = [("$SYS/broker/subscriptions/count",0),("$SYS/broker/uptime",0)]
#topics = [("short/path1",0),("short/path2",0)]
#topics = [("longer/path/path1",0),("longer/path/path2",0)]
#topics = [("working/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path",0)]
#topics = [("long/path/path/path/path/path/path/path/path/path/path1",0),("long/path/path/path/path/path/path/path/path/path/path2",0)]

# Broken topics
#topics = [("longer/path/path/path/path/path/path/path/path/path/path/path1",0),("longer/path/path/path/path/path/path/path/path/path/path/path2",0)]
topics = [("broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong",0)]

mqtt_client.subscribe(topics)

while True:
  mqtt_client.loop()

I have not been able to find any topic length limits in the documentation. Is this behaviour expected?

LelandSindt commented 1 year ago

For what it's worth, I have implemented a workaround that "works for now": https://github.com/LelandSindt/analoguEnvoy/blob/8c307e620575397945cd833c15425e1b4d854dbc/circutpythonMQTT/code.py#L71

brentru commented 1 year ago

@LelandSindt How long are the topics you are attempting to subscribe to? How many bytes? Could you give me an example of one?

There is a maximum topic string length of 65,535 bytes imposed by the MQTT specification itself.

LelandSindt commented 1 year ago

@brentru, Thank you for reaching out. The code included in the initial issue is a working example that connects to test.mosquitto.org. Either of the two topic configs under the #Broken topics header will cause the error, and any of the topic configs under the #Working topics header should work without issue.

This example..

echo -n "broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong" | wc -c
123

With a total of 123 characters, it is certainly shorter than 65k bytes in length.
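The same measurement can be made in Python. This is only a small sketch: `MAX_TOPIC_BYTES` is a name introduced here for illustration, set to the largest value a 2-byte length prefix can hold, and the topic is rebuilt programmatically rather than pasted. MQTT counts UTF-8 encoded bytes, so the string is encoded before measuring.

```python
# Hypothetical constant (not part of MiniMQTT): largest value of a 2-byte length prefix.
MAX_TOPIC_BYTES = 0xFFFF

# Same topic as in the shell example above, built from its repeating parts.
topic = "broken/" + "path/" * 21 + "pat/toolong"

# MQTT measures topic length in UTF-8 encoded bytes, not characters.
topic_len = len(topic.encode("utf-8"))

print(topic_len)
print(topic_len <= MAX_TOPIC_BYTES)
```

For an ASCII-only topic like this one, the byte count and the character count are the same.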

brentru commented 1 year ago

@LelandSindt: Thank you for the feedback! I'm unsure if the issue is the topic length or the number of sub-topics. Marking this as a bug until we can formally test this and resolve it.

LelandSindt commented 1 year ago

In my experimentation and experience, it is all about length, regardless of the number of subtopics. For example, subscribing to homeassistant/sensor/envoy_2022xxxxxxxx_current_power_production/state and homeassistant/sensor/envoy_2022xxxxxxxx_current_power_consumption/state, each with only 4 topic levels, caused the failure.

Please let me know if you need any more specific details.

vladak commented 10 months ago

Looking at line 1041 (as reported initially in https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT/issues/160#issue-1648578081) in the source code of adafruit-circuitpython-bundle-py-20230315/lib/adafruit_minimqtt/adafruit_minimqtt.py (Adafruit_CircuitPython_MiniMQTT version 7.3.2) offers no enlightenment:

  1022      def _wait_for_msg(self, timeout: float = 0.1) -> Optional[int]:
  1023          # pylint: disable = too-many-return-statements
  1024  
  1025          """Reads and processes network events.
  1026          Return the packet type or None if there is nothing to be received.
  1027          """
  1028          # CPython socket module contains a timeout attribute
  1029          if hasattr(self._socket_pool, "timeout"):
  1030              try:
  1031                  res = self._sock_exact_recv(1)
  1032              except self._socket_pool.timeout:
  1033                  return None
  1034          else:  # socketpool, esp32spi
  1035              try:
  1036                  res = self._sock_exact_recv(1)
  1037              except OSError as error:
  1038                  if error.errno in (errno.ETIMEDOUT, errno.EAGAIN):
  1039                      # raised by a socket timeout if 0 bytes were present
  1040                      return None
  1041                  raise MMQTTException from error

vladak commented 10 months ago

Trying this with CPython and the latest and greatest MiniMQTT (926846c45664c9155a67a11395387e5b53159be4), I too get an exception:

INFO:__main__:Creating MQTT instance
INFO:__main__:Connecting to MQTT broker
DEBUG:log:Attempting to connect to MQTT broker (attempt #0)
DEBUG:log:Attempting to establish MQTT connection...
INFO:log:Establishing an INSECURE connection to 172.40.0.3:1883
DEBUG:log:Sending CONNECT to broker...
DEBUG:log:Fixed Header: bytearray(b'\x10\x14\x00')
DEBUG:log:Variable Header: bytearray(b'\x04MQTT\x04\x02\x00<')
DEBUG:log:Receiving CONNACK packet from broker
DEBUG:log:Got message type: 0x20
DEBUG:log:Resetting reconnect backoff
INFO:__main__:subscribing to [('broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong', 0)]
DEBUG:log:SUBSCRIBING to topic broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong with QoS 0
Traceback (most recent call last):
  File "/Users/vladimirkotal/Pi/Adafruit_CircuitPython_MiniMQTT/./mqtt_basic.py", line 71, in <module>
    main()
  File "/Users/vladimirkotal/Pi/Adafruit_CircuitPython_MiniMQTT/./mqtt_basic.py", line 40, in main
    mqtt_client.subscribe(topics)
  File "/Users/vladimirkotal/Pi/Adafruit_CircuitPython_MiniMQTT/adafruit_minimqtt/adafruit_minimqtt.py", line 830, in subscribe
    op = self._wait_for_msg()
         ^^^^^^^^^^^^^^^^^^^^
  File "/Users/vladimirkotal/Pi/Adafruit_CircuitPython_MiniMQTT/adafruit_minimqtt/adafruit_minimqtt.py", line 1025, in _wait_for_msg
    res = self._sock_exact_recv(1)
          ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vladimirkotal/Pi/Adafruit_CircuitPython_MiniMQTT/adafruit_minimqtt/adafruit_minimqtt.py", line 1114, in _sock_exact_recv
    recv_len = self._sock.recv_into(rc, bufsize)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [Errno 54] Connection reset by peer

Yes, the broker hung up on the client after seeing the subscribe request.

The code to reproduce this is trivial:

#!/usr/bin/env python3

import json
import socket
import ssl
import sys

import adafruit_minimqtt.adafruit_minimqtt as MQTT

import logging

def main():
    """
    Demonstrate how to use MQTT log handler.
    """
    logging.basicConfig()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    logger.info("Creating MQTT instance")

    broker = "172.40.0.3"
    port = 1883
    mqtt_client = MQTT.MQTT(
        broker=broker,
        port=port,
        socket_pool=socket,
        ssl_context=ssl.create_default_context(),
        connect_retries=3,
    )

    mqtt_client.enable_logger(logging, log_level=logging.DEBUG)

    logger.info("Connecting to MQTT broker")
    mqtt_client.connect()

    topics = [("broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong",0)]
    logger.info(f"subscribing to {topics}")
    mqtt_client.subscribe(topics)


if __name__ == "__main__":
    main()

The broker in question is Mosquitto. Upon reception of the MQTT packet, the server reports this in the log:

1700515215: New connection from 10.0.1.61:55671 on port 1883.
1700515215: New client connected from 10.0.1.61:55671 as cpy13850 (p2, c1, k60).
1700515215: Client cpy13850 disconnected due to malformed packet.

and looking at the traffic dump with Wireshark, the MQTT protocol dissector seems to agree that the SUBSCRIBE packet was malformed.

vladak commented 10 months ago

The problem is in the Remaining Length bytes of the fixed header. In this case the length is 128 in decimal (2 bytes of packet identifier, 2 bytes of topic length, 123 bytes of topic and 1 byte of QoS), which is 0x80 in hexadecimal. However, according to the encoding scheme (MQTT 3.1.1 section 2.2.3 Remaining Length), if the top bit of a length byte is set (which it is for 0x80), the next byte continues the length. subscribe(), however, uses only a single byte: https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT/blob/926846c45664c9155a67a11395387e5b53159be4/adafruit_minimqtt/adafruit_minimqtt.py#L815-L819 so this is clearly a bug.
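To illustrate the encoding scheme from section 2.2.3, here is a minimal sketch in plain Python. It is not the code from the MiniMQTT fix; the function names `encode_remaining_length`, `decode_remaining_length` and `subscribe_remaining_length` are hypothetical and introduced only for this example. Each encoded byte carries 7 bits of the length, and the top bit signals that another byte follows.

```python
MAX_REMAINING_LENGTH = 268_435_455  # largest value that fits in 4 encoded bytes


def encode_remaining_length(length: int) -> bytes:
    """Encode a Remaining Length value as 1 to 4 bytes (hypothetical helper)."""
    if not 0 <= length <= MAX_REMAINING_LENGTH:
        raise ValueError("remaining length out of range")
    encoded = bytearray()
    while True:
        length, digit = divmod(length, 128)
        if length > 0:
            digit |= 0x80  # top bit set: at least one more byte follows
        encoded.append(digit)
        if length == 0:
            return bytes(encoded)


def decode_remaining_length(data: bytes) -> int:
    """Decode the Remaining Length found at the start of data (hypothetical helper)."""
    value = 0
    multiplier = 1
    for byte in data[:4]:
        value += (byte & 0x7F) * multiplier
        if not byte & 0x80:  # top bit clear: this was the last length byte
            return value
        multiplier *= 128
    raise ValueError("truncated or overlong remaining length")


def subscribe_remaining_length(topics) -> int:
    """Remaining Length of a SUBSCRIBE packet for a list of (topic, qos) tuples."""
    # 2 bytes of packet identifier, then per topic:
    # 2-byte length prefix + UTF-8 topic bytes + 1 QoS byte.
    return 2 + sum(2 + len(topic.encode("utf-8")) + 1 for topic, _qos in topics)


topics = [("broken/" + "path/" * 21 + "pat/toolong", 0)]
length = subscribe_remaining_length(topics)
print(length, encode_remaining_length(length).hex())
```

Under these assumptions, the broken topic from this issue needs two length bytes, while any SUBSCRIBE whose Remaining Length is 127 or less still fits in one. That matches the observation that slightly shorter topics worked.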

LelandSindt commented 7 months ago

Made time to update and test today... working as expected, thank you!