eclipse-paho / paho.mqtt.python

paho.mqtt.python
Other
2.22k stars 725 forks source link

Cannot store received UserProperty in variable #725

Open BlackRose01 opened 1 year ago

BlackRose01 commented 1 year ago

Hello,

I wrote a class that tests MQTT brokers. The method "check_userprops" checks whether it is possible to send and receive User Properties. I can see the message, including the Properties, in my MQTT client (MQTTX), so publishing works; the problem is that I cannot store the received Properties in a variable. As soon as I try to store the information then the Property "disappears" from the message object from the method "__check_userprops" (on_message) and I don't know why. Even when I change the on_message method to a class method, it is not possible to store the information. When I switch from the class to non-class code (Demo Code), everything works fine.

Is there a logic error in my code, or am I misunderstanding something fundamental?

Thanks in advance!

KR, BlackRose01

Configuration object for My Code

{
    # Human-readable label; not read by the Paho class shown below.
    "name": "Test MQTT v5 non secure",
    # Broker address and port handed to client.connect().
    "host": "localhost",
    "port": 1883,
    # "mqtt" selects the plain TCP transport; any other value selects websockets.
    "transport": "mqtt",
    # "5" -> MQTTv5, "311" -> MQTTv311, anything else -> MQTTv31.
    "version": "5",
    # False disables authentication; otherwise a dict with "username" and "password".
    "credentials": False,
    # False disables TLS; otherwise a dict whose "ca" entry is the CA certificate path.
    "certificate": False
}

My Code

from .IMqtt import IMqtt
import paho.mqtt.client as mqtt
from paho.mqtt.client import MQTTMessage, SubscribeOptions
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
import ssl
import importlib.metadata
from datetime import datetime, timedelta
import logging
import copy

class Paho(IMqtt):
    """MQTT broker tester built on the paho-mqtt client.

    The instance connects to the broker described by ``connection_information``
    on construction and starts paho's background network loop. Each check
    method then drives the shared ``self.client``.

    Expected configuration keys (as read by ``__connect``): ``host``, ``port``,
    ``transport`` ("mqtt" for TCP, anything else for websockets), ``version``
    ("5", "311" or anything else for 3.1), ``credentials`` (``False`` or a dict
    with ``username``/``password``) and ``certificate`` (``False`` or a dict
    with ``ca``).
    """

    def __init__(self, connection_information: dict, timeout: float = 20.0, test_topic: str = "test", test_msg: str = "test message") -> None:
        """Store the settings and open the broker connection.

        Args:
            connection_information: Broker configuration dict (see class docs).
            timeout: Seconds to wait for publish/receive; non-positive values
                fall back to 20.0.
            test_topic: Topic used by the checks; one leading "/" is stripped.
            test_msg: Payload used by the checks; empty/None falls back to
                "test message".
        """
        self.config = connection_information
        self.logger = logging.getLogger("MQTT Tester")
        self.timeout = 20.0 if timeout <= 0 else timeout
        self.test_topic = test_topic if test_topic.startswith("/") is False else test_topic[1:]
        self.test_msg = test_msg if test_msg != "" and test_msg is not None else "test message"
        self.status = None
        self.client = self.__connect()

    def __connect(self, counter: str = "", set_will: bool = False) -> mqtt.Client:
        """Create a client, connect it to the configured broker and start its loop.

        Args:
            counter: Optional suffix appended to the client id (``_<counter>``).
            set_will: When True, register a retained last-will message on the
                test topic with the test payload.

        Returns:
            The connected client with its background network loop running.
        """
        protocol = mqtt.MQTTv5 if self.config['version'] == "5" else mqtt.MQTTv311 if self.config['version'] == "311" else mqtt.MQTTv31
        transport = "tcp" if self.config['transport'].lower() == "mqtt" else "websockets"
        client_id = "testconnection_paho_{}{}".format("ws" if transport == "websockets" else "mqttv" + self.config['version'], "" if counter == "" else "_" + counter)

        client = mqtt.Client(client_id=client_id, protocol=protocol, transport=transport)
        client.client_id = client_id
        # Slot the on_message callbacks use to hand their result back to the
        # thread that is running the check.
        client.msg_status = None

        if transport == "websockets":
            client.ws_set_options(path="/mqtt")

        if set_will:
            client.will_set(topic=self.test_topic, payload=self.test_msg, retain=True)

        if self.config['credentials'] is not False:
            client.username_pw_set(
                username=self.config['credentials']['username'], password=self.config['credentials']['password'])

        if self.config['certificate'] is not False:
            client.tls_set(ca_certs=self.config['certificate']['ca'])
            client.tls_insecure_set(True)

        # Normalise the port in both the TLS and the plain branch; previously
        # only the non-TLS branch converted it with int().
        client.connect(host=self.config['host'], port=int(self.config['port']))

        client.loop_start()
        return client

    def __cleanup(self) -> None:
        """Reset the stored results and detach every client callback."""
        self.status = None
        self.client.msg_status = None

        self.client.on_connect = None
        self.client.on_connect_fail = None
        self.client.on_disconnect = None
        self.client.on_message = None
        self.client.on_publish = None
        self.client.on_subscribe = None
        self.client.on_unsubscribe = None

    def disconnect(self) -> None:
        """Stop the background network loop and disconnect from the broker."""
        self.client.loop_stop()
        self.client.disconnect()

    def __check_userprops(self, client, userdata, msg: MQTTMessage) -> None:
        """on_message callback: record the payload and properties of ``msg``.

        Stores ``(payload_text, properties_dict)`` in ``client.msg_status``,
        where ``properties_dict`` is the plain dict returned by
        ``msg.properties.json()``.
        """
        print(msg.properties.json())
        client.msg_status = (str(msg.payload.decode("utf-8")), msg.properties.json())
        print(client.msg_status)

    def check_userprops(self) -> tuple:
        """Check that a published UserProperty is delivered back intact.

        Subscribes to the test topic, publishes the test message carrying the
        user property ``("A", "B")`` and waits up to ``self.timeout`` seconds
        for the message to come back.

        Returns:
            ``(True,)`` on success, otherwise ``(False, reason)`` where
            ``reason`` is a message string or the exception caught while
            waiting for the publish to complete.
        """
        # Local import so this method does not rely on a file-level import.
        import time

        self.__cleanup()

        # is_mqtt5() is not defined in this class; presumably provided by IMqtt.
        if self.is_mqtt5() is False:
            return (False, "Ignored. Client is not MQTTv5")

        expected_prop = ("A", "B")
        properties_pub = Properties(packetType=PacketTypes.PUBLISH)
        properties_pub.UserProperty = [expected_prop]

        try:
            # Install the handler before subscribing so a message delivered
            # right after the subscription cannot slip past it.
            self.client.on_message = self.__check_userprops
            res, mid = self.client.subscribe(topic=self.test_topic)

            if res != mqtt.MQTT_ERR_SUCCESS:
                return (False, mqtt.error_string(res))

            msg = self.client.publish(
                topic=self.test_topic, payload=self.test_msg, properties=properties_pub)
            msg.wait_for_publish(timeout=self.timeout)

            # Poll with a short sleep instead of spinning on `pass`, so the
            # network thread gets CPU time to run the callback.
            deadline = time.monotonic() + self.timeout
            while self.client.msg_status is None and time.monotonic() <= deadline:
                time.sleep(0.01)

            received = self.client.msg_status
            print("Data: " + str(received))
            res, mid = self.client.unsubscribe(topic=self.test_topic)

            if res != mqtt.MQTT_ERR_SUCCESS:
                return (False, mqtt.error_string(res))
            if received is None:
                return (False, "No message received")

            payload, props = received
            if payload != self.test_msg:
                return (False, "Received message does not correspond to published message")
            if not props:
                return (False, "Did not receive any UserProperty")

            # `props` is the dict built by Properties.json(); the user
            # properties live under the "UserProperty" key as (name, value)
            # pairs. Fail only when the expected pair is missing.
            user_props = [tuple(pair) for pair in props.get("UserProperty", [])]
            if expected_prop not in user_props:
                return (False, "The received UserProperty does not cooespond to published UserProperty")

            return (True,)
        except (ValueError, RuntimeError) as e:
            return (False, e)

Demo Code

#!C:/python36/python.exe
#!/usr/bin/env python3
##demo code provided by Steve Cope at www.steves-internet-guide.com
##email steve@steves-internet-guide.com
##Free to use for any purpose
##If you like and use this code you can
##buy me a drink here https://www.paypal.me/StepenCope

import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
import time
import logging
import sys

# Broker endpoint and protocol settings used by the demo.
host, port = 'localhost', 1883
mqttv = mqtt.MQTTv5
client_id = "testclient2"
pub_topic = "test"

# State written by the on_message callback: every decoded payload, and the
# properties of the most recently received message.
messages = []
x = None

def on_publish(client, userdata, mid):
    """Publish callback: report that a message was published."""
    notice = "published"
    print(notice)

def on_connect(client, userdata, flags, reasonCode, properties=None):
    """Connect callback: print the flags, properties and reason code received."""
    report = (
        ('Connected ', flags),
        ('Connected properties', properties),
        ('Connected ', reasonCode),
    )
    for label, value in report:
        print(label, value)

def on_message(client, userdata, message):
    """Message callback: log the message and keep its payload and properties.

    Appends the UTF-8 decoded payload to the module-level ``messages`` list and
    stores the dict returned by ``message.properties.json()`` in the
    module-level ``x``.
    """
    global x
    payload_text = str(message.payload.decode("utf-8"))
    messages.append(payload_text)

    received_props = message.properties
    print("correlation=", received_props)
    print('RECV Topic = ', message.topic)
    print('RECV MSG =', payload_text)
    print("properties received= ", received_props)

    props_as_dict = message.properties.json()
    x = message.properties.json()
    print("user properties received= ", props_as_dict)

def on_disconnect(client, userdata, rc):
    """Disconnect callback: print the result code passed in by the client."""
    label = 'Received Disconnect '
    print(label, rc)

def on_subscribe(client, userdata, mid, granted_qos, properties=None):
    """Subscribe callback: report that the subscription was acknowledged."""
    notice = 'SUBSCRIBED'
    print(notice)

def on_unsubscribe(client, userdata, mid, properties, reasonCodes):
    """Unsubscribe callback: report that the unsubscribe was acknowledged."""
    notice = 'UNSUBSCRIBED'
    print(notice)

# Demo driver: one client subscribes, a second client publishes a message
# carrying user properties, and the properties captured by on_message are
# printed at the end.
print("creating client")

# Two independent clients, both using the protocol version held in `mqttv`.
client_sub = mqtt.Client("subclient", protocol=mqttv)
client_pub = mqtt.Client("pubclient", protocol=mqttv)

# Wire up the callbacks. on_unsubscribe is defined above but never attached.
client_sub.on_connect = on_connect
client_pub.on_connect = on_connect
client_sub.on_message = on_message
client_sub.on_disconnect = on_disconnect
client_sub.on_subscribe = on_subscribe
client_pub.on_publish = on_publish

# Only the subscriber gets a background network loop; loop_start() is never
# called on client_pub.
client_sub.connect(host)
client_sub.loop_start()
client_sub.subscribe('test/#', qos=0)
client_pub.connect(host)
#while(not client_sub.is_connected() and not client_sub.is_connected())
print("waiting for connection")
# Fixed delay in place of the commented-out is_connected() polling above.
time.sleep(5)
print("connected")
print("sending message user properties set")
# Attach two user properties to the outgoing PUBLISH packet.
properties = Properties(PacketTypes.PUBLISH)
count = "1"
properties.UserProperty = [("filename", "test.txt"), ("count", count)]
# "test/mqtt" matches the subscriber's 'test/#' filter.
client_pub.publish("test/mqtt", "test message", properties=properties)

# Give the subscriber time to receive the message before disconnecting.
time.sleep(5)
client_sub.disconnect()
client_pub.disconnect()

# `x` holds whatever on_message stored; it is still None if nothing arrived.
print("HI")
print(x)
MattBrittan commented 10 months ago

Apologies for the delay!

As soon as I try to store the information then the Property "disappears" from the message object from the method "__check_userprops" (on_message) and I don't know why.

Sorry - I'm having a little difficulty following this. Is there any chance you could provide a minimal, reproducible, example (your code is close; if you could just tweak it so it runs the same as "Demo Code" that would be great!). The output I get from "Demo Code" is as follows (I'm assuming that this is the correct output that you expect; what's the output from the other code?):

pythondocker-1  | correlation= [UserProperty : [('filename', 'test.txt'), ('count', '1')]]
pythondocker-1  | RECV Topic =  test/mqtt
pythondocker-1  | RECV MSG = test message
pythondocker-1  | properties received=  [UserProperty : [('filename', 'test.txt'), ('count', '1')]]
pythondocker-1  | user properties received=  {'UserProperty': [('filename', 'test.txt'), ('count', '1')]}