peterhinch / micropython-mqtt

A 'resilient' asynchronous MQTT driver. Recovers from WiFi and broker outages.
MIT License

MQTTv5 testing #149

Open peterhinch opened 3 weeks ago

peterhinch commented 3 weeks ago

@bobveringa As discussed, I'm writing a demo script for this. I'm also enabling installation with mpremote mip. The latter installs to a lib subdirectory and there is no way to override this. A consequence is that the relative import does not work:

MicroPython v1.22.1 on 2024-01-05; Generic ESP32 module with ESP32
Type "help()" for more information.
>>> 
>>> import basic
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/lib/basic.py", line 108, in <module>
  File "/lib/mqtt_as.py", line 705, in __init__
  File "/lib/mqtt_as.py", line 198, in __init__
ImportError: can't perform relative import

The problem is fixed by using an absolute import, but I'm stumped as to why this error is occurring. Both source files are in the same lib subdirectory. I'd be grateful for your thoughts.

Another problem is that I can't get topic aliases to work. I'm running the following Bash script to create publications:

#! /bin/bash
# mosquitto_sub -h 192.168.0.10 -t foo_topic  -V 5 -F "Properties %P payload %p"
while :
do
    mosquitto_pub -h 192.168.0.10 -t foo_topic -m "User property" -D PUBLISH user-property key value -V 5
    sleep 5
    mosquitto_pub -h 192.168.0.10 -t foo_topic -m "Content type" -D PUBLISH content-type my_type -V 5
    sleep 5
    mosquitto_pub -h 192.168.0.10 -t foo_topic -m "Correlation data" -D PUBLISH correlation-data my_data -V 5
    sleep 5
    mosquitto_pub -h 192.168.0.10 -t foo_topic -m "Response topic" -D PUBLISH response-topic my_topic -V 5
    sleep 5
    mosquitto_pub -h 192.168.0.10 -t foo_topic -m "Payload format indicator" -D PUBLISH payload-format-indicator 1 -V 5
    sleep 5
    mosquitto_pub -h 192.168.0.10 -t foo_topic -m "Message expiry interval 300s" -D PUBLISH message-expiry-interval 300 -V 5
    sleep 5
    mosquitto_pub -h 192.168.0.10 -t foo_topic -m "Topic alias 1" -q 1 -D PUBLISH topic-alias 1 -V 5
    sleep 5
    # The following does not work: not even received by mosquitto_sub
    mosquitto_pub -h 192.168.0.10 -t "" -m "Sent using alias" -D PUBLISH topic-alias 1 -V 5
    sleep 5
done

As stated in the comment, the final publication disappears into the ether - even mosquitto_sub doesn't get it. So I suspect that the problem isn't in our code but in my understanding of the spec. Please advise.
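One detail of the MQTT v5 spec may be relevant here: Topic Alias mappings exist only within a Network Connection and last only for the lifetime of that connection. Each mosquitto_pub invocation in the script above opens a fresh connection, so the alias 1 defined by one run is presumably unknown to the broker when the next run publishes with an empty topic. The sketch below is a hypothetical model of a receiver's per-connection alias table. AliasTable and resolve are illustrative names, not part of mqtt_as or mosquitto, and rejecting an unknown alias with ValueError is a simplification of whatever a real broker does.

```python
class AliasTable:
    """Hypothetical model of a receiver's Topic Alias mappings for ONE connection.

    Not taken from mqtt_as or mosquitto; for illustration only.
    """

    def __init__(self, maximum):
        self.maximum = maximum  # Topic Alias Maximum advertised by the receiver
        self.aliases = {}       # alias number -> topic name

    def resolve(self, topic, alias=None):
        """Return the effective topic name for an incoming PUBLISH."""
        if alias is None:
            if not topic:
                raise ValueError("empty topic and no topic alias")
            return topic
        if not 1 <= alias <= self.maximum:
            raise ValueError("topic alias out of range")
        if topic:
            # A non-empty topic together with an alias (re)defines the mapping.
            self.aliases[alias] = topic
            return topic
        if alias not in self.aliases:
            # Empty topic and no mapping on this connection: nothing to resolve.
            raise ValueError("unknown topic alias %d" % alias)
        return self.aliases[alias]


# Same connection: define alias 1, then publish using only the alias.
conn = AliasTable(maximum=10)
print(conn.resolve("foo_topic", alias=1))  # foo_topic
print(conn.resolve("", alias=1))           # foo_topic

# New connection (like a second mosquitto_pub run): the mapping is gone.
fresh = AliasTable(maximum=10)
try:
    fresh.resolve("", alias=1)
except ValueError as exc:
    print("rejected:", exc)                # rejected: unknown topic alias 1
```

Under this model, testing alias reuse needs both publications to go out over a single connection.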

This is my current test script:

from mqtt_as import MQTTClient, RP2

from mqtt_local import config  # Sets "server", "ssid", "wifi_pw"
import asyncio
import gc

TOPIC = "shed"  # For demo publication and last will use same topic

outages = 0

def decode(key):
    names = {
        0x01: "Payload Format Indicator",
        0x02: "Message Expiry Interval",
        0x03: "Content Type",
        0x08: "Response Topic",
        0x09: "Correlation Data",
        0x0B: "Subscription Identifier",
        0x23: "Topic Alias",
        0x26: "User Property",
    }
    # Fall back to the raw identifier for properties not in the table
    return names.get(key, f"Unknown: {key}")

# Incoming properties comprise a dict with one key for each property in the message
async def messages(client):
    async for topic, msg, retained, properties in client.queue:
        print(f'Topic: "{topic.decode()}" Message: "{msg.decode()}" Retained: {retained}')
        if properties is None:
            print("Message has no properties.")
        else:
            print("Message properties:")
            for key in properties:
                print(f"{decode(key)} : {properties[key]}")

async def down(client):
    global outages
    while True:
        await client.down.wait()  # Pause until connectivity changes
        client.down.clear()
        outages += 1
        print("WiFi or broker is down.")

async def up(client):
    while True:
        await client.up.wait()
        client.up.clear()
        print("We are connected to broker.")
        await client.subscribe("foo_topic", 1)

async def main(client, props):
    try:
        await client.connect(quick=True)
    except OSError:
        print("Connection failed.")
        return
    asyncio.create_task(up(client))
    asyncio.create_task(down(client))
    asyncio.create_task(messages(client))
    n = 0
    s = "{} repubs: {} outages: {} free: {} bytes discards: {}"
    while True:
        await asyncio.sleep(5)
        print("publish", n)
        gc.collect()
        m = gc.mem_free()
        msg = s.format(n, client.REPUB_COUNT, outages, m, client.queue.discards)
        # If WiFi is down the following will pause for the duration.
        await client.publish(TOPIC, msg, qos=1, properties=props)
        n += 1

# Define configuration
config["will"] = (TOPIC, "Goodbye cruel world!", False, 0)
config["keepalive"] = 120
config["queue_len"] = 4  # Use event interface
config["mqttv5"] = True
config["mqttv5_con_props"] = {
    0x11: 3600,  # Session Expiry Interval
    0x22: 45,  # Max value of topic alias
}

# Set up client. Enable optional debug statements.
MQTTClient.DEBUG = True
client = MQTTClient(config)
# Properties for publication
pub_props = {
    0x26: {"value": "test"},  # User Property (UTF-8 string pair)
    0x09: b"correlation_data",  # Correlation Data (binary)
    0x08: "response_topic",  # Response Topic (UTF-8 string)
    0x02: 60,  # Message Expiry Interval (integer)
}

try:
    asyncio.run(main(client, pub_props))
finally:
    client.close()
    asyncio.new_event_loop()

[EDIT] Apart from the import line, there are no code changes.

bobveringa commented 3 weeks ago

> The latter installs to a lib subdirectory and there is no way to override this. A consequence is that the relative import does not work

That is interesting. I used the relative import because we store the library at lib/mqtt_as/mqtt_as.py, and it did not work unless I used a relative import. [EDIT]: We only use this with our own file structure; we are not using mip.

As for the test script, the client does not support topic aliases for received messages, only for sent messages. It seems I misread the spec and assumed that topic aliases are only for sending to the broker. That is not the case, and it should probably be documented as a limitation.

My initial assumption was: topic aliases for sending to the broker, subscription identifiers for receiving. But this appears to be wrong. After reading the spec more carefully, it is not clear to me how sending topic aliases to clients is supposed to work. This clearly requires more investigation. I am a bit busy this week, but might have some time next week to look into it.

It is unclear to me why the bash script also fails with mosquitto, unless perhaps mosquitto_sub is configured to support topic aliases when it actually doesn't?

peterhinch commented 3 weeks ago

I take your points re aliases. At the moment I am decidedly confused. I await your guidance when you have time.

Of more concern is that the above Python script seems to be unreliable, even in the absence of incoming messages. I sometimes get a disconnect after every publish:

>>> import basic
Connecting to broker.
CONNACK properties: {34: 10, 33: 20}
We are connected to broker.
publish 0
WiFi or broker is down.
Checking WiFi integrity.
Got reliable connection
Connecting to broker.
CONNACK properties: {34: 10, 33: 20}
Reconnect OK!
We are connected to broker.
WiFi or broker is down.

This fault goes away if I set the publish qos=0.

Further, the existing V3.1.1 range_ex demo, which publishes with qos=, works fine:

Checking WiFi integrity.
Got reliable connection
Connecting to broker.
We are connected to broker.
publish 0
publish 1
publish 2
RAM free 126944 alloc 25952
publish 3
publish 4
publish 5
...

The fault is intermittent. This morning I thought I'd fixed the problem, removing some code that I believed was the cause, but now the same code is failing with qos=1.

When you get time you might like to try this script - or tell me what I'm doing wrong :)

bobveringa commented 3 weeks ago

This random disconnect is interesting. It seems similar to behaviour we have been tracking on some devices; I heard my colleague talking about it today. Last I heard, _sta_if.isconnected() appears to return False at some point for no apparent reason, but we are still in the early stages of tracing this.

As for the script:

config["mqttv5_con_props"] = {
    0x11: 3600,  # Session Expiry Interval
    0x22: 45,  # Max value of topic alias
}

After further reading of the spec, I think my initial assessment was correct: topic aliases are only for sending to the broker, not for receiving. As far as I can tell, all that setting the Topic Alias Maximum does is ensure that, if it is lower than what the broker supports, CONNACK returns the lower number as the maximum. I think for pretty much all use cases the broker, not the client, will be the limiting factor for the number of topic aliases. For example, AWS IoT limits it to 8 aliases.

For your configuration, what does client.topic_alias_maximum return if you omit the Topic Alias Maximum (0x22) property from the connection properties? If this is 0, it would explain why aliases don't appear to work.

peterhinch commented 3 weeks ago

> topic aliases are only for sending to the broker, not for receiving

I've evidently misunderstood this, but what you are saying does make sense. I'll amend the docs so that this is clear.

I removed the setting of "Topic Alias Maximum". The value of client.topic_alias_maximum is 10, which is consistent with this debug print:

CONNACK properties: {34: 10, 33: 20}

> If this is 0 it would explain why aliases don't appear to work.

I haven't yet tested outgoing (publication) aliases.
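The keys in the CONNACK debug print are decimal property identifiers. 34 is 0x22 (Topic Alias Maximum, here 10, matching client.topic_alias_maximum) and 33 is 0x21 (Receive Maximum, the number of QoS 1 and QoS 2 publications the broker is willing to process concurrently for this client, here 20). A small lookup makes such prints easier to read. describe_props is a hypothetical helper, not part of mqtt_as, and the table covers only the identifiers that come up in this thread.

```python
# MQTT v5 property identifiers (subset relevant to this thread).
PROPERTY_NAMES = {
    0x01: "Payload Format Indicator",
    0x02: "Message Expiry Interval",
    0x03: "Content Type",
    0x08: "Response Topic",
    0x09: "Correlation Data",
    0x0B: "Subscription Identifier",
    0x11: "Session Expiry Interval",
    0x21: "Receive Maximum",
    0x22: "Topic Alias Maximum",
    0x23: "Topic Alias",
    0x26: "User Property",
}


def describe_props(props):
    """Map numeric property identifiers in a decoded dict to readable names."""
    return {PROPERTY_NAMES.get(k, "Unknown (0x%02X)" % k): v for k, v in props.items()}


print(describe_props({34: 10, 33: 20}))
# {'Topic Alias Maximum': 10, 'Receive Maximum': 20}
```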

The disconnection bug.

I'll try to investigate this. My test setup is simple. I'm using the original ESP32 reference board running V1.22.1 firmware. There is no other MQTT traffic on the LAN (the bash script is not running). I've now set properties=None:

        await client.publish(TOPIC, msg, qos=1, properties=None)

This does not affect the problem.

I suspect the problem may lie here. Between decoding the PID and removing it from self.rcv_pids, the code yields to the scheduler. I haven't fully thought this through, but it may be creating a hazard.

Being of an experimental persuasion, I'll move the code and re-test; then, if that fixes it, I'll apply my ageing grey matter to figuring out the reason.

peterhinch commented 3 weeks ago

It's not as I thought above. I've made some progress with this instrumented code:

        if op == 0x40:  # PUBACK: save pid
            sz, _ = await self._recv_len()
            if not self.mqttv5:
                if sz != 2:
                    raise OSError(-1, "Invalid PUBACK packet")
            rcv_pid = await self._as_read(2)
            pid = rcv_pid[0] << 8 | rcv_pid[1]
            if pid in self.rcv_pids:
                self.rcv_pids.discard(pid)
            else:
                raise OSError(-1, "Invalid pid in PUBACK packet")
            print("sz", sz)
            # For some reason even on MQTTv5 reason code is optional
            if sz != 2:
                reason_code = await self._as_read(1)
                reason_code = reason_code[0]
                if reason_code >= 0x80:
                    raise OSError(-1, "PUBACK reason code 0x%x" % reason_code)

            if sz > 2:
                print("Got here")
                puback_props_sz, _ = await self._recv_len()
                print("puback_props_sz", puback_props_sz)
                if puback_props_sz > 0:
                    puback_props = await self._as_read(puback_props_sz)
                    decoded_props = decode_properties(puback_props, puback_props_sz)
                    self.dprint("PUBACK properties %s", decoded_props)

When a PUBACK arrives, sz is 3, so it prints "Got here" and executes:

puback_props_sz, _ = await self._recv_len()

This never returns. The line

print("puback_props_sz", puback_props_sz)

never runs; consequently wait_msg stalls and the system declares an outage.

So I think there is a bug in ._recv_len - I'll try to figure it out but any thoughts would be welcome!
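For reference, judging by how it is called, _recv_len appears to read an MQTT Variable Byte Integer, the encoding used for both the Remaining Length and the Property Length. Each byte carries 7 bits of value, least significant group first, and its top bit says whether another byte follows. The synchronous sketch below decodes one from a buffer; decode_varint is a hypothetical name and this illustrates the spec's algorithm, not the mqtt_as implementation. Where this version raises on a short buffer, a coroutine awaiting bytes from a socket would simply keep waiting, which is consistent with the hang described above.

```python
def decode_varint(buf, offset=0):
    """Decode an MQTT Variable Byte Integer starting at buf[offset].

    Returns (value, number_of_bytes_used). Raises ValueError if the buffer
    ends before a byte with the continuation bit clear is seen.
    """
    value = 0
    multiplier = 1
    for i in range(4):  # the spec allows at most four bytes
        if offset + i >= len(buf):
            raise ValueError("buffer ended mid-integer")
        byte = buf[offset + i]
        value += (byte & 0x7F) * multiplier
        if not (byte & 0x80):  # continuation bit clear: this was the last byte
            return value, i + 1
        multiplier *= 128
    raise ValueError("malformed Variable Byte Integer")


print(decode_varint(b"\xc1\x02"))  # (321, 2)
print(decode_varint(b"\x00"))      # (0, 1): e.g. a Property Length of zero
```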

peterhinch commented 3 weeks ago

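Looking at the spec, I think it's simpler than that. A PUBACK packet has three possible forms: a Remaining Length of 2 (packet identifier only, reason code Success implied), 3 (packet identifier plus reason code), or 4 or more (packet identifier, reason code, Property Length and properties). So when sz is 3 there is no Property Length to read.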

I think the code should read:

        if op == 0x40:  # PUBACK: save pid
            sz, _ = await self._recv_len()
            if not self.mqttv5:
                if sz != 2:
                    raise OSError(-1, "Invalid PUBACK packet")
            rcv_pid = await self._as_read(2)
            pid = rcv_pid[0] << 8 | rcv_pid[1]
            if pid in self.rcv_pids:
                self.rcv_pids.discard(pid)
            else:
                raise OSError(-1, "Invalid pid in PUBACK packet")
            # For some reason even on MQTTv5 reason code is optional
            if sz != 2:
                reason_code = await self._as_read(1)
                reason_code = reason_code[0]
                if reason_code >= 0x80:
                    raise OSError(-1, "PUBACK reason code 0x%x" % reason_code)
            if sz > 3:
                puback_props_sz, _ = await self._recv_len()
                if puback_props_sz > 0:
                    puback_props = await self._as_read(puback_props_sz)
                    decoded_props = decode_properties(puback_props, puback_props_sz)
                    self.dprint("PUBACK properties %s", decoded_props)

On a brief test this seems to fix the problem.
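The difference between the two versions can be reproduced off-device with CPython's asyncio. The sketch feeds a three-byte PUBACK body (packet identifier 1, reason code 0x00, no properties) into an asyncio.StreamReader and reads it with each threshold. read_puback and demo are hypothetical names, and the Property Length is read as a single byte for brevity, whereas a real one is a Variable Byte Integer. With the old threshold the read waits for a byte that never arrives, which asyncio.wait_for turns into a timeout.

```python
import asyncio

STALLED = "stalled: no Property Length byte arrived"


async def read_puback(reader, sz, buggy):
    """Read a PUBACK body whose Remaining Length is sz (hypothetical helper)."""
    pid_bytes = await reader.readexactly(2)
    pid = pid_bytes[0] << 8 | pid_bytes[1]
    reason_code = 0  # an omitted reason code means Success
    if sz != 2:
        reason_code = (await reader.readexactly(1))[0]
    props_len = 0
    # The instrumented code used sz > 2; the fix uses sz > 3.
    threshold = 2 if buggy else 3
    if sz > threshold:
        # Simplification: a real Property Length may span up to four bytes.
        props_len = (await reader.readexactly(1))[0]
        await reader.readexactly(props_len)
    return pid, reason_code, props_len


async def demo(buggy):
    reader = asyncio.StreamReader()
    reader.feed_data(b"\x00\x01\x00")  # pid 1, reason code 0x00, no properties
    try:
        return await asyncio.wait_for(read_puback(reader, 3, buggy), timeout=0.2)
    except asyncio.TimeoutError:
        return STALLED


print(asyncio.run(demo(buggy=True)))   # stalled: no Property Length byte arrived
print(asyncio.run(demo(buggy=False)))  # (1, 0, 0)
```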

I would welcome your comments. (I still think it's best to handle the PID before yielding to the scheduler).

bobveringa commented 3 weeks ago

Going over the specs again, this seems like the correct solution.

While I agree that it is better to handle the PID before yielding to the scheduler, I am wondering what the implications are. In this case, we would "clear" the PID even if the reason code indicates an error. Once the PID has been handled, there will be no further retries.

peterhinch commented 3 weeks ago

Fair point. I'll put it back.

I've run a test with the above version, publishing at qos==1 with properties, and in the presence of incoming messages. It's got to over 1K publications without error so I think the problem is solved.

peterhinch commented 3 weeks ago

To return to the issue of relative imports, Python only accepts these in a Python package. By installing to lib/mqtt_as/ you are effectively creating a package, but one which breaks existing code. For existing code to run unchanged we need to rename mqtt_as.py to __init__.py. I can arrange for MIP install to do this and to create the directory. Demo scripts and users' applications will continue to issue

from mqtt_as import MQTTClient

Users' applications will run unchanged but demo scripts would now be run with (e.g.)

import mqtt_as.range
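The package rule can be illustrated with CPython, which behaves the same way in this respect, although its error message differs from MicroPython's "can't perform relative import". The sketch builds a throwaway directory holding a flat module that uses a relative import and a package whose __init__.py does the same; flatmod, flathelper and pkg_demo are made-up names for the demo.

```python
import importlib
import os
import sys
import tempfile

# Throwaway tree mimicking the two layouts discussed:
#   flat:    <root>/flatmod.py, <root>/flathelper.py   (no package)
#   package: <root>/pkg_demo/__init__.py, <root>/pkg_demo/helper.py
root = tempfile.mkdtemp()


def write(path, text):
    with open(os.path.join(root, path), "w") as f:
        f.write(text)


write("flathelper.py", "VALUE = 42\n")
write("flatmod.py", "from .flathelper import VALUE\n")  # relative import, no package
os.mkdir(os.path.join(root, "pkg_demo"))
write("pkg_demo/helper.py", "VALUE = 42\n")
write("pkg_demo/__init__.py", "from .helper import VALUE\n")  # inside a package

sys.path.insert(0, root)
importlib.invalidate_caches()  # make sure the new files are seen


def try_import(name):
    """Import a module and return its VALUE, or the ImportError text."""
    try:
        return importlib.import_module(name).VALUE
    except ImportError as exc:
        return "ImportError: %s" % exc


print(try_import("flatmod"))   # fails: flatmod has no parent package
print(try_import("pkg_demo"))  # 42: __init__.py makes pkg_demo a package
```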

[EDIT] I have now done this and re-organised the files. There is only one README.md, the Bash scripts are in the root directory, and the library with some of the demos is mip installable. Code is unchanged apart from the bugfix we discussed.

I have updated the README significantly, adding V5 content in the hope of making it more beginner-friendly. I'd be grateful if you could review this whenever you get time.

bobveringa commented 3 weeks ago

This summary looks like it would be helpful to most people who want to get started with MQTTv5. The only thing I am not 100% sure of is topic aliases being listed under "Incoming properties". But I suppose it is technically possible because it is a property of the publish packet.

peterhinch commented 3 weeks ago

You are right - testing shows that a message published with a topic alias property is received with no properties. I have updated the doc. Thanks for the review.