adafruit / Adafruit_CircuitPython_MiniMQTT

MQTT Client Library for CircuitPython
Other
79 stars 50 forks source link

Publish fails on large MQTT message sizes #31

Closed seantibor closed 4 years ago

seantibor commented 4 years ago

When the length of the message and topic is longer than 125, an IndexError is thrown on the pkt bytearray.

test code: for i in range(500): print("publishing string of length {}".format(i)) client.publish("testing/message", "a" * i)

result: Published to testing/message with PID 0 6351.95: DEBUG - Sending PUBACK publishing string of length 111 Traceback (most recent call last): File "code.py", line 121, in <module> File "adafruit_minimqtt.py", line 424, in publish IndexError: bytearray index out of range

This looks like it could be similar to issue #3 in that the payload is unintentionally limited to 127 characters in size.

seantibor commented 4 years ago

@brentru - Digging into the umqtt.simple library from the micropython implementation shows that they use a 4-byte packet for the control byte and to communicate msg length where the adafruit library uses a 2-byte packet.

from umqtt.simple line 110: bytearray(b"\x30\0\0\0")

corresponding line 65 from adafruit_minimqtt.py MQTT_PUB = bytearray(b"\x30\0")

updating line 65 to match the umqtt implementation failed when sending the packet over the socket connection

23462.7: DEBUG - Sending PUBLISH Topic: homeassistant/sensor/thermometer1/config Msg: b'{"dev_clas": "temperature", "stat_t": "homeassistant/sensor/thermometer1/state", "name": "thermometer1", "unit_of_meas": "\xc2\xb0F"}' QoS: 0 Retain? False Traceback (most recent call last): File "code.py", line 127, in <module> File "/lib/adafruit_minimqtt.py", line 432, in publish File "adafruit_esp32spi/adafruit_esp32spi_socket.py", line 100, in send File "adafruit_esp32spi/adafruit_esp32spi.py", line 711, in socket_write RuntimeError: Failed to send 4 bytes (sent 0)

brentru commented 4 years ago

@seantibor Could you provide an example of the code you're using? The JSON data doesn't look like it's greater than the MQTT_MSG_MAX_SZ

seantibor commented 4 years ago

Sure. Here is the primary code file and the class that I created for my pool thermometer.

code.py:

import board
import busio
from digitalio import DigitalInOut
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
import adafruit_esp32spi.adafruit_esp32spi_socket as socket
from adafruit_onewire.bus import OneWireBus
from thermometers import PoolThermometer
import time

ow_bus = OneWireBus(board.D4)

devices = ow_bus.scan()

for device in devices:
    print("ROM = {} \tFamily = 0x{:02x}".format([hex(i) for i in device.rom], device.family_code))

import adafruit_minimqtt as MQTT

### WiFi ###

# Get wifi details and more from a secrets.py file
try:
    from secrets import secrets
except ImportError:
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise

# If you are using a board with pre-defined ESP32 Pins:
# esp32_cs = DigitalInOut(board.ESP_CS)
# esp32_ready = DigitalInOut(board.ESP_BUSY)
# esp32_reset = DigitalInOut(board.ESP_RESET)

# If you have an externally connected ESP32:
esp32_cs = DigitalInOut(board.D13)
esp32_ready = DigitalInOut(board.D11)
esp32_reset = DigitalInOut(board.D12)

spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
"""Use below for Most Boards"""

import adafruit_rgbled
from adafruit_esp32spi import PWMOut
RED_LED = PWMOut.PWMOut(esp, 26)
GREEN_LED = PWMOut.PWMOut(esp, 27)
BLUE_LED = PWMOut.PWMOut(esp, 25)
status_light = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)

wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)

### Topic Setup ###

# MQTT Topic
# Use this topic if you'd like to connect to a standard MQTT broker
mqtt_topic = "pool_controller/message"

mqtt_prefix = "homeassistant"

# Adafruit IO-style Topic
# Use this topic if you'd like to connect to io.adafruit.com
# mqtt_topic = 'aio_user/feeds/temperature'

### Code ###

# Define callback methods which are called when events occur
# pylint: disable=unused-argument, redefined-outer-name
def connect(client, userdata, flags, rc):
    # This function will be called when the client is connected
    # successfully to the broker.
    print("Connected to MQTT Broker!")
    print("Flags: {0}\n RC: {1}".format(flags, rc))

def disconnect(client, userdata, rc):
    # This method is called when the client disconnects
    # from the broker.
    print("Disconnected from MQTT Broker!")

def subscribe(client, userdata, topic, granted_qos):
    # This method is called when the client subscribes to a new feed.
    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))

def unsubscribe(client, userdata, topic, pid):
    # This method is called when the client unsubscribes from a feed.
    print("Unsubscribed from {0} with PID {1}".format(topic, pid))

def publish(client, userdata, topic, pid):
    # This method is called when the client publishes data to a feed.
    print("Published to {0} with PID {1}".format(topic, pid))

# Connect to WiFi
print("Connecting to WiFi...")
wifi.connect()
print("Connected!")

# Initialize MQTT interface with the esp interface
MQTT.set_socket(socket, esp)

# Set up a MiniMQTT Client
client = MQTT.MQTT(
    broker=secrets["mqtt_broker"], username=secrets["mqtt_username"], password=secrets["mqtt_password"], is_ssl=False, log=True)
client.set_logger_level("DEBUG")

# Connect callback handlers to client
client.on_connect = connect
client.on_disconnect = disconnect
client.on_subscribe = subscribe
client.on_unsubscribe = unsubscribe
client.on_publish = publish

client.connect()

# for i in range(500):
#     print("publishing string of length {}".format(i))
#     client.publish("testing/message", "a" * i)

thermometers = [PoolThermometer(ow_bus, device, "thermometer{}".format(num)) for num, device in enumerate(devices)]
for thermometer in thermometers:
    print("Publishing config for {}".format(thermometer.name))
    print(thermometer.config_payload)
    client.publish(thermometer.config_topic, thermometer.config_payload)

while True:
    try:
        client.loop()
    except (ValueError, RuntimeError) as e:
        print("Failed to get data, retrying\n", e)
        wifi.reset()
        client.reconnect()
        continue
    for thermometer in thermometers:
        print("Publishing state for {}".format(thermometer.name))
        print(thermometer.state_payload)
        client.publish(thermometer.state_topic, thermometer.state_payload)
    time.sleep(5)

thermometers.py:

import board
from adafruit_onewire.bus import OneWireBus
import adafruit_ds18x20
import json

class PoolThermometer(adafruit_ds18x20.DS18X20):

    def __init__(self, bus, address, name, discovery_prefix="homeassistant", unit_of_measure="°F"):
        self.name = name
        self.component = "sensor"
        self.device_class = "temperature"
        self.unit_of_measure = unit_of_measure
        self.discovery_prefix = discovery_prefix
        super().__init__(bus, address)

    @property
    def topic(self):
        return "{discovery_prefix}/{component}/{name}".format(discovery_prefix=self.discovery_prefix,component=self.component, name=self.name)

    @property
    def config_topic(self):
        return self.topic + "/config"

    @property
    def state_topic(self):
        return self.topic + "/state"

    @property
    def config_payload(self):
        payload = {"dev_clas" : "temperature", 
                   "name" : self.name, 
                   "stat_t" : self.state_topic, 
                   "unit_of_meas" : self.unit_of_measure }

        return json.dumps(payload)

    @property
    def state_payload(self):
        temperature = self.temperature
        if self.unit_of_measure == "F":
            temperature = temperature * 9 / 5 + 32
        return json.dumps({"temperature": temperature})
brentru commented 4 years ago

@seantibor I'm taking another look at this per @caternuson's request (and for a project I'm working on). There's a similar issue when receiving payloads >127b from a subscribed topic.

I'll need to implement the remaining length calculation described in the MQTT v3.1.1 spec (https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030) for both publish() and subscribe()

seantibor commented 4 years ago

That sounds good. I'm making another run at this with an ESP32 and micropython for a battery powered sensor project, so I'll let you know how that implementation goes.

brentru commented 4 years ago

@seantibor please do, though, likely it'll have the same exact issue (as MiniMQTT is based on MicroPython-uMQTT).

brentru commented 4 years ago

This issue is addressed in this PR: https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT/pull/42

I'll need to implement the remaining length calculation described in the MQTT v3.1.1 spec (https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030) for both publish() and subscribe()

After testing, topic subscriptions seem to be able to receive > 127 byte payloads (msg+topic).

brentru commented 4 years ago

Fixed in #42, released in v3.2.0 - https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT/releases/tag/3.2.0