eclipse / paho.mqtt.python

paho.mqtt.python
Other
2.15k stars 724 forks source link

How to call callback on_message from class? #716

Open Bagunda opened 1 year ago

Bagunda commented 1 year ago

This does not work:

MyFile.py:

from subprocess import call
import paho.mqtt.client as mqtt
import logging

# Module-wide switches.
program_name = "include_class_test"
DEBUG = True

# Records from this module (INFO and above) go to "<module name>.log";
# mode='w' truncates that file every time the module is loaded.
py_formatter = logging.Formatter("%(name)s %(asctime)s %(levelname)s %(message)s")
py_handler = logging.FileHandler("{}.log".format(__name__), mode='w')
py_handler.setFormatter(py_formatter)

py_logger = logging.getLogger(__name__)
py_logger.addHandler(py_handler)
py_logger.setLevel(logging.INFO)

class MyMQTTClass(mqtt.Client):
    """paho ``mqtt.Client`` subclass that stores broker settings and logs its callbacks.

    Call :meth:`setPlaces` first: it creates every attribute the other methods
    read (``BRname``, ``LWT_topic``, ``connected_flag`` ...).  :meth:`run2`
    then connects and starts the network loop, and :meth:`exit` shuts down.
    All log output goes through the module-level ``py_logger``.
    """

    # Descriptions for the result codes logged by the connect/disconnect
    # callbacks.  Codes missing from this table are handled by _rc_text().
    rc_txt = {
        0: "Connection successful",
        1: "Connection refused - incorrect protocol version",
        2: "Connection refused - invalid client identifier",
        3: "Connection refused - server unavailable",
        4: "Connection refused - bad username or password",
        5: "Connection refused - not authorised",
        7: "Connection refused - Unexpected disconnection",
        100: "Connection refused - other things"
    }

    def _rc_text(self, rc):
        """Return the ``rc_txt`` entry for *rc*, or a generic text if there is none.

        A plain ``rc_txt[rc]`` lookup raises inside the callback for any code
        that is not in the table (6, for example).
        """
        try:
            return self.rc_txt[rc]
        except (KeyError, TypeError):  # TypeError: rc is not hashable
            return "Unknown return code"

    def setPlaces(self, name, host, port, user, password, topic_header, subscribe_to_topics, LWT_topic, DEBUG):
        """Store the broker settings used by the other methods.

        Args:
            name: Label for this broker, used in log messages.
            host: Broker host name or address.
            port: Broker TCP port.
            user: User name, or None for no authentication.
            password: Password, or None.
            topic_header: Prefix that bag_pub() puts in front of every topic.
            subscribe_to_topics: Iterable of topics passed one by one to
                subscribe() after a successful connect; each item is either
                a topic string or a ``(topic, qos)`` tuple.
            LWT_topic: Topic for the "Online"/"Offline" status messages.
            DEBUG: When true, informational messages are logged as well.
        """
        self.BRname = name
        self.BRhost = str(host)
        self.BRport = port
        # Keep None as None: str(None) is the text "None", which would be
        # sent to the broker as a real user name / password.
        self.BRuser = None if user is None else str(user)
        self.BRpassword = None if password is None else str(password)
        self.BRclient_id = str(self._client_id)
        self.BRtopic_header = str(topic_header)
        self.subscribe_to_topics = subscribe_to_topics
        self.LWT_topic = str(LWT_topic)
        self.DEBUG = DEBUG
        self.connected_flag = False

    def BRinfo(self):
        """Log the stored connection settings; the password itself is masked."""
        # Never write the real password to the log file.
        shown_password = None if self.BRpassword is None else "***"
        py_logger.info(
            "Connection data: {} ({}:{}), u={}, pass={}, client_id={}, topic_header={}".format(
                self.BRname, self.BRhost, self.BRport, self.BRuser,
                shown_password, self.BRclient_id, self.BRtopic_header))

    def on_disconnect(self, mqttc, userdata, rc):
        """Mark the client as disconnected; log an error unless rc is 0."""
        # The flag is cleared for a clean disconnect (rc == 0) too, so that
        # bag_pub() stops publishing once exit() has been called.
        self.connected_flag = False
        if rc != 0:
            py_logger.error("Unexpected disconnection. Brocker=" + self.BRname + ", rc: " + str(rc) + " (" + self._rc_text(rc) + ")")

    def on_connect(self, mqttc, obj, flags, rc):
        """On success subscribe to every configured topic and publish "Online"."""
        if rc == 0:  # 0 - Connection successful
            self.connected_flag = True

            for topic in self.subscribe_to_topics:
                # An entry may be "topic" or ("topic", qos); use only the
                # topic text in log messages.
                topic_name = topic if isinstance(topic, str) else topic[0]
                res = self.subscribe(topic)

                if res[0] == mqtt.MQTT_ERR_SUCCESS:
                    py_logger.info("Successfully subscribed to topic: " + topic_name)
                else:
                    py_logger.critical("Error! Client is not subscribed to topic " + topic_name)

            self.publish(self.LWT_topic, "Online", qos=0, retain=True)
        else:
            self.connected_flag = False
            py_logger.error("Brocker=" + self.BRname + ", rc: " + str(rc) + " (" + self._rc_text(rc) + ")")
            py_logger.error("Unexpected disconnection (maybe)")

    def on_connect_fail(self, mqttc, obj):
        """Log that a connection attempt failed."""
        py_logger.error("Connect failed")

    def on_message(self, mqttc, obj, msg):
        """Default message handler: log topic, qos and payload at CRITICAL level."""
        py_logger.critical("Etogo ne doljno byt")
        py_logger.critical(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))

    def on_publish(self, mqttc, obj, mid):
        """Log the message id of a published message when DEBUG is on."""
        if self.DEBUG:
            py_logger.info("Message is published success. mid="+str(mid))

    def bag_pub_raw_topic(self, topic, payload, retain=False):
        """Publish *payload* to *topic* exactly as given (no topic_header prefix).

        Nothing is sent while ``connected_flag`` is False.  A non-zero result
        from publish() is logged and clears ``connected_flag``.
        """
        if not self.connected_flag:
            py_logger.critical ("Scipped trying send mqtt because connected_flag = False")
            return

        (rc, mid) = self.publish(topic, payload, retain=retain)
        if self.DEBUG:
            py_logger.info ("Try to sending mqtt: Brocker={}, mid={}, t={}, msg={}".format(self.BRname, str(mid), topic, payload))
        if rc != 0:
            self.connected_flag = False
            # publish() reports MQTT_ERR_* codes, which rc_txt does not
            # describe; paho's own error_string() does.
            py_logger.critical("Error to send mqtt. rc=" + str(rc) + ". " + str(mqtt.error_string(rc)) + ". mid=" + str(mid))

    def bag_pub(self, topic, payload, retain=False):
        """Publish *payload* to ``BRtopic_header + topic``."""
        self.bag_pub_raw_topic(self.BRtopic_header + topic, payload, retain=retain)

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        """Log the subscription acknowledgement when DEBUG is on."""
        if self.DEBUG:
            py_logger.info("Successfully subscribed: Brocker=" + self.BRname + ", subscribed: mid=" + str(mid) + ", granted_qos=" + str(granted_qos))

    def on_log(self, mqttc, obj, level, string):
        """Discard paho's own log lines."""
        # Uncomment to trace the library: print(string)
        pass

    def bag_will_set(self, topic):
        """Register "Offline" as the retained last-will message on ``LWT_topic``.

        *topic* is accepted for the existing call sites but is not used.
        """
        self.will_set(self.LWT_topic, "Offline", qos=0, retain=True)

    def run2(self):
        """Set will and credentials, try to connect, then start the network loop."""
        self.will_set(self.LWT_topic, payload="Offline", qos=0, retain=True)

        self.username_pw_set(username=self.BRuser, password=self.BRpassword)

        try:
            self.connect(self.BRhost, self.BRport, 60)
        except Exception as err:
            # Python 3 exceptions cannot be unpacked into (errno, message);
            # trying to do so raised a TypeError that hid the real error.
            py_logger.critical("Error to connect mqtt. E1: {}. E2: {}. Broker={}".format(
                getattr(err, "errno", None),
                getattr(err, "strerror", None) or err,
                self.BRname))

        # The loop is started whether or not connect() succeeded.
        self.loop_start()

    def exit(self):
        """Publish "Offline" on ``LWT_topic``, disconnect and stop the network loop."""
        if self.DEBUG:
            py_logger.info("Paho mqtt is stoping...")
        self.bag_pub_raw_topic(self.LWT_topic, payload="Offline", retain=True)
        self.disconnect()
        self.loop_stop()

main.py:

import MyFile
import json
import logging

# Records from this script (INFO and above) go to "<module name>.log";
# mode='w' truncates that file on every start.
py_logger = logging.getLogger(__name__)
py_logger.setLevel(logging.INFO)
py_handler = logging.FileHandler("{}.log".format(__name__), mode='w')
py_formatter = logging.Formatter("%(name)s %(asctime)s %(levelname)s %(message)s")
py_handler.setFormatter(py_formatter)
py_logger.addHandler(py_handler)

DEBUG = True
program_name = "parent_program"

# The credentials file is a JSON object with one entry per broker name; each
# entry supplies the "host", "port", "user" and "password" values read below.
mqtt_credential_file_path = "/root/mqtt_credentials.json"
# The with-block closes the file even when reading fails.
with open(mqtt_credential_file_path, encoding="utf-8") as my_file:
    my_string = my_file.read()
mqtt_credentials_from_file_dict = json.loads(my_string)

MQTTtopic_header = "BSR30/" + program_name + "/relays/"
subscribe_to_topics = [(MQTTtopic_header + "command/#", 0)]
LWT_topic = MQTTtopic_header

mqtt_cred_name = "LocalBrocker"

# Look the broker entry up once and fail with a clear message when it is
# missing, instead of an AttributeError from calling .get() on None.
broker_credentials = mqtt_credentials_from_file_dict.get(mqtt_cred_name)
if broker_credentials is None:
    raise KeyError("No '{}' entry in {}".format(mqtt_cred_name, mqtt_credential_file_path))

LocalBrocker = MyFile.MyMQTTClass("client_id_test")

LocalBrocker.setPlaces(
    name=mqtt_cred_name,
    host=broker_credentials.get("host"),
    port=int(broker_credentials.get("port")),
    user=broker_credentials.get("user"),
    password=broker_credentials.get("password"),
    topic_header=MQTTtopic_header,
    subscribe_to_topics=subscribe_to_topics,
    LWT_topic=LWT_topic,
    DEBUG=DEBUG)

def LocalBrocker_on_message(LocalBrocker, userdata, msg):
    """Log payload, topic and broker name of a received message at INFO level."""
    payload_text = str(msg.payload)
    broker_name = LocalBrocker.BRname
    py_logger.info(
        "Recieved msg: {}, topic={}, brocker={}".format(payload_text, msg.topic, broker_name)
    )

# Assign this script's handler as the client's on_message callback.
LocalBrocker.on_message = LocalBrocker_on_message
LocalBrocker.BRinfo()
LocalBrocker.bag_will_set(MQTTtopic_header)
LocalBrocker.run2()

# run2() ends with loop_start(), so the main thread only has to stay alive.
# Sleeping does that without the 100 % CPU load of an empty `while` loop.
import time

while True:
    time.sleep(1)
MattBrittan commented 6 months ago

Sorry for the delay in replying (and replying with a request).

I've tweaked your code a little so it runs and it seems to work as expected. Can you please provide further info to clarify?

import paho.mqtt.client as mqtt
import logging

# Module-wide switches.
program_name = "include_class_test"
DEBUG = True

# Root logging at DEBUG level, followed by one record to show it is active.
logging.basicConfig(level=logging.DEBUG)
py_logger = logging.getLogger(__name__)
py_logger.debug("test")

class MyMQTTClass(mqtt.Client):
    """paho ``mqtt.Client`` subclass that stores broker settings and logs its callbacks.

    Call :meth:`setPlaces` first: it creates every attribute the other methods
    read (``BRname``, ``LWT_topic``, ``connected_flag`` ...).  :meth:`run2`
    then connects and starts the network loop, and :meth:`exit` shuts down.
    All log output goes through the module-level ``py_logger``.
    """

    # Descriptions for the result codes logged by the connect/disconnect
    # callbacks.  Codes missing from this table are handled by _rc_text().
    rc_txt = {
        0: "Connection successful",
        1: "Connection refused - incorrect protocol version",
        2: "Connection refused - invalid client identifier",
        3: "Connection refused - server unavailable",
        4: "Connection refused - bad username or password",
        5: "Connection refused - not authorised",
        7: "Connection refused - Unexpected disconnection",
        100: "Connection refused - other things"
    }

    def _rc_text(self, rc):
        """Return the ``rc_txt`` entry for *rc*, or a generic text if there is none.

        A plain ``rc_txt[rc]`` lookup raises inside the callback for any code
        that is not in the table (6, for example).
        """
        try:
            return self.rc_txt[rc]
        except (KeyError, TypeError):  # TypeError: rc is not hashable
            return "Unknown return code"

    def setPlaces(self, name, host, port, user, password, topic_header, subscribe_to_topics, LWT_topic, DEBUG):
        """Store the broker settings used by the other methods.

        Args:
            name: Label for this broker, used in log messages.
            host: Broker host name or address.
            port: Broker TCP port.
            user: User name, or None for no authentication.
            password: Password, or None.
            topic_header: Prefix that bag_pub() puts in front of every topic.
            subscribe_to_topics: Iterable of topics passed one by one to
                subscribe() after a successful connect; each item is either
                a topic string or a ``(topic, qos)`` tuple.
            LWT_topic: Topic for the "Online"/"Offline" status messages.
            DEBUG: When true, informational messages are logged as well.
        """
        self.BRname = name
        self.BRhost = str(host)
        self.BRport = port
        # Keep None as None: str(None) is the text "None", which would be
        # sent to the broker as a real user name / password.
        self.BRuser = None if user is None else str(user)
        self.BRpassword = None if password is None else str(password)
        self.BRclient_id = str(self._client_id)
        self.BRtopic_header = str(topic_header)
        self.subscribe_to_topics = subscribe_to_topics
        self.LWT_topic = str(LWT_topic)
        self.DEBUG = DEBUG
        self.connected_flag = False

    def BRinfo(self):
        """Log the stored connection settings; the password itself is masked."""
        # Never write the real password to the log.
        shown_password = None if self.BRpassword is None else "***"
        py_logger.info(
            "Connection data: {} ({}:{}), u={}, pass={}, client_id={}, topic_header={}".format(
                self.BRname, self.BRhost, self.BRport, self.BRuser,
                shown_password, self.BRclient_id, self.BRtopic_header))

    def on_disconnect(self, mqttc, userdata, rc):
        """Mark the client as disconnected; log an error unless rc is 0."""
        # The flag is cleared for a clean disconnect (rc == 0) too, so that
        # bag_pub() stops publishing once exit() has been called.
        self.connected_flag = False
        if rc != 0:
            py_logger.error("Unexpected disconnection. Brocker=" + self.BRname + ", rc: " + str(rc) + " (" + self._rc_text(rc) + ")")

    def on_connect(self, mqttc, obj, flags, rc):
        """On success subscribe to every configured topic and publish "Online"."""
        if rc == 0:  # 0 - Connection successful
            self.connected_flag = True

            for topic in self.subscribe_to_topics:
                # An entry may be "topic" or ("topic", qos); use only the
                # topic text in log messages.
                topic_name = topic if isinstance(topic, str) else topic[0]
                res = self.subscribe(topic)

                if res[0] == mqtt.MQTT_ERR_SUCCESS:
                    py_logger.info("Successfully subscribed to topic: " + topic_name)
                else:
                    py_logger.critical("Error! Client is not subscribed to topic " + topic_name)

            self.publish(self.LWT_topic, "Online", qos=0, retain=True)
        else:
            self.connected_flag = False
            py_logger.error("Brocker=" + self.BRname + ", rc: " + str(rc) + " (" + self._rc_text(rc) + ")")
            py_logger.error("Unexpected disconnection (maybe)")

    def on_connect_fail(self, mqttc, obj):
        """Log that a connection attempt failed."""
        py_logger.error("Connect failed")

    def on_message(self, mqttc, obj, msg):
        """Default message handler: log topic, qos and payload at CRITICAL level."""
        py_logger.critical("Etogo ne doljno byt")
        py_logger.critical(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))

    def on_publish(self, mqttc, obj, mid):
        """Log the message id of a published message when DEBUG is on."""
        if self.DEBUG:
            py_logger.info("Message is published success. mid="+str(mid))

    def bag_pub_raw_topic(self, topic, payload, retain=False):
        """Publish *payload* to *topic* exactly as given (no topic_header prefix).

        Nothing is sent while ``connected_flag`` is False.  A non-zero result
        from publish() is logged and clears ``connected_flag``.
        """
        if not self.connected_flag:
            py_logger.critical ("Scipped trying send mqtt because connected_flag = False")
            return

        (rc, mid) = self.publish(topic, payload, retain=retain)
        if self.DEBUG:
            py_logger.info ("Try to sending mqtt: Brocker={}, mid={}, t={}, msg={}".format(self.BRname, str(mid), topic, payload))
        if rc != 0:
            self.connected_flag = False
            # publish() reports MQTT_ERR_* codes, which rc_txt does not
            # describe; paho's own error_string() does.
            py_logger.critical("Error to send mqtt. rc=" + str(rc) + ". " + str(mqtt.error_string(rc)) + ". mid=" + str(mid))

    def bag_pub(self, topic, payload, retain=False):
        """Publish *payload* to ``BRtopic_header + topic``."""
        self.bag_pub_raw_topic(self.BRtopic_header + topic, payload, retain=retain)

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        """Log the subscription acknowledgement when DEBUG is on."""
        if self.DEBUG:
            py_logger.info("Successfully subscribed: Brocker=" + self.BRname + ", subscribed: mid=" + str(mid) + ", granted_qos=" + str(granted_qos))

    def on_log(self, mqttc, obj, level, string):
        """Discard paho's own log lines."""
        # Uncomment to trace the library: print(string)
        pass

    def bag_will_set(self, topic):
        """Register "Offline" as the retained last-will message on ``LWT_topic``.

        *topic* is accepted for the existing call sites but is not used.
        """
        self.will_set(self.LWT_topic, "Offline", qos=0, retain=True)

    def run2(self):
        """Set will and credentials, try to connect, then start the network loop."""
        self.will_set(self.LWT_topic, payload="Offline", qos=0, retain=True)

        self.username_pw_set(username=self.BRuser, password=self.BRpassword)

        try:
            self.connect(self.BRhost, self.BRport, 60)
        except Exception as err:
            # Python 3 exceptions cannot be unpacked into (errno, message);
            # trying to do so raised a TypeError that hid the real error.
            py_logger.critical("Error to connect mqtt. E1: {}. E2: {}. Broker={}".format(
                getattr(err, "errno", None),
                getattr(err, "strerror", None) or err,
                self.BRname))

        # The loop is started whether or not connect() succeeded.
        self.loop_start()

    def exit(self):
        """Publish "Offline" on ``LWT_topic``, disconnect and stop the network loop."""
        if self.DEBUG:
            py_logger.info("Paho mqtt is stoping...")
        self.bag_pub_raw_topic(self.LWT_topic, payload="Offline", retain=True)
        self.disconnect()
        self.loop_stop()

# Also write this module's records (INFO and above) to "<module name>.log";
# mode='w' truncates that file on every start.
py_formatter = logging.Formatter("%(name)s %(asctime)s %(levelname)s %(message)s")
py_handler = logging.FileHandler("{}.log".format(__name__), mode='w')
py_handler.setFormatter(py_formatter)
py_logger = logging.getLogger(__name__)
py_logger.addHandler(py_handler)
py_logger.setLevel(logging.INFO)

program_name = "parent_program"
DEBUG = True

# Every topic of this program lives under one prefix; the prefix itself
# doubles as the status (last-will) topic.
MQTTtopic_header = "BSR30/" + program_name + "/relays/"
LWT_topic = MQTTtopic_header
subscribe_to_topics = [(MQTTtopic_header + "command/#", 0)]
print(subscribe_to_topics)

mqtt_cred_name = "LocalBrocker"

LocalBrocker = MyMQTTClass("client_id_test")

# Settings for an anonymous connection to the host named "mosquitto".
broker_settings = dict(
    name=mqtt_cred_name,
    host='mosquitto',
    port=1883,
    user=None,
    password=None,
    topic_header=MQTTtopic_header,
    subscribe_to_topics=subscribe_to_topics,
    LWT_topic=LWT_topic,
    DEBUG=DEBUG,
)
LocalBrocker.setPlaces(**broker_settings)

def LocalBrocker_on_message(LocalBrocker, userdata, msg):
    """Log payload, topic and broker name of a received message at INFO level."""
    payload_text = str(msg.payload)
    broker_name = LocalBrocker.BRname
    py_logger.info(
        "Recieved msg: {}, topic={}, brocker={}".format(payload_text, msg.topic, broker_name)
    )

# Assign this script's handler as the client's on_message callback.
LocalBrocker.on_message = LocalBrocker_on_message
LocalBrocker.BRinfo()
LocalBrocker.bag_will_set(MQTTtopic_header)
LocalBrocker.run2()

# run2() ends with loop_start(), so the main thread only has to stay alive.
# Sleeping does that without the 100 % CPU load of an empty `while` loop.
import time

while True:
    time.sleep(1)

Output:

pythondocker-1  | DEBUG:__main__:test
pythondocker-1  | [('BSR30/parent_program/relays/command/#', 0)]
pythondocker-1  | INFO:__main__:Connection data: LocalBrocker (mosquitto:1883), u=None, pass=None, client_id=b'client_id_test', topic_header=BSR30/parent_program/relays/
pythondocker-1  | INFO:__main__:Successfully subscribed to topic: BSR30/parent_program/relays/command/#
pythondocker-1  | INFO:__main__:Message is published success. mid=2
pythondocker-1  | INFO:__main__:Successfully subscribed: Brocker=LocalBrocker, subscribed: mid=1, granted_qos=(0,)
pythondocker-1  | INFO:__main__:Recieved msg: b'foo', topic=BSR30/parent_program/relays/command/x, brocker=LocalBrocker

As you will note the LocalBrocker_on_message is called,