adafruit / Adafruit_CircuitPython_MiniMQTT

MQTT Client Library for CircuitPython
Other
72 stars 50 forks source link

enforce loop timeout to be bigger than socket timeout #200

Closed vladak closed 5 months ago

vladak commented 5 months ago

The timeout argument of loop() depends on the socket timeout. With some extra debug logging:

diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py
index 2ec0100..98f68cf 100644
--- a/adafruit_minimqtt/adafruit_minimqtt.py
+++ b/adafruit_minimqtt/adafruit_minimqtt.py
@@ -1074,12 +1074,16 @@ class MQTT:
             try:
                 res = self._sock_exact_recv(1)
             except self._socket_pool.timeout:
+                self.logger.debug(f"socket timeout [1]: {self.get_monotonic_time()}")
                 return None
         else:  # socketpool, esp32spi
             try:
                 res = self._sock_exact_recv(1)
             except OSError as error:
                 if error.errno in (errno.ETIMEDOUT, errno.EAGAIN):
+                    self.logger.debug(
+                        f"socket timeout [2]: {self.get_monotonic_time()}"
+                    )
                     # raised by a socket timeout if 0 bytes were present
                     return None
                 raise MMQTTException from error
@@ -1164,7 +1168,9 @@ class MQTT:
             # CPython/Socketpool Impl.
             rc = bytearray(bufsize)
             mv = memoryview(rc)
+            self.logger.debug(f"-> recv_into: {self.get_monotonic_time()}")
             recv_len = self._sock.recv_into(rc, bufsize)
+            self.logger.debug(f"{recv_len} <- recv_into: {self.get_monotonic_time()}")
             to_read = bufsize - recv_len
             if to_read < 0:
                 raise MMQTTException(f"negative number of bytes to read: {to_read}")
@@ -1180,6 +1186,7 @@ class MQTT:
                     )
         else:  # ESP32SPI Impl.
             # This will timeout with socket timeout (not keepalive timeout)
+            self.logger.debug(f"-> recv: {self.get_monotonic_time()}")
             rc = self._sock.recv(bufsize)
             if not rc:
                 self.logger.debug("_sock_exact_recv timeout")

The following code:

#!/usr/bin/env python3

import adafruit_minimqtt.adafruit_minimqtt as MQTT
import ssl
import adafruit_logging as logging
import socketpool
import wifi

try:
    from secrets import secrets
except ImportError:
    print(
        "WiFi credentials are kept in secrets.py, please add them there!"
    )
    raise

def connect_hook(client, user_data, result, code):
    """Report a completed connection attempt on standard output.

    Installed as the ``on_connect`` callback of the MQTT client. The
    ``client`` argument is accepted but not used.
    """
    summary = f"Connect: {user_data} {result} {code}"
    print(summary)

def main():
    """Reproduce the loop() timeout problem against an MQTT broker.

    Connects to Wi-Fi using the credentials in ``secrets``, creates an MQTT
    client with a 3 second socket timeout, connects to the broker and then
    calls ``loop(0.5)`` forever, logging each iteration at DEBUG level.

    Raises:
        OSError: any OSError from ``loop()`` whose errno is not ETIMEDOUT.
    """
    # Imported locally: the OSError handler below needs errno, but the
    # script's import block does not bring it into scope, so the handler
    # would otherwise fail with NameError.
    import errno

    # logging.basicConfig()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    # Connect to Wi-Fi
    logger.info("Connecting to wifi")
    wifi.radio.connect(secrets["ssid"], secrets["password"], timeout=10)
    logger.info(f"Connected to {secrets['ssid']}")
    logger.debug(f"IP: {wifi.radio.ipv4_address}")

    # Create a socket pool
    pool = socketpool.SocketPool(wifi.radio)  # pylint: disable=no-member

    broker = "172.40.0.3"
    port = 1883

    # The socket timeout (3 s) is deliberately larger than the loop timeout
    # (0.5 s) used below; that mismatch is what this script demonstrates.
    mqtt_client = MQTT.MQTT(
        broker=broker,
        port=port,
        socket_pool=pool,
        ssl_context=ssl.create_default_context(),
        socket_timeout=3,
    )

    # Route the library's debug output through the DEBUG-level logger.
    mqtt_client.logger = logger

    mqtt_client.on_connect = connect_hook
    mqtt_client.connect()

    while True:
        try:
            logger.debug("loop")
            mqtt_client.loop(0.5)
        except UnicodeError:
            # Ignored so that the loop keeps running.
            pass
        except OSError as e:
            # Only a timeout is tolerated; anything else is re-raised.
            if e.errno == errno.ETIMEDOUT:
                logger.info("got timeout")
            else:
                raise

# Script entry point: run the reproduction and print any exception that
# escapes main() instead of letting it propagate.
try:
    main()
except Exception as e:
    print(f"Got exception: {e}")

running on Adafruit CircuitPython 8.0.0-beta.4 on 2022-10-30; Adafruit QT Py ESP32S2 with ESP32S2 produces this output:

4076.899: INFO - Connecting to wifi
4081.922: INFO - Connected to XXX
4081.924: DEBUG - IP: 172.40.0.20
4081.929: DEBUG - Attempting to connect to MQTT broker (attempt #0)
4081.931: DEBUG - Attempting to establish MQTT connection...
4081.934: INFO - Establishing an INSECURE connection to 172.40.0.3:1883
4084.936: DEBUG - Sending CONNECT to broker...
4084.938: DEBUG - Fixed Header: bytearray(b'\x10\x13')
4084.940: DEBUG - Variable Header: bytearray(b'\x00\x04MQTT\x04\x02\x00<')
4084.946: DEBUG - Receiving CONNACK packet from broker
4084.950: DEBUG - -> recv_into: 4084.95
4084.954: DEBUG - 1 <- recv_into: 4084.95
4084.958: DEBUG - Got message type: 0x20 pkt: 0x20
4084.960: DEBUG - -> recv_into: 4084.96
4084.963: DEBUG - 3 <- recv_into: 4084.96
Connect: None 0 0
4084.967: DEBUG - Resetting reconnect backoff
4084.968: DEBUG - loop
4084.971: DEBUG - waiting for messages for 0.5 seconds
4084.974: DEBUG - -> recv_into: 4084.97
4087.976: DEBUG - socket timeout [2]: 4087.98
4087.979: DEBUG - Loop timed out after 0.5 seconds
4087.979: DEBUG - loop
4087.982: DEBUG - waiting for messages for 0.5 seconds
4087.985: DEBUG - -> recv_into: 4087.99
4090.987: DEBUG - socket timeout [2]: 4090.99
4090.989: DEBUG - Loop timed out after 0.5 seconds
4090.991: DEBUG - loop
4090.993: DEBUG - waiting for messages for 0.5 seconds
4090.997: DEBUG - -> recv_into: 4091.0
4094.000: DEBUG - socket timeout [2]: 4094.0
4094.002: DEBUG - Loop timed out after 0.5 seconds
...

This makes the problem evident: wait_for_msg() returns only after the socket timeout (3 seconds here), which is longer than the loop timeout (0.5 seconds), so loop() breaks the promise of its timeout semantics when there is no traffic.