adafruit / Adafruit_CircuitPython_MiniMQTT

MQTT Client Library for CircuitPython
Other
79 stars 50 forks source link

MQTT loop() blocks script. Works fine with adafruit_io #138

Open mrmay-dev opened 1 year ago

mrmay-dev commented 1 year ago

This happens with CircuitPython 7.3.3 and 8 beta-6 using the mpy-20221230 libraries for both 7 and 8 releases.

The loop() works flawlessly when run used with the AIO client. However, it stalls with plain MQTT.

A simple workaround is to add a timeout (in seconds) to the loop: mqtt_client.loop(timeout = 1)

I expect that while loop() is checking for updates the code below would send messages to the topic. What happens instead, is that the loop() blocks the script until it receives a message and then it allows the rest of the code to run.

To reproduce:

  1. run this on a MatrixPortal M4
  2. notice that the script sends no messages
  3. from another client send a message to test/test-topic
  4. notice that the sent message is processed and the rest of the while True loop runs.

To run the same loop with adafruit_io change the use_MQTT = True variable to False.

''' Bug Reproduction Demo - MatrixPortal M4 '''

import time
import board
import busio
from microcontroller import cpu
from digitalio import DigitalInOut
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
import adafruit_esp32spi.adafruit_esp32spi_socket as socket
import adafruit_minimqtt.adafruit_minimqtt as MQTT
from adafruit_io.adafruit_io import IO_MQTT

start = time.monotonic()

# -----------------------------------------------------------------------------------------
# WIFI Connection
# -----------------------------------------------------------------------------------------

try:
    from secrets import secrets
except ImportError:
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise

esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)

spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)

wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets)

print("Connecting to WiFi...")
wifi.connect()
print("Connected!")

# -----------------------------------------------------------------------------------------
# MQTT Connection
# -----------------------------------------------------------------------------------------

use_MQTT = True # set to False to use Adafruit IO

if use_MQTT:
    mqtt_topic = "test/test-topic"
    the_broker = "Mosquitto"

    # Initialize MQTT interface with the esp interface
    MQTT.set_socket(socket, esp)

    # Initialize a new MQTT Client object
    io = MQTT.MQTT(
        broker=secrets["mqtt_broker"],
        port=1883,
        username=secrets["mqtt_user"],
        password=secrets["mqtt_pass"],
        socket_pool=socket,
        is_ssl = False
    )

if not use_MQTT:
    mqtt_topic = "test-topic"
    the_broker = "Adafruit IO"

    # Initialize MQTT interface with the esp interface
    MQTT.set_socket(socket, esp)

    # Initialize a new MQTT Client object
    mqtt_client = MQTT.MQTT(
        broker="io.adafruit.com",
        username=secrets["aio_username"],
        password=secrets["aio_key"],
    )

    # Initialize an Adafruit IO MQTT Client
    io = IO_MQTT(mqtt_client)

def new_message(client, topic, message):
    # Method called whenever user/feeds/led has a new value
    print(f'New message on {topic}: {message}')

# Connect the callback methods defined above to Adafruit IO
io.on_message = new_message

# Connect to Adafruit IO
print(f"Connecting to {the_broker}...")
io.connect()
print(f'Time to connect: {(time.monotonic() - start):0.1f} seconds')

# -----------------------------------------------------------------------------------------
# MQTT Activity
# -----------------------------------------------------------------------------------------

print(f'\nSubscribing: {mqtt_topic}')
io.subscribe(mqtt_topic)

prv_refresh_time = 0.0
end_all = time.monotonic() + 30 # seconds
print('Listening for messages...')
while True:
    # Poll for incoming messages
    try:
        io.loop()
        # io.loop(timeout=1)  # this timeout allows the loop to continue
    except (ValueError, RuntimeError) as e:
        print("Failed to get data, retrying\n", e)
        wifi.reset()
        wifi.connect()
        io.reconnect()
        continue
    if (time.monotonic() - prv_refresh_time) > 5:
        cpu_temp = cpu.temperature
        cpu_temp = f'{cpu_temp:>0.2f}'
        print(f'Sending: {cpu_temp}')
        io.publish(mqtt_topic, cpu_temp)
        prv_refresh_time = time.monotonic()

    if time.monotonic() >= end_all:
        break

print(f'Unsubscribing: {mqtt_topic}')
io.unsubscribe(mqtt_topic)
print(f'Disconnecting from {the_broker}')
io.disconnect()
print(f'\nTotal Time: {(time.monotonic() - start):0.1f} seconds \nGood bye!')
vladak commented 1 year ago

For the record, #142 proposes to use non-zero timeout for loop() by default.

brentru commented 1 year ago

Going with #142 - leaving this open until #142 is resolved

evilworm commented 2 months ago

This might be a bit old, but I got this issue on CP 9.1.2. Went through the code and it just didn't make any sense to use timeout > 0 for non-blocking loop ... in fact I tried but it would still block. The solution for me was to patch adafruit_minimqtt.py and place self._sock.setblocking(False) just after a new socket is created self._sock = self._connection_manager.get_socket(...) (around line 494) now the loop uses default timeout=0 (and socket_timeout=-1 in mqtt init) and it does not block anymore, hope this helps someone...