eclipse / paho.mqtt.python

paho.mqtt.python
Other
2.12k stars 722 forks source link

Reinitialize API Missed callback_api_version Argument #850

Open lizaibeim opened 3 weeks ago

lizaibeim commented 3 weeks ago

Hi, I recently met a bug when I tried to reinitialize the mqtt client to reuse the client.

import configparser
import os.path

import paho.mqtt.client as mqtt

class MQTTClientWrapper:
    """Wrapper class for the MQTT client connected to predefined host and port"""

    def __init__(self, user_data=None, on_message=None, topic=None):
        """Initialize the MQTT client from the configuration file"""
        config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../conf', 'audio_base.ini')
        self.subscribed_topic = None
        self.config = configparser.ConfigParser()
        self.config.read(config_path)
        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        self.initialize(user_data, on_message, topic)

    def reinit(self, user_data=None, on_message=None, topic=None):
        """Reinitialize the client with new user data, on_message callback, and topic"""
        self.subscribed_topic = None
        self.client.reinitialise()
        self.initialize(user_data, on_message, topic)

    def initialize(self, user_data=None, on_message=None, topic=None):
        self.client.connect(host=self.config['MQTT']['mqtt_host'], port=int(self.config['MQTT']['mqtt_port']),
                            keepalive=60)
        if user_data:
            self.user_data_set(user_data)
        if on_message:
            self.on_message(on_message)
        if topic:
            self.subscribe(topic)
            self.subscribed_topic = topic

    def user_data_set(self, user_data):
        """Set the user data for the client"""
        self.client.user_data_set(user_data)

    def subscribe(self, topic):
        """Unsubscribe from the current topic and subscribe to a new topic"""
        if self.subscribed_topic:
            self.client.unsubscribe(self.subscribed_topic)
        self.client.subscribe(topic)
        self.subscribed_topic = topic

    def publish(self, topic, message, retain=False, qos=0):
        """Publish a message to a topic"""
        self.client.publish(topic, message, retain=retain, qos=qos)

    def on_message(self, on_message):
        """Set the on_message callback function"""
        self.client.on_message = on_message

    def start(self):
        """Start the network loop in a separate thread"""
        self.client.loop_start()

    def stop(self):
        """Stop the network loop"""
        self.client.loop_stop()

The errors shown

self.mqtt_client.reinit()
  File "/Users/ericli/mbox-audio/utils/mqtt_client.py", line 22, in reinit
    self.client.reinitialise()
  File "/Users/ericli/miniforge3/envs/audio-base/lib/python3.10/site-packages/paho/mqtt/client.py", line 1150, in reinitialise
    self.__init__(client_id, clean_session, userdata)  # type: ignore[misc]
  File "/Users/ericli/miniforge3/envs/audio-base/lib/python3.10/site-packages/paho/mqtt/client.py", line 766, in __init__
    raise ValueError(
ValueError: Unsupported callback API version: version 2.0 added a callback_api_version, see migrations.md for details

I check the code of mqtt/client.py

def reinitialise(
        self,
        client_id: str = "",
        clean_session: bool = True,
        userdata: Any = None,
    ) -> None:
        self._reset_sockets()

        self.__init__(client_id, clean_session, userdata)  # type: ignore[misc]

Here, the function call of reinitialise missed the callback_api_version parameter needed in the init function

def __init__(
        self,
        callback_api_version: CallbackAPIVersion,
        client_id: str = "",
        clean_session: bool | None = None,
        userdata: Any = None,
        protocol: int = MQTTv311,
        transport: Literal["tcp", "websockets"] = "tcp",
        reconnect_on_failure: bool = True,
        manual_ack: bool = False,
    ) -> None: