thingsboard / thingsboard-gateway

Open-source IoT Gateway - integrates devices connected to legacy and third-party systems with ThingsBoard IoT Platform using Modbus, CAN bus, BACnet, BLE, OPC-UA, MQTT, ODBC and REST protocols
https://thingsboard.io/docs/iot-gateway/what-is-iot-gateway/
Apache License 2.0
1.75k stars, 845 forks

[BUG] Not all telemetry is sent to Thingsboard #826

Closed ilseva closed 2 years ago

ilseva commented 2 years ago

Describe the bug

We configured a tb-gateway on a Raspberry Pi to convert telemetry recorded by a Shelly EM and send it to ThingsBoard (Community Edition, on-premises).

The Shelly EM reports data on various MQTT topics, and we want to register a device on TB for each measured quantity (e.g. energy, power, voltage, ...).

We created a custom converter that transforms the data based on the topic:

def convert(self, topic, body):
        try:
            matched = re.search(
                r'shellies/(shellyem-\w*?)/(relay|emeter)/(\d)/?(\w*)?', topic)
            if matched:
                name_with_channel = matched.group(1) + '-ch' + matched.group(3)
                self.dict_result["deviceType"] = "Shelly EM"
                self.dict_result["telemetry"] = []
                if matched.group(2) == 'emeter':
                    key = matched.group(4)
                else:
                    key = 'state'
                self.dict_result["deviceName"] = name_with_channel + "_" + key
                em_status = {key: body}
                self.dict_result["telemetry"].append(em_status)

            log.info('send %s [%s] telemetry for %s value %s',
                     self.dict_result["deviceName"], self.dict_result["deviceType"], key, body)
            return self.dict_result
        except Exception as e:
            log.exception('Error in converter message: \n%s\n', body)
            log.exception(e)
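
To make the naming scheme concrete, here is a minimal sketch that applies the same regular expression to a few sample topics. The helper name `device_name_and_key` and the sample topic strings are assumptions for illustration; the topics follow the layout implied by the pattern rather than being captured from a device.

```python
import re

# Same pattern as in the converter above.
TOPIC_PATTERN = re.compile(r'shellies/(shellyem-\w*?)/(relay|emeter)/(\d)/?(\w*)?')


def device_name_and_key(topic):
    """Return (deviceName, key) for a Shelly EM topic, or None if the topic does not match."""
    matched = TOPIC_PATTERN.search(topic)
    if not matched:
        return None
    # emeter topics carry the measured quantity as the last segment; relay topics report a state.
    key = matched.group(4) if matched.group(2) == 'emeter' else 'state'
    return matched.group(1) + '-ch' + matched.group(3) + '_' + key, key


# Hypothetical sample topics.
print(device_name_and_key('shellies/shellyem-E44DDD/emeter/0/power'))
print(device_name_and_key('shellies/shellyem-E44DDD/relay/0'))
print(device_name_and_key('shellies/announce'))
```

Returning None for a non-matching topic keeps the "no match" case explicit, so the caller never logs or forwards a result built from a previous message.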

Everything seems to work fine, because in the logs we see a message for each value received on the various topics.

The devices have been registered on ThingsBoard, and since the Shelly EM sends a message every 60 seconds on each topic, I expected to see all the recorded telemetry. However, no device has all of its data saved: there are many gaps between one telemetry record and the next, and some devices don't have a single record.

I think the problem is in the configuration of the local storage (we use in-memory storage), but I can't work out what the correct configuration would be. This is the current configuration:

storage:
  type: memory
  read_records_count: 100
  max_records_count: 100000

The only errors I saw in the log are these:

|ERROR| - [tb_gateway_service.py] - tb_gateway_service - __send_to_storage - 574 - 'str' object has no attribute 'get'"
|ERROR| - [tb_utility.py] - tb_utility - validate_converted_data - 67 - deviceName is empty in data: {"telemetry": [], "attributes": []}"
|ERROR| - [tb_gateway_service.py] - tb_gateway_service - __send_to_storage - 518 - Data from MQTT Broker Connector connector is invalid."

But I think they are irrelevant, because they appear for every message that the converter translates (the first one disappears if I set the timestamp inside the converter, but telemetry is still lost).
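
For reference, setting the timestamp inside the converter could look roughly like the sketch below. The helper name `with_timestamp` is hypothetical; the `{"ts": ..., "values": {...}}` shape with a millisecond timestamp is the telemetry format used elsewhere in this thread.

```python
from time import time


def with_timestamp(entry, ts_ms=None):
    """Wrap a flat {key: value} telemetry entry as {"ts": ..., "values": {...}}.

    Entries that already carry both "ts" and "values" are returned unchanged.
    """
    if isinstance(entry, dict) and "ts" in entry and "values" in entry:
        return entry
    if ts_ms is None:
        ts_ms = int(time() * 1000)  # current time in milliseconds
    return {"ts": ts_ms, "values": dict(entry)}


# A fixed timestamp is passed here only to keep the example deterministic.
print(with_timestamp({"power": 144.43}, ts_ms=1652432434000))
```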

I set the log level to DEBUG for each component, but no message makes it clear where the problem is.

Thanks for support.

Connector name (If bug in the some connector): MQTT Connector

Versions (please complete the following information):

islomkhon commented 2 years ago

I'm facing the same issue: the gateway loses its connection and tries to reconnect, but without success.

ilseva commented 2 years ago

I'm facing the same issue: the gateway loses its connection and tries to reconnect, but without success.

Hi @islomkhon! How did you notice that the gateway lost its connection?

imbeacon commented 2 years ago

Hi @ilseva,

According to the error in your message, something is wrong with the converted data. Please add a log statement before the return and send the logs, including the converted data and the error.

ilseva commented 2 years ago

Hi @imbeacon, thanks for your reply.

These are some log messages from the converter generated by this line of code:

log.info('send %s [%s] telemetry for %s value %s', self.dict_result["deviceName"], self.dict_result["deviceType"], key, body)
""2022-05-13 09:00:34" - |INFO| - [shelly_mqtt_em_converter.py] - shelly_mqtt_em_converter - convert - 48 - send shellyem-E44DDD-ch0_power [Shelly_EM] telemetry for power value 144.43"
""2022-05-13 09:00:34" - |INFO| - [shelly_mqtt_em_converter.py] - shelly_mqtt_em_converter - convert - 48 - send shellyem-E44DDD-ch0_reactive_power [Shelly_EM] telemetry for reactive_power value 107.68"
""2022-05-13 09:00:34" - |INFO| - [shelly_mqtt_em_converter.py] - shelly_mqtt_em_converter - convert - 48 - send shellyem-E44DDD-ch0_voltage [Shelly_EM] telemetry for voltage value 231.68"
""2022-05-13 09:00:34" - |INFO| - [shelly_mqtt_em_converter.py] - shelly_mqtt_em_converter - convert - 48 - send shellyem-E44DDD-ch0_total [Shelly_EM] telemetry for total value 869422.2"
""2022-05-13 09:00:34" - |INFO| - [shelly_mqtt_em_converter.py] - shelly_mqtt_em_converter - convert - 48 - send shellyem-E44DDD-ch0_total_returned [Shelly_EM] telemetry for total_returned value 0.0"
""2022-05-13 09:00:34" - |INFO| - [shelly_mqtt_em_converter.py] - shelly_mqtt_em_converter - convert - 48 - send shellyem-E44DDD-ch1_power [Shelly_EM] telemetry for power value 2.69"
""2022-05-13 09:00:34" - |INFO| - [shelly_mqtt_em_converter.py] - shelly_mqtt_em_converter - convert - 48 - send shellyem-E44DDD-ch1_reactive_power [Shelly_EM] telemetry for reactive_power value 0.0"
""2022-05-13 09:00:34" - |INFO| - [shelly_mqtt_em_converter.py] - shelly_mqtt_em_converter - convert - 48 - send shellyem-E44DDD-ch1_voltage [Shelly_EM] telemetry for voltage value 231.68"
""2022-05-13 09:00:34" - |INFO| - [shelly_mqtt_em_converter.py] - shelly_mqtt_em_converter - convert - 48 - send shellyem-E44DDD-ch1_total [Shelly_EM] telemetry for total value 156294.6"
""2022-05-13 09:00:34" - |INFO| - [shelly_mqtt_em_converter.py] - shelly_mqtt_em_converter - convert - 48 - send shellyem-E44DDD-ch1_total_returned [Shelly_EM] telemetry for total_returned value 0.3"

As you can see, the device name is never empty. The following errors

""2022-05-13 09:04:00" - |ERROR| - [tb_gateway_service.py] - tb_gateway_service - __send_to_storage - 574 - 'str' object has no attribute 'get'"
""2022-05-13 09:04:00" - |ERROR| - [tb_gateway_service.py] - tb_gateway_service - __send_to_storage - 574 - 'str' object has no attribute 'get'"
""2022-05-13 09:04:00" - |ERROR| - [tb_gateway_service.py] - tb_gateway_service - __send_to_storage - 574 - 'str' object has no attribute 'get'"
""2022-05-13 09:04:00" - |ERROR| - [tb_gateway_service.py] - tb_gateway_service - __send_to_storage - 574 - 'str' object has no attribute 'get'"
""2022-05-13 09:04:00" - |ERROR| - [tb_gateway_service.py] - tb_gateway_service - __send_to_storage - 574 - 'str' object has no attribute 'get'"
""2022-05-13 09:04:00" - |ERROR| - [tb_gateway_service.py] - tb_gateway_service - __send_to_storage - 574 - 'str' object has no attribute 'get'"
""2022-05-13 09:04:00" - |ERROR| - [tb_gateway_service.py] - tb_gateway_service - __send_to_storage - 574 - 'str' object has no attribute 'get'"
""2022-05-13 09:04:00" - |ERROR| - [tb_gateway_service.py] - tb_gateway_service - __send_to_storage - 574 - 'str' object has no attribute 'get'"
""2022-05-13 09:04:00" - |ERROR| - [tb_gateway_service.py] - tb_gateway_service - __send_to_storage - 574 - 'str' object has no attribute 'get'"

are related to the timestamp property, which I don't set in the converter and which is added by the service (tb_gateway_service.__convert_telemetry_to_ts), as I said in #818. If I set the timestamp in the converter, these errors disappear, but messages are still not delivered to ThingsBoard. The error related to

""2022-05-13 09:04:09" - |ERROR| - [tb_utility.py] - tb_utility - validate_converted_data - 67 - deviceName is empty in data: {"telemetry": [], "attributes": []}"
""2022-05-13 09:04:09" - |ERROR| - [tb_gateway_service.py] - tb_gateway_service - __send_to_storage - 518 - Data from MQTT Broker Connector connector is invalid."

still persists, but the main loop of tb_gateway_service contains a continue that skips messages that are not valid:

if not TBUtility.validate_converted_data(data):
    log.error("Data from %s connector is invalid.", connector_name)
    continue
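
A rough sketch of what this skip amounts to is shown below. The functions `has_device_name` and `keep_valid` are hypothetical stand-ins and not the gateway's actual TBUtility.validate_converted_data implementation; the only rule assumed here is the one visible in the error message, namely that deviceName must not be empty.

```python
def has_device_name(data):
    """Hypothetical check: the converted data must be a dict with a non-empty deviceName."""
    return isinstance(data, dict) and bool(data.get("deviceName"))


def keep_valid(messages):
    """Drop invalid messages, mirroring the `continue` in the loop quoted above."""
    valid = []
    for data in messages:
        if not has_device_name(data):
            continue  # invalid message is skipped, the rest are still processed
        valid.append(data)
    return valid


messages = [
    {"deviceName": "shellyem-E44DDD-ch0_power", "telemetry": [{"power": 144.43}]},
    {"telemetry": [], "attributes": []},  # the payload shown in the error log
    "not a dict",
]
print(keep_valid(messages))
```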

Thanks for support.

imbeacon commented 2 years ago

Could you try the version from the master branch for testing purposes?

ilseva commented 2 years ago

Yes, I can. Is there a way to create a package for the Raspberry Pi? Thanks.

imbeacon commented 2 years ago

Yes, you can use the generate_deb_package script in the repository root folder.

ilseva commented 2 years ago

Hi @imbeacon, these are the steps I followed:

The gateway does not start :-(

May 13 09:50:47 raspberrypi systemd[1]: Started ThingsBoard Gateway.
May 13 09:50:47 raspberrypi python3[14915]: Traceback (most recent call last):
May 13 09:50:47 raspberrypi python3[14915]:   File "<string>", line 1, in <module>
May 13 09:50:47 raspberrypi python3[14915]:   File "/usr/lib/python3/dist-packages/thingsboard_gateway/__init__.py", line 15, in <module>
May 13 09:50:47 raspberrypi python3[14915]:     from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService
May 13 09:50:47 raspberrypi python3[14915]:   File "/usr/lib/python3/dist-packages/thingsboard_gateway/gateway/tb_gateway_service.py", line 16, in <module>
May 13 09:50:47 raspberrypi python3[14915]:     import logging.config
May 13 09:50:47 raspberrypi python3[14915]:   File "/usr/lib/python3.9/logging/config.py", line 37, in <module>
May 13 09:50:47 raspberrypi python3[14915]:     from socketserver import ThreadingTCPServer, StreamRequestHandler
May 13 09:50:47 raspberrypi python3[14915]:   File "/usr/lib/python3.9/socketserver.py", line 390, in <module>
May 13 09:50:47 raspberrypi python3[14915]:     class TCPServer(BaseServer):
May 13 09:50:47 raspberrypi python3[14915]:   File "/usr/lib/python3.9/socketserver.py", line 437, in TCPServer
May 13 09:50:47 raspberrypi python3[14915]:     address_family = socket.AF_INET
May 13 09:50:47 raspberrypi python3[14915]: AttributeError: module 'socket' has no attribute 'AF_INET'
May 13 09:50:47 raspberrypi systemd[1]: thingsboard-gateway.service: Main process exited, code=exited, status=1/FAILURE
May 13 09:50:47 raspberrypi systemd[1]: thingsboard-gateway.service: Failed with result 'exit-code'.
May 13 09:50:47 raspberrypi systemd[1]: thingsboard-gateway.service: Scheduled restart job, restart counter is at 5.
May 13 09:50:47 raspberrypi systemd[1]: Stopped ThingsBoard Gateway.
May 13 09:50:47 raspberrypi systemd[1]: thingsboard-gateway.service: Start request repeated too quickly.
May 13 09:50:47 raspberrypi systemd[1]: thingsboard-gateway.service: Failed with result 'exit-code'.
May 13 09:50:47 raspberrypi systemd[1]: Failed to start ThingsBoard Gateway.

islomkhon commented 2 years ago

Because if I change a shared attribute, it is not updated in the connector. This is how I send the telemetry:

data_to_sent = {"ts":ts, "values": data}
self.__gateway.send_to_storage(self.get_name(), {"deviceName": self.devices[device]["device_config"]["name"], "deviceType":"default", "telemetry":[data_to_sent]})

imbeacon commented 2 years ago

@ilseva

Please add a log statement like the following before the return in the convert function:

log.info(self.dict_result)

ilseva commented 2 years ago

Here they are:

""2022-05-13 11:06:43" - |INFO| - [shelly_mqtt_em_converter.py] - shelly_mqtt_em_converter - convert - 49 - {'deviceType': 'Shelly_EM', 'telemetry': [{'total': 10524.2}], 'deviceName': 'shellyem-E45278-ch1_total', 'attributes': [{'measurementProperty': 'total'}, {'unitOfMeasure': 'Wh'}]}"
""2022-05-13 11:06:43" - |INFO| - [shelly_mqtt_em_converter.py] - shelly_mqtt_em_converter - convert - 49 - {'deviceType': 'Shelly_EM', 'telemetry': [{'total_returned': 0.0}], 'deviceName': 'shellyem-E45278-ch1_total_returned', 'attributes': [{'measurementProperty': 'total_returned'}, {'unitOfMeasure': 'Wh'}]}"

islomkhon commented 2 years ago

Because if I change a shared attribute, it is not updated in the connector. This is how I send the telemetry:

data_to_sent = {"ts":ts, "values": data}
self.__gateway.send_to_storage(self.get_name(), {"deviceName": self.devices[device]["device_config"]["name"], "deviceType":"default", "telemetry":[data_to_sent]})

It sends what is in data_to_sent fine, but when I increase the amount of data in data_to_sent, it starts dropping the connection and reconnecting, and shows the logs below:

""2022-05-13 16:53:04" - |INFO| - [tb_gateway_mqtt.py] - tb_gateway_mqtt - gw_subscribe_to_attribute - 197 - Subscribed to | with id 1 for device *"
""2022-05-13 16:53:04" - |INFO| - [tb_device_mqtt.py] - tb_device_mqtt - _on_connect - 139 - connection SUCCESS"

Each time these logs appear, the id in "Subscribed to | with id" increments.

ilseva commented 2 years ago

Sorry... is the bug confirmed, or is there a workaround I can apply?

WingedKitten commented 2 years ago

I ran into the same problem. I added the following log before the return in gw_send_telemetry in tb_gateway_mqtt.py:

log.info('to tb: %s', telemetry)

and the following log before the return in my extension:

log.info('return data: %s', self.dict_result)

The two logs do not appear the same number of times. Data is uploaded every 30 seconds, but the records that arrive are not continuous.
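
One way to compare the two counts is to tally the markers in the log text. This is a minimal sketch; the function name `count_markers` and the shortened sample lines are made up for illustration, and the marker strings are the ones from the two log statements above.

```python
def count_markers(log_text, markers=("return data:", "to tb:")):
    """Count how often each marker string occurs in the log text."""
    return {marker: log_text.count(marker) for marker in markers}


# Shortened, made-up sample lines; real log lines are much longer.
sample = "\n".join([
    "... - convert - 60 - return data: {...}",
    "... - gw_send_telemetry - 139 - to tb: [...]",
    "... - convert - 60 - return data: {...}",
])
print(count_markers(sample))
```

If the "return data:" count is higher than the "to tb:" count, some converted messages never reached the send step.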

WingedKitten commented 2 years ago

@ilseva Hi, I may have found the problem: in a custom converter, use a local dict_result rather than self.dict_result. I asked a friend, and he said that self.dict_result keeps referring to the original data, so every result points to the same address. I hope this helps.

ilseva commented 2 years ago

Hi @WingedKitten, do you mean that I have to define a dict_result variable in the convert method and return it? Like this?

def convert(self, topic, body):
        try:
            matched = re.search(
                r'shellies/(shellyem-\w*?)/(relay|emeter)/(\d)/?(\w*)?', topic)
            if matched:
                name_with_channel = matched.group(1) + '-ch' + matched.group(3)
                dict_result["deviceType"] = "Shelly EM"
                dict_result["telemetry"] = []
                if matched.group(2) == 'emeter':
                    key = matched.group(4)
                else:
                    key = 'state'
                dict_result["deviceName"] = name_with_channel + "_" + key
                em_status = {key: body}
                dict_result["telemetry"].append(em_status)

            log.info('send %s [%s] telemetry for %s value %s',
                     dict_result["deviceName"], dict_result["deviceType"], key, body)
            return dict_result
        except Exception as e:
            log.exception('Error in converter message: \n%s\n', body)
            log.exception(e)

I tried that, but still not all telemetry is sent to ThingsBoard. Thanks for your help!

samson0v commented 2 years ago

Hi @ilseva, the problem with your converter is that you are using self.dict_result, which is an attribute of the Python class instance. The converter object is not recreated after each call to convert, so dict_result has to be cleared manually; otherwise it grows with every call to convert, and as a result the Gateway does more work and performs worse.

The simplest solution is to make dict_result a local variable in the convert method.
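
The difference can be illustrated with a small standalone sketch. Both converter classes and the `convert_all` helper are hypothetical and independent of the gateway; the sketch assumes that results are collected before they are consumed, which may or may not match how the gateway holds them internally.

```python
class SharedStateConverter:
    """Reuses one dict stored on the instance for every message."""

    def __init__(self):
        self.dict_result = {}

    def convert(self, topic, body):
        self.dict_result["deviceName"] = topic
        self.dict_result["telemetry"] = [{"value": body}]
        return self.dict_result  # the same object is returned on every call


class LocalStateConverter:
    """Builds a fresh dict for every message."""

    def convert(self, topic, body):
        dict_result = {"deviceName": topic, "telemetry": [{"value": body}]}
        return dict_result


def convert_all(converter, messages):
    # Collect all results first, as a queue between converter and sender would.
    return [converter.convert(topic, body) for topic, body in messages]


messages = [("dev-a", 1), ("dev-b", 2)]
shared = convert_all(SharedStateConverter(), messages)
local = convert_all(LocalStateConverter(), messages)
print([r["deviceName"] for r in shared])  # both entries refer to the same dict
print([r["deviceName"] for r in local])
```

With the shared dict, the second call overwrites what the first call returned, so anything still holding the first result sees the data of the second message.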

ilseva commented 2 years ago

Thanks @samson0v for your reply. If you look at my previous comment with the convert method code, I already made dict_result a local variable, but the problem isn't resolved.

samson0v commented 2 years ago

@ilseva, please send the logs (but first set the log level to DEBUG instead of INFO everywhere in the logs.conf file, leaving the ERROR entries as they are).

ilseva commented 2 years ago

@samson0v these are the logs: tb_gateway_log.zip

And these are the latest telemetry values received by ThingsBoard: (image)

samson0v commented 2 years ago

@ilseva, please send your converter and config file; I will try to run them locally.

ilseva commented 2 years ago

Attached is a zip containing the converter and config files: tb_config.zip

This morning I checked the logs and saw that, for example, power is "converted" by the tb_gateway every 5 seconds, but telemetry is registered on ThingsBoard only once per minute.

samson0v commented 2 years ago

@ilseva, I made a few small changes to your converter, and for me it works perfectly. Also, please update your Gateway to the newest version.

from time import time
import re

from thingsboard_gateway.connectors.mqtt.mqtt_uplink_converter import MqttUplinkConverter, log

def set_attributes(key):
    attributes = [{"measurementProperty": key}]
    if key in ["energy", "returned_energy"]:
        attributes.append({"unitOfMeasure": "Wm"})
    if key in ["total", "total_returned"]:
        attributes.append({"unitOfMeasure": "Wh"})
    if key in ["power", "reactive_power"]:
        attributes.append({"unitOfMeasure": "W"})
    if key == "voltage":
        attributes.append({"unitOfMeasure": "V"})
    return attributes

class JsonMqttUplinkConverter(MqttUplinkConverter):
    def __init__(self, config):
        self.__config = config.get('converter')

    def convert(self, topic, body):
        try:
            matched = re.search(
                r'shellies/(shellyem-\w*?)/(relay|emeter)/(\d)/?(\w*)?', topic)
            if matched:
                dict_result = {}
                name_with_channel = matched.group(1) + '-ch' + matched.group(3)
                dict_result["deviceType"] = "Shelly_EM"
                dict_result["telemetry"] = []
                if matched.group(2) == 'emeter':
                    key = matched.group(4)
                else:
                    key = 'state'
                dict_result["deviceName"] = name_with_channel + "_" + key
                em_status = {"ts": int(time() * 1000), "values": {key: body}}
                dict_result["telemetry"].append(em_status)
                dict_result["attributes"] = set_attributes(key)

                log.info('send %s [%s] telemetry for %s value %s', dict_result["deviceName"], dict_result["deviceType"],
                         key, body)
                log.info(dict_result)
                return dict_result
            else:
                log.info('no matches on topic %s', topic)
                return []
        except Exception as e:
            log.exception('Error in converter message: \n%s\n', body)
            log.exception(e)

samson0v commented 2 years ago

So I think that there is no bug in Gateway.

WingedKitten commented 2 years ago

I found a problem. When I use sqlite storage, the log shows

""2022-06-19 22:11:17" - |INFO| - [sqlite_event_storage.py] - sqlite_event_storage - put - 76 - Sending data to storage"

but the data is not sent to ThingsBoard: there is no 'to tb' log. A normal send looks like this:

""2022-06-19 22:11:16" - |INFO| - [rgwi5110_save_ts.py] - rgwi5110_save_ts - convert - 30 - accept data: {'dId': 'FlatKnit-HengQiang', 'dName': '1109', 'iData': [{'iId': 'errBeginTime', 'iName': 'Abnormal start time', 'iTs': 1655691067548, 'iType': 'h_2_1', 'iValue': '2022-06-20 10:10:38'}, {'iId': 'errReport', 'iName': 'Abnormal content', 'iTs': 1655691067548, 'iType': 'h_2_1', 'iValue': 'The line in the left'}, {'iId': 'errEndTime', 'iName': 'Abnormal end time', 'iTs': 1655691067548, 'iType': 'h_2_1', 'iValue': '2022-06-20 10:11:07'}, {'iId': 'errTimes', 'iName': 'processing time(s)', 'iTs': 1655691067548, 'iType': 'h_2_1', 'iValue': 29}]}"
""2022-06-19 22:11:16" - |INFO| - [rgwi5110_save_ts.py] - rgwi5110_save_ts - convert - 60 - return data: {'deviceName': '1109', 'deviceType': 'FlatKnit-HengQiang', 'telemetry': [{'ts': 1655691067548, 'values': {'Abnormal start time': '2022-06-20 10:10:38'}}, {'ts': 1655691067548, 'values': {'Abnormal content': 'The line in the left'}}, {'ts': 1655691067548, 'values': {'Abnormal end time': '2022-06-20 10:11:07'}}, {'ts': 1655691067548, 'values': {'processing time(s)': '29'}}]}"
""2022-06-19 22:11:17" - |INFO| - [sqlite_event_storage.py] - sqlite_event_storage - put - 76 - Sending data to storage"
""2022-06-19 22:11:17" - |INFO| - [tb_gateway_mqtt.py] - tb_gateway_mqtt - gw_send_telemetry - 139 - to tb: [{'ts': 1655691067548, 'values': {'Abnormal start time': '2022-06-20 10:10:38'}}, {'ts': 1655691067548, 'values': {'Abnormal content': 'The line in the left'}}, {'ts': 1655691067548, 'values': {'Abnormal end time': '2022-06-20 10:11:07'}}, {'ts': 1655691067548, 'values': {'processing time(s)': '29'}}]"

But for some data there is no 'to tb:' log. The 'accept data' and 'return data' lines are logged by my custom converter.

def gw_send_telemetry(self, device, telemetry, quality_of_service=1):
    if not isinstance(telemetry, list) and not (isinstance(telemetry, dict) and telemetry.get("ts") is not None):
        telemetry = [telemetry]
    # Record data is sent to the thingsboard
    log.info('to tb: %s', telemetry)
    return self.publish_data({device: telemetry}, GATEWAY_MAIN_TOPIC + "telemetry", quality_of_service, )

type: sqlite
data_file_path: ./data/data.db
messages_ttl_check_in_hours: 1
messages_ttl_in_days: 7

thingsboard_gateway 3.0.1 @samson0v @imbeacon

ilseva commented 2 years ago

We finally solved the problem; the issue was related to our configuration.

We have many Raspberry Pis with thingsboard_gateway installed, but we had a single "gateway" configured as a device on ThingsBoard.

We have now created a separate "gateway" device on ThingsBoard for each one, so there is a 1-1 association between each gateway device and the thingsboard_gateway running on each Raspberry Pi.
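
A quick way to spot this kind of misconfiguration is to check whether several installations share the same credentials. The sketch below assumes each gateway authenticates with an access token taken from its own configuration file; the function name `find_shared_tokens`, the host names, and the tokens are all made up for illustration.

```python
from collections import defaultdict


def find_shared_tokens(gateway_tokens):
    """Return {token: [hosts]} for every access token used by more than one host.

    gateway_tokens maps a gateway host name to the access token in its config.
    """
    hosts_by_token = defaultdict(list)
    for host, token in gateway_tokens.items():
        hosts_by_token[token].append(host)
    return {
        token: sorted(hosts)
        for token, hosts in hosts_by_token.items()
        if len(hosts) > 1
    }


# Hypothetical inventory: two Raspberry Pis were given the same gateway token.
tokens = {"pi-1": "TOKEN_A", "pi-2": "TOKEN_A", "pi-3": "TOKEN_B"}
print(find_shared_tokens(tokens))
```

An empty result means every installation has its own gateway device, which is the 1-1 association described above.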

Thanks for support.