thingsboard / thingsboard-python-client-sdk

ThingsBoard client Python SDK
https://thingsboard.io

Python MQTT client hangs without error after a few hundred writes #47

Open ManuelAlvesDtx opened 1 year ago

ManuelAlvesDtx commented 1 year ago

I've tested the ThingsBoard API (HTTP, using JMeter) with very good results. Now I've been asked to do the same using MQTT. I'm using the Docker image described in https://thingsboard.io/docs/user-guide/install/docker/

For starters, the examples in https://thingsboard.io/docs/reference/python-client-sdk/ do not work with the latest versions of tb-mqtt-client. By trial and error, I found that version 1.1 was the only one that worked for sending attributes and telemetry (python -m pip install tb-mqtt-client==1.1). Using the Python MQTT client to create devices never succeeds in provisioning more than 10,000 devices; it hangs before that without any error. Since what I was asked to test was sending data (attributes and telemetry), I resorted to pycurl to provision the devices and get their tokens so I could send data to each device.

Following the example in https://github.com/thingsboard/thingsboard-python-client-sdk/blob/master/examples/device/send_telemetry_and_attr.py, I created these two functions to send data in a custom MQTT class of my own, where properties like the server address and device token are populated when the device is provisioned over HTTP.

    def write_device_attribute(self) -> bool:
        attribute = {"withoutHistory": random.randint(-1000000, 1000000)}
        client = TBDeviceMqttClient(host=self.server_address, token=self.device_token)
        client.connect()
        result = client.send_attributes(attribute)
        success = result.get() == TBPublishInfo.TB_ERR_SUCCESS
        client.disconnect()
        del client
        return success

    def write_device_telemetry(self) -> bool:
        telemetry = {"withHistory": random.randint(-1000000, 1000000)}
        client = TBDeviceMqttClient(host=self.server_address, token=self.device_token)
        client.connect()
        result = client.send_telemetry(telemetry,0)
        success = result.get() == TBPublishInfo.TB_ERR_SUCCESS
        client.disconnect()
        del client
        return success

These functions are called by several threads, but each thread only writes to its own devices:
#################################################################
def write_attributes(user_n):
    """Write to the user devices attribute

    Args:
        user_n (int): integer identifying the user 1..N_USERS
    """
    try:

        for _ in range(N_DATA_POINTS):
            for n in range(N_DEVICES):
                success = MQTT_CLIENTS[user_n][n].write_device_attribute()
                Q_Request_log.put_nowait(f"{now_string()};attr;{success}")
                time.sleep(SLEEP_TIME)

    except TB_EXCEPTION as err:
        print(err)
#################################################################
def write_telemetry(user_n):
    """Write telemetry to the user devices

    Args:
        user_n (int): integer identifying the user 1..N_USERS
    """
    try:

        for _ in range(N_DATA_POINTS):
            for n in range(N_DEVICES):
                success = MQTT_CLIENTS[user_n][n].write_device_telemetry()
                Q_Request_log.put_nowait(f"{now_string()};ts;{success}")
                time.sleep(SLEEP_TIME)

    except TB_EXCEPTION as err:
        print(err)
#################################################################

And the main function: …

    # Send telemetry (attribute, no history). Launch a thread for each user.
    start_time = datetime.now()
    for i in range(N_USERS):
        thread_a = threading.Thread(target = write_attributes, args=(i,))
        thread_a.start()
        threads_At.append(thread_a)

    # wait for all threads to finish
    for thread in threads_At:
        thread.join()

    end_time = datetime.now()
    print(f"Write attribute;{elapsed_time(start_time, end_time)}")

    # Send telemetry (attribute, time series). Launch a thread for each user.
    start_time = datetime.now()
    for i in range(N_USERS):
        thread_a = threading.Thread(target = write_telemetry, args=(i,))
        thread_a.start()
        threads_TS.append(thread_a)

    # wait for all threads to finish
    for thread in threads_TS:
        thread.join()

    end_time = datetime.now()
    print(f"Send telemetry;{elapsed_time(start_time, end_time)}")

The code initially runs as expected, but after a few hundred attribute writes it gets stuck. Checking the process with ps -u, I see that it is in an interruptible sleep (waiting for an event to complete), specifically state “Sl+”.

Any clue as to why this works fine for low numbers but gets stuck on a long run? The server is almost idle at 2% CPU usage, with lots of free memory and disk.
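
One way to narrow this down is to look at where each Python thread is blocked at the moment the process stalls. The following is a minimal sketch using only the standard library. `dump_all_thread_stacks` is a hypothetical helper name, not part of tb-mqtt-client, and it relies on `sys._current_frames()`, which is specific to CPython.

```python
import sys
import threading
import traceback


def dump_all_thread_stacks(out=sys.stderr):
    """Write the current Python call stack of every live thread to `out`."""
    # Maps thread identifier -> topmost stack frame (CPython-specific).
    frames = sys._current_frames()
    for thread in threading.enumerate():
        frame = frames.get(thread.ident)
        if frame is None:
            # The thread finished between the two calls above.
            continue
        out.write(f"--- {thread.name} ---\n")
        out.write("".join(traceback.format_stack(frame)))
```

Calling this from a watchdog (for example a `threading.Timer` started before the worker threads) should show whether the workers are parked inside `result.get()`, `connect()`, or somewhere else. The standard `faulthandler.dump_traceback_later(timeout)` produces a similar dump without custom code.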

scholz commented 1 year ago

Hi, I don't know if this is helpful for your case, but it may be an explanation for the behavior you describe.

We noticed in our application that if the connection goes down while we are using "success = result.get() == TBPublishInfo.TB_ERR_SUCCESS" to check the transmission (which is also documented as a blocking call), it will hang indefinitely.

Our current fix is as follows:

- remove the result.get() call

- check client._is_connected() instead to see whether the client is really connected. Note that if a connection is lost, the default timeout is 120s, after which the _is_disconnected method is called (this can be adapted in the TBDeviceMqttClient.connect call).

I see that you are connecting / disconnecting the client in each thread, so it may not be connection related. However, this situation may still happen in your case if the connection breaks right after it was established (client.connect) for some reason, or maybe it never worked, in which case the success check simply blocks, producing the reported behavior.

A more general solution would surely include adding a timeout to result.get() and improving the disconnect behavior of the MQTT client.
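
As an illustration of the timeout idea, a blocking call such as result.get() can be run in a daemon thread and abandoned after a deadline. This is only a sketch: `call_with_timeout` is a hypothetical helper, not an SDK function, and it does not cancel the underlying call, it just stops waiting for it.

```python
import threading


def call_with_timeout(blocking_call, timeout_s):
    """Run blocking_call() in a daemon thread.

    Returns (True, value) if the call finished within timeout_s seconds,
    or (False, None) if it is still blocked when the deadline passes.
    """
    outcome = {}

    def runner():
        try:
            outcome["value"] = blocking_call()
        except Exception as exc:  # hand the error back to the caller
            outcome["error"] = exc

    # daemon=True so a call that never returns cannot keep the process alive.
    worker = threading.Thread(target=runner, daemon=True)
    worker.start()
    worker.join(timeout_s)
    if worker.is_alive():
        return False, None
    if "error" in outcome:
        raise outcome["error"]
    return True, outcome["value"]


# Hypothetical usage with the code from the first post:
#   finished, rc = call_with_timeout(result.get, 5.0)
#   success = finished and rc == TBPublishInfo.TB_ERR_SUCCESS
```

A timed-out call leaves its worker thread blocked in the background, so this is more of a diagnostic aid or stopgap than a replacement for a real timeout inside the client.
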

ManuelAlvesDtx commented 1 year ago

Hi, thanks for your reply. I've managed to get it running by using paho and making a few changes (I was missing client.loop_start()). Now it runs with multiple threads without any issue. I'll leave it here for anyone trying to do the same:

    import paho.mqtt.client as mqtt
    import random, json
    ....

    def write_device_attribute(self) -> int:
        client = mqtt.Client()
        client.username_pw_set(self.device_token)
        client.connect(mqttBroker)
        attribute = {"withoutHistory": random.randint(-1000000, 1000000)}
        attribute = json.dumps(attribute)
        client.loop_start()
        info = client.publish(ATTRIBUTES_TOPIC, attribute)
        info.wait_for_publish()
        client.disconnect()
        return info.rc

    def write_device_telemetry(self) -> int:
        client = mqtt.Client()
        client.username_pw_set(self.device_token)
        client.connect(mqttBroker)
        telemetry = {"withHistory": random.randint(-1000000, 1000000)}
        telemetry = json.dumps(telemetry)
        client.loop_start()
        info = client.publish(TELEMETRY_TOPIC, telemetry)
        info.wait_for_publish()
        client.disconnect()
        return info.rc