thingsboard / thingsboard-gateway

Open-source IoT Gateway - integrates devices connected to legacy and third-party systems with ThingsBoard IoT Platform using Modbus, CAN bus, BACnet, BLE, OPC-UA, MQTT, ODBC and REST protocols
https://thingsboard.io/docs/iot-gateway/what-is-iot-gateway/
Apache License 2.0
1.73k stars, 841 forks

org.eclipse.paho.client.mqttv3.MqttException: Too many publishes in progress #188

Closed: samf48 closed this 4 years ago

samf48 commented 5 years ago

I have a problem when publishing many items to a topic: the tb-gateway log shows org.eclipse.paho.client.mqttv3.MqttException: Too many publishes in progress.

Around 173 publishes occur at once, and that number will eventually grow. After some research I found that the 'maxInFlight' parameter can be adjusted, but I still get the same error after raising it. I'm using release 2.3.1 of ThingsBoard and release 2.2.1 of the gateway. My gateway configuration:

#
# Copyright © 2017 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

server:
  # Server bind address
  address: "0.0.0.0"
  # Server bind port
  port: "3030"

# Check new version updates parameters
updates:
  # Enable/disable updates checking.
  enabled: true

gateways:
  tenants:
    -
      label: "Tenant"
      reporting:
        interval: 6000
      persistence:
        type: file
        path: storage
        bufferSize: 10000
      connection:
        host: "127.0.0.1"
        port: 1883
        retryInterval: 3000
        maxInFlight: 65000
        security:
          accessToken: "SR9yiWO4Z7I0xsTRHpD7"
      remoteConfiguration: true
      extensions:
        -
          id: "mqtt"
          type: "MQTT"
          extensionConfiguration: mqtt-config.json

Part of the logfile for tb-gateway:

2019-05-22 19:17:43,218 [pool-3-thread-1] INFO  o.t.g.s.gateway.MqttGatewayService - Gateway statistics {"ts":1558552662308,"values":{"devicesOnline":70,"attributesUploaded":0,"telemetryUploaded":0}} reported!
2019-05-22 19:17:49,224 [pool-3-thread-1] INFO  o.t.g.s.gateway.MqttGatewayService - Gateway statistics {"ts":1558552668310,"values":{"devicesOnline":70,"attributesUploaded":0,"telemetryUploaded":0}} reported!
2019-05-22 19:17:53,355 [pool-1-thread-1219] WARN  o.t.g.e.m.client.MqttBrokerMonitor - [unassigned089] Failed to publish to topic [sensor/unassigned089/model] 
org.eclipse.paho.client.mqttv3.MqttException: Too many publishes in progress
    at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:513)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:158)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:187)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1355)
    at org.thingsboard.gateway.extensions.mqtt.client.MqttBrokerMonitor.publish(MqttBrokerMonitor.java:317)
    at org.thingsboard.gateway.extensions.mqtt.client.MqttBrokerMonitor.onAttributesUpdated(MqttBrokerMonitor.java:263)
    at org.thingsboard.gateway.service.gateway.MqttGatewayService.lambda$null$20(MqttGatewayService.java:458)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
2019-05-22 19:17:53,355 [pool-1-thread-1230] WARN  o.t.g.e.m.client.MqttBrokerMonitor - [unassigned086] Failed to publish to topic [sensor/unassigned086/model] 
org.eclipse.paho.client.mqttv3.MqttException: Too many publishes in progress
    at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:513)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:158)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:187)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1355)
    at org.thingsboard.gateway.extensions.mqtt.client.MqttBrokerMonitor.publish(MqttBrokerMonitor.java:317)
    at org.thingsboard.gateway.extensions.mqtt.client.MqttBrokerMonitor.onAttributesUpdated(MqttBrokerMonitor.java:263)
    at org.thingsboard.gateway.service.gateway.MqttGatewayService.lambda$null$20(MqttGatewayService.java:458)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
2019-05-22 19:17:53,355 [pool-1-thread-1233] WARN  o.t.g.e.m.client.MqttBrokerMonitor - [unassigned062] Failed to publish to topic [sensor/unassigned062/model] 
org.eclipse.paho.client.mqttv3.MqttException: Too many publishes in progress
    at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:513)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:158)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:187)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1355)
    at org.thingsboard.gateway.extensions.mqtt.client.MqttBrokerMonitor.publish(MqttBrokerMonitor.java:317)
    at org.thingsboard.gateway.extensions.mqtt.client.MqttBrokerMonitor.onAttributesUpdated(MqttBrokerMonitor.java:263)
    at org.thingsboard.gateway.service.gateway.MqttGatewayService.lambda$null$20(MqttGatewayService.java:458)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
2019-05-22 19:17:53,356 [pool-1-thread-1230] WARN  o.t.g.e.m.client.MqttBrokerMonitor - [unassigned072] Failed to publish to topic [sensor/unassigned072/model] 
org.eclipse.paho.client.mqttv3.MqttException: Too many publishes in progress
    at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:513)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:158)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:187)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1355)
    at org.thingsboard.gateway.extensions.mqtt.client.MqttBrokerMonitor.publish(MqttBrokerMonitor.java:317)
    at org.thingsboard.gateway.extensions.mqtt.client.MqttBrokerMonitor.onAttributesUpdated(MqttBrokerMonitor.java:263)
    at org.thingsboard.gateway.service.gateway.MqttGatewayService.lambda$null$20(MqttGatewayService.java:458)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
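
For context on the error above: the Paho Java client rejects a publish once the number of unacknowledged publishes reaches its max-in-flight limit, which defaults to 10 in MqttConnectOptions. The stack trace shows the exception raised from MqttBrokerMonitor.publish, which is the MQTT extension's client, so it may be worth checking whether the maxInFlight value set under the tenant connection actually reaches that client; the thread does not confirm this either way. The sketch below is a toy model of such a window, not Paho's code. The class and function names (InFlightWindow, send_all, TooManyPublishes) are hypothetical, and the topic list only mimics the topic names in the log.

```python
class TooManyPublishes(Exception):
    """Stand-in for the Paho error; not the real exception class."""


class InFlightWindow:
    """Toy model of a client that rejects publishes beyond a fixed window."""

    def __init__(self, max_in_flight):
        self.max_in_flight = max_in_flight
        self.in_flight = 0

    def publish(self, topic):
        # Reject the publish if the window is already full.
        if self.in_flight >= self.max_in_flight:
            raise TooManyPublishes(topic)
        self.in_flight += 1

    def acknowledge(self):
        # One outstanding publish has been acknowledged.
        self.in_flight -= 1


def send_all(window, topics, wait_when_full):
    """Publish every topic and return the list of rejected topics.

    With wait_when_full=True the sender simulates waiting for one
    acknowledgement whenever the window is full; a real client would
    block until the broker acknowledges an earlier publish.
    """
    rejected = []
    for topic in topics:
        if wait_when_full and window.in_flight >= window.max_in_flight:
            window.acknowledge()
        try:
            window.publish(topic)
        except TooManyPublishes:
            rejected.append(topic)
    return rejected


# 173 simultaneous publishes, as described in the report (topic names are illustrative).
TOPICS = ["sensor/unassigned%03d/model" % i for i in range(173)]

if __name__ == "__main__":
    print(len(send_all(InFlightWindow(10), TOPICS, wait_when_full=False)))
    print(len(send_all(InFlightWindow(65000), TOPICS, wait_when_full=False)))
    print(len(send_all(InFlightWindow(10), TOPICS, wait_when_full=True)))
```

In this model, a burst against a window of 10 loses every publish after the tenth, while either a larger window or a sender that waits for acknowledgements loses none. If the larger limit is not applied to the client that actually publishes, the first case is the one that shows up in the log.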
samf48 commented 5 years ago

I noticed that this only happens when I post attributes. My mqtt-config.json file:

[{
        "id": "mqtt",
        "type": "MQTT",
        "configuration": {
            "brokers": [{
                    "host": "localhost",
                    "port": 1885,
                    "ssl": false,
                    "retryInterval": 3000,
                    "credentials": {
                        "type": "anonymous"
                    },
                    "mapping": [{
                            "topicFilter": "sensors",
                            "converter": {
                                "type": "json",
                                "filterExpression": "$",
                                "deviceNameJsonExpression": "${$.SensorName}",
                                "deviceTypeJsonExpression": "${$.model}",
                                "attributes": [{
                                        "type": "string",
                                        "key": "model",
                                        "value": "${$.model}"
                                    }, {
                                        "type": "string",
                                        "key": "ffT",
                                        "value": "${$.ffT}"
                                    }
                                ],
                                "timeseries": [{
                                        "type": "string",
                                        "key": "IMEI",
                                        "value": "${$.IMEI}",
                                        "ts": "${$.time}"
                                    }, {
                                        "type": "string",
                                        "key": "NI",
                                        "value": "${$.NI}",
                                        "ts": "${$.time}"
                                    }, {
                                        "type": "string",
                                        "key": "protocol",
                                        "value": "${$.protocolP}",
                                        "ts": "${$.time}"
                                    }, {
                                        "type": "string",
                                        "key": "timestamp",
                                        "value": "${$.timestamp}",
                                        "ts": "${$.time}"
                                    }, {
                                        "type": "string",
                                        "key": "hex",
                                        "value": "${$.hex}",
                                        "ts": "${$.time}"
                                    }, {
                                        "type": "string",
                                        "key": "lat",
                                        "value": "${$.lat}",
                                        "ts": "${$.time}"
                                    }, {
                                        "type": "string",
                                        "key": "lon",
                                        "value": "${$.lon}",
                                        "ts": "${$.time}"
                                    }, {
                                        "type": "string",
                                        "key": "elev",
                                        "value": "${$.elev}",
                                        "ts": "${$.time}"
                                    }, {
                                        "type": "string",
                                        "key": "mode",
                                        "value": "${$.mode}",
                                        "ts": "${$.time}"
                                    }, {
                                        "type": "string",
                                        "key": "baro",
                                        "value": "${$.baro}",
                                        "ts": "${$.time}"
                                    }, {
                                        "type": "string",
                                        "key": "airTemp",
                                        "value": "${$.airTemp}",
                                        "ts": "${$.time}"
                                    }, {
                                        "type": "string",
                                        "key": "battery",
                                        "value": "${$.battery}",
                                        "ts": "${$.time}"
                                    }, {
                                        "type": "string",
                                        "key": "depth1",
                                        "value": "${$.depth1}",
                                        "ts": "${$.time}"
                                    }, {
                                        "type": "string",
                                        "key": "ffi1",
                                        "value": "${$.ffi1}",
                                        "ts": "${$.time}"
                                    }
                                ]
                            }
                        }, {
                            "topicFilter": "gps",
                            "converter": {
                                "attributes": [],
                                "timeseries": [{
                                        "type": "string",
                                        "key": "timestamp",
                                        "value": "${$.timestamp}"
                                    }, {
                                        "type": "string",
                                        "key": "lat",
                                        "value": "${$.lat}"
                                    }, {
                                        "type": "string",
                                        "key": "lon",
                                        "value": "${$.lon}"
                                    }, {
                                        "type": "string",
                                        "key": "elev",
                                        "value": "${$.elev}"
                                    }, {
                                        "type": "string",
                                        "key": "IMEI",
                                        "value": "${$.IMEI}"
                                    }, {
                                        "type": "string",
                                        "key": "NI",
                                        "value": "${$.NI}"
                                    }
                                ],
                                "type": "json",
                                "deviceNameJsonExpression": "${$.SensorName}",
                                "deviceTypeJsonExpression": "${$.model}",
                                "filterExpression": "$"
                            }
                        }
                    ],
                    "connectRequests": [{
                            "topicFilter": "sensors/connect",
                            "deviceNameJsonExpression": "${$.SensorName}"
                        }
                    ],
                    "disconnectRequests": [{
                            "topicFilter": "sensors/disconnect",
                            "deviceNameJsonExpression": "${$.SensorName}"
                        }
                    ],
                    "attributeRequests": [{
                            "topicFilter": "sensors/attributes",
                            "deviceNameJsonExpression": "${$.SensorName}",
                            "attributeKeyJsonExpression": "${$.key}",
                            "requestIdJsonExpression": "${$.requestId}",
                            "clientScope": false,
                            "responseTopicExpression": "sensors/${deviceName}/attributes/${responseId}",
                            "valueExpression": "{\"${attributeKey}\":\"${attributeValue}\"}"
                        }
                    ],
                    "attributeUpdates": [{
                            "deviceNameFilter": ".*",
                            "attributeFilter": ".*",
                            "topicExpression": "sensor/${deviceName}/${attributeKey}",
                            "valueExpression": "{\"${attributeKey}\":\"${attributeValue}\"}"
                        }
                    ],
                    "serverSideRpc": [{
                            "deviceNameFilter": ".*",
                            "methodFilter": "echo",
                            "requestTopicExpression": "sensor/${deviceName}/request/${methodName}/${requestId}",
                            "responseTopicExpression": "sensor/${deviceName}/response/${methodName}/${requestId}",
                            "responseTimeout": 10000,
                            "valueExpression": "${params}"
                        }, {
                            "deviceNameFilter": ".*",
                            "methodFilter": "no-reply",
                            "requestTopicExpression": "sensor/${deviceName}/request/${methodName}/${requestId}",
                            "valueExpression": "${params}"
                        }
                    ]
                }
            ]
        }
    }
]
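
The attributeUpdates entry above appears to be what produces the failing topics in the log: expanding "sensor/${deviceName}/${attributeKey}" for device unassigned089 and key model gives sensor/unassigned089/model. The sketch below shows that expansion, assuming plain placeholder substitution. It uses Python's string.Template as a stand-in because it happens to accept the same ${name} syntax; it is not the gateway's own code, and render_attribute_update and the attribute value "demo" are hypothetical.

```python
from string import Template

# Expressions copied from the attributeUpdates section of mqtt-config.json.
TOPIC_EXPRESSION = "sensor/${deviceName}/${attributeKey}"
VALUE_EXPRESSION = '{"${attributeKey}":"${attributeValue}"}'


def render_attribute_update(device_name, key, value):
    """Expand both expressions for one (device, attribute) pair."""
    fields = {
        "deviceName": device_name,
        "attributeKey": key,
        "attributeValue": value,
    }
    topic = Template(TOPIC_EXPRESSION).substitute(fields)
    payload = Template(VALUE_EXPRESSION).substitute(fields)
    return topic, payload


def render_all(device_names, attributes):
    """One message per (device, attribute) pair, under the assumption above."""
    return [
        render_attribute_update(name, key, value)
        for name in device_names
        for key, value in attributes.items()
    ]


if __name__ == "__main__":
    print(render_attribute_update("unassigned089", "model", "demo"))
```

Because the topic contains both the device name and the attribute key, every updated attribute on every device becomes a separate publish under this reading, which would explain why a burst of attribute updates across many devices reaches the in-flight limit quickly.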
imbeacon commented 4 years ago

Hi, @samf48

I am glad to inform you that we have released a new version of the Gateway, based on Python. It is a major rewrite of, and improvement on, the Java version. Main benefits:

You can use it for your purposes. We look forward to your feedback.