emqx / MQTTX

A Powerful and All-in-One MQTT 5.0 client toolbox for Desktop, CLI and WebSocket.
https://mqttx.app
Apache License 2.0

[Bug] CLI Publish retries in loop if disconnected by invalid topic #1667

Closed (jason-grimme closed 4 weeks ago)

jason-grimme commented 1 month ago

What did I do

Background: My broker disconnects the client if the topic is unauthorized or invalid (I believe this is normal behavior). When I use the CLI to publish to an invalid topic, the CLI retries indefinitely and never exits.

What happened

The CLI indefinitely retries and never exits. Looking at the --debug log I can see that the client is disconnected and it reconnects.

Expected

When I do the same in the GUI, the client is disconnected and no retry is attempted. In the CLI, however, it is stuck in a loop, reconnecting and trying to publish the same (invalid) message. If infinite retry is expected, there should be a flag to opt out of retrying, or to specify the number of retries. I tried --message-expiry-interval 1 and --maximum-reconnect-times 1, but neither had any effect.
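
For illustration only: the debug log further down shows `options.reconnectPeriod 1000`, meaning the underlying MQTT.js client is set to retry every second. MQTT.js documents that a `reconnectPeriod` of 0 disables automatic reconnection, so an opt-out could look roughly like the sketch below. The `ConnectOptions` subset and the `withoutAutoReconnect` helper are hypothetical names introduced here, not part of MQTTX, and this report does not establish whether the CLI exposes the option.

```typescript
// Hypothetical subset of the MQTT.js connect options; only the fields this sketch needs.
interface ConnectOptions {
  clientId?: string
  keepalive?: number
  // Milliseconds between reconnect attempts; MQTT.js treats 0 as "do not auto-reconnect".
  reconnectPeriod?: number
}

// Hypothetical helper: return a copy of the options with auto-reconnect turned off.
// The input object is left unchanged.
function withoutAutoReconnect(opts: ConnectOptions): ConnectOptions {
  return { ...opts, reconnectPeriod: 0 }
}

// Values taken from the debug log in this report (keepalive 30, reconnectPeriod 1000).
const optionsFromLog: ConnectOptions = { keepalive: 30, reconnectPeriod: 1000 }
const oneShotOptions: ConnectOptions = withoutAutoReconnect(optionsFromLog)
```

With options like `oneShotOptions`, a one-shot publish would be expected to stop after the broker closes the connection and not re-send the rejected message.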

Environment

More detail

mqttx.exe pub -t $topicPublish --message $messageJsonB64 --debug --qos 1 --format "base64" --mqtt-version 3.1.1 --protocol "mqtts" --insecure -h $Hostname -p 8883 --client-id $clientId --key $ClientKey --cert $ClientCert
```
 mqttjs connecting to an MQTT broker... +0ms
  mqttjs:client MqttClient :: options.protocol mqtts +0ms
  mqttjs:client MqttClient :: options.protocolVersion 4 +0ms
  mqttjs:client MqttClient :: options.username undefined +1ms
  mqttjs:client MqttClient :: options.keepalive 30 +0ms
  mqttjs:client MqttClient :: options.reconnectPeriod 1000 +1ms
  mqttjs:client MqttClient :: options.rejectUnauthorized false +0ms
  mqttjs:client MqttClient :: options.topicAliasMaximum undefined +0ms
  mqttjs:client MqttClient :: clientId W-Auto-vFOIZbG-identifier +0ms
  mqttjs:client MqttClient :: setting up stream +1ms
  mqttjs:client _setupStream :: calling method to clear reconnect +0ms
  mqttjs:client _clearReconnect : clearing reconnect timer +0ms
  mqttjs:client _setupStream :: using streamBuilder provided to client to create stream +1ms
  mqttjs calling streambuilder for mqtts +5ms
  mqttjs:tls port 8883 host mqtt.myserver.com rejectUnauthorized %b false +0ms
  mqttjs:client _setupStream :: pipe stream to writable stream +11ms
  mqttjs:client _setupStream: sending packet `connect` +0ms
  mqttjs:client sendPacket :: packet: { cmd: 'connect' } +1ms
  mqttjs:client sendPacket :: emitting `packetsend` +1ms
  mqttjs:client sendPacket :: writing to stream +0ms
  mqttjs:client sendPacket :: writeToStream result true +10ms
[5/23/2024] [9:33:37 AM] » ...  Connecting...
  mqttjs:client writable stream :: parsing buffer +72ms
  mqttjs:client parser :: on packet push to packets array. +1ms
  mqttjs:client work :: getting next packet in queue +0ms
  mqttjs:client work :: packet pulled from queue +0ms
  mqttjs:client _handlePacket :: emitting packetreceive +0ms
  mqttjs:client _handleConnack +0ms
  mqttjs:client _setupPingTimer :: keepalive 30 (seconds) +0ms
  mqttjs:client connect :: sending queued packets +1ms
  mqttjs:client deliver :: entry undefined +0ms
  mqttjs:client _resubscribe +0ms
[5/23/2024] [9:33:37 AM] » √  Connected
[5/23/2024] [9:33:37 AM] » ...  Message publishing...
  mqttjs:client publish :: message `{
    "foo-message":  {
                        "alert":  {
                                      "notify":  "FromTestScript",
                                      "param":  {
                                                    "action_cmd":  "2024-05-23T09:33:37.0855499-04:00"
                                                }
                                  }
                    }
}` to topic `xx_BAD_TOPIC` +1ms
  mqttjs:client publish :: qos 1 +1ms
  mqttjs:client MqttClient:publish: packet cmd: publish +0ms
  mqttjs:client _sendPacket :: (W-Auto-vFOIZbG-identifier) ::  start +0ms
  mqttjs:client storeAndSend :: store packet with cmd publish to outgoingStore +0ms
  mqttjs:client nop :: undefined +0ms
  mqttjs:client sendPacket :: packet: {
  mqttjs:client   cmd: 'publish',
  mqttjs:client   topic: 'xx_BAD_TOPIC',
  mqttjs:client   payload: <Buffer 7b 0d 0a 20 20 20 20 22 66 6f 6f 2d 6d 65 73 73 61 67 65 22 3a 20 20 7b 0d 0a 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 ... 348 more bytes>,
  mqttjs:client   qos: 1,
  mqttjs:client   retain: undefined,
  mqttjs:client   messageId: 8198,
  mqttjs:client   dup: undefined
  mqttjs:client } +0ms
  mqttjs:client sendPacket :: emitting `packetsend` +1ms
  mqttjs:client sendPacket :: writing to stream +1ms
  mqttjs:client sendPacket :: writeToStream result true +0ms
  mqttjs:client sendPacket :: invoking cb +0ms
  mqttjs:client nop :: undefined +0ms
  mqttjs:client _sendPacket :: (W-Auto-vFOIZbG-identifier) ::  end +1ms
  mqttjs:client (W-Auto-vFOIZbG-identifier)stream :: on close +15ms
  mqttjs:client flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function +1ms
  mqttjs:client stream: emit close to MqttClient +3ms
  mqttjs:client close :: connected set to `false` +1ms
  mqttjs:client close :: clearing connackTimer +0ms
  mqttjs:client close :: clearing ping timer +1ms
  mqttjs:client close :: calling _setupReconnect +0ms
  mqttjs:client _setupReconnect :: emit `offline` state +0ms
  mqttjs:client _setupReconnect :: set `reconnecting` to `true` +1ms
  mqttjs:client _setupReconnect :: setting reconnectTimer for 1000 ms +0ms
  mqttjs:client reconnectTimer :: reconnect triggered! +1s
  mqttjs:client _reconnect: emitting reconnect to client +1ms
  mqttjs:client _reconnect: calling _setupStream +4ms
  mqttjs:client _setupStream :: calling method to clear reconnect +1ms
  mqttjs:client _clearReconnect : clearing reconnect timer +0ms
  mqttjs:client _setupStream :: using streamBuilder provided to client to create stream +1ms
  mqttjs calling streambuilder for mqtts +1s
  mqttjs:tls port 8883 host mqtt.myserver.com rejectUnauthorized %b false +1s
  mqttjs:client _setupStream :: pipe stream to writable stream +2ms
  mqttjs:client _setupStream: sending packet `connect` +0ms
  mqttjs:client sendPacket :: packet: { cmd: 'connect' } +1ms
  mqttjs:client sendPacket :: emitting `packetsend` +0ms
  mqttjs:client sendPacket :: writing to stream +1ms
  mqttjs:client sendPacket :: writeToStream result true +0ms
  mqttjs:client writable stream :: parsing buffer +76ms
  mqttjs:client parser :: on packet push to packets array. +1ms
  mqttjs:client work :: getting next packet in queue +2ms
  mqttjs:client work :: packet pulled from queue +0ms
  mqttjs:client _handlePacket :: emitting packetreceive +0ms
  mqttjs:client _handleConnack +1ms
  mqttjs:client _setupPingTimer :: keepalive 30 (seconds) +0ms
  mqttjs:client _sendPacket :: (W-Auto-vFOIZbG-identifier) ::  start +1ms
  mqttjs:client storeAndSend :: store packet with cmd publish to outgoingStore +0ms
  mqttjs:client nop :: undefined +0ms
  mqttjs:client sendPacket :: packet: {
  mqttjs:client   cmd: 'publish',
  mqttjs:client   topic: 'xx_BAD_TOPIC',
  mqttjs:client   payload: <Buffer 7b 0d 0a 20 20 20 20 22 66 6f 6f 2d 6d 65 73 73 61 67 65 22 3a 20 20 7b 0d 0a 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 ... 348 more bytes>,
  mqttjs:client   qos: 1,
  mqttjs:client   retain: undefined,
  mqttjs:client   messageId: 8198,
  mqttjs:client   dup: undefined
  mqttjs:client } +1ms
  mqttjs:client sendPacket :: emitting `packetsend` +1ms
  mqttjs:client sendPacket :: writing to stream +0ms
  mqttjs:client sendPacket :: writeToStream result true +0ms
  mqttjs:client sendPacket :: invoking cb +0ms
  mqttjs:client nop :: undefined +1ms
  mqttjs:client _sendPacket :: (W-Auto-vFOIZbG-identifier) ::  end +0ms
  mqttjs:client (W-Auto-vFOIZbG-identifier)stream :: on close +14ms
  mqttjs:client flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function +2ms
  mqttjs:client stream: emit close to MqttClient +0ms
  mqttjs:client close :: connected set to `false` +1ms
  mqttjs:client close :: clearing connackTimer +0ms
  mqttjs:client close :: clearing ping timer +1ms
  mqttjs:client close :: calling _setupReconnect +0ms
  mqttjs:client _setupReconnect :: emit `offline` state +1ms
  mqttjs:client _setupReconnect :: set `reconnecting` to `true` +1ms
  mqttjs:client _setupReconnect :: setting reconnectTimer for 1000 ms +0ms
  mqttjs:client reconnectTimer :: reconnect triggered! +1s
  mqttjs:client _reconnect: emitting reconnect to client +2ms
  mqttjs:client _reconnect: calling _setupStream +4ms
  mqttjs:client _setupStream :: calling method to clear reconnect +2ms
  mqttjs:client _clearReconnect : clearing reconnect timer +0ms
  mqttjs:client _setupStream :: using streamBuilder provided to client to create stream +2ms
  mqttjs calling streambuilder for mqtts +1s
  mqttjs:tls port 8883 host mqtt.myserver.com rejectUnauthorized %b false +1s
  mqttjs:client _setupStream :: pipe stream to writable stream +6ms
  mqttjs:client _setupStream: sending packet `connect` +2ms
  mqttjs:client sendPacket :: packet: { cmd: 'connect' } +1ms
  mqttjs:client sendPacket :: emitting `packetsend` +1ms
  mqttjs:client sendPacket :: writing to stream +1ms
  mqttjs:client sendPacket :: writeToStream result true +1ms
  mqttjs:client writable stream :: parsing buffer +67ms
  mqttjs:client parser :: on packet push to packets array. +0ms
  mqttjs:client work :: getting next packet in queue +2ms
  mqttjs:client work :: packet pulled from queue +0ms
  mqttjs:client _handlePacket :: emitting packetreceive +0ms
  mqttjs:client _handleConnack +0ms
  mqttjs:client _setupPingTimer :: keepalive 30 (seconds) +1ms
  mqttjs:client _sendPacket :: (W-Auto-vFOIZbG-identifier) ::  start +0ms
  mqttjs:client storeAndSend :: store packet with cmd publish to outgoingStore +0ms
  mqttjs:client nop :: undefined +1ms
  mqttjs:client sendPacket :: packet: {
  mqttjs:client   cmd: 'publish',
  mqttjs:client   topic: 'xx_BAD_TOPIC',
  mqttjs:client   payload: <Buffer 7b 0d 0a 20 20 20 20 22 66 6f 6f 2d 6d 65 73 73 61 67 65 22 3a 20 20 7b 0d 0a 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 ... 348 more bytes>,
  mqttjs:client   qos: 1,
  mqttjs:client   retain: undefined,
  mqttjs:client   messageId: 8198,
  mqttjs:client   dup: undefined
  mqttjs:client } +0ms
  mqttjs:client sendPacket :: emitting `packetsend` +1ms
  mqttjs:client sendPacket :: writing to stream +0ms
  mqttjs:client sendPacket :: writeToStream result true +1ms
  mqttjs:client sendPacket :: invoking cb +0ms
  mqttjs:client nop :: undefined +0ms
  mqttjs:client _sendPacket :: (W-Auto-vFOIZbG-identifier) ::  end +1ms
  mqttjs:client (W-Auto-vFOIZbG-identifier)stream :: on close +15ms
  mqttjs:client flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function +0ms
  mqttjs:client stream: emit close to MqttClient +0ms
  mqttjs:client close :: connected set to `false` +1ms
  mqttjs:client close :: clearing connackTimer +0ms
  mqttjs:client close :: clearing ping timer +0ms
  mqttjs:client close :: calling _setupReconnect +0ms
  mqttjs:client _setupReconnect :: emit `offline` state +1ms
  mqttjs:client _setupReconnect :: set `reconnecting` to `true` +0ms
  mqttjs:client _setupReconnect :: setting reconnectTimer for 1000 ms +0ms
  mqttjs:client reconnectTimer :: reconnect triggered! +1s
  mqttjs:client _reconnect: emitting reconnect to client +0ms
  mqttjs:client _reconnect: calling _setupStream +2ms
  mqttjs:client _setupStream :: calling method to clear reconnect +1ms
  mqttjs:client _clearReconnect : clearing reconnect timer +1ms
  mqttjs:client _setupStream :: using streamBuilder provided to client to create stream +0ms
  mqttjs calling streambuilder for mqtts +1s
  mqttjs:tls port 8883 host mqtt.myserver.com rejectUnauthorized %b false +1s
  mqttjs:client _setupStream :: pipe stream to writable stream +2ms
  mqttjs:client _setupStream: sending packet `connect` +1ms
  mqttjs:client sendPacket :: packet: { cmd: 'connect' } +0ms
  mqttjs:client sendPacket :: emitting `packetsend` +1ms
  mqttjs:client sendPacket :: writing to stream +0ms
  mqttjs:client sendPacket :: writeToStream result true +1ms
  mqttjs:client writable stream :: parsing buffer +72ms
  mqttjs:client parser :: on packet push to packets array. +1ms
  mqttjs:client work :: getting next packet in queue +1ms
  mqttjs:client work :: packet pulled from queue +0ms
  mqttjs:client _handlePacket :: emitting packetreceive +1ms
  mqttjs:client _handleConnack +0ms
  mqttjs:client _setupPingTimer :: keepalive 30 (seconds) +1ms
  mqttjs:client _sendPacket :: (W-Auto-vFOIZbG-identifier) ::  start +0ms
  mqttjs:client storeAndSend :: store packet with cmd publish to outgoingStore +0ms
  mqttjs:client nop :: undefined +0ms
  mqttjs:client sendPacket :: packet: {
  mqttjs:client   cmd: 'publish',
  mqttjs:client   topic: 'xx_BAD_TOPIC',
  mqttjs:client   payload: <Buffer 7b 0d 0a 20 20 20 20 22 66 6f 6f 2d 6d 65 73 73 61 67 65 22 3a 20 20 7b 0d 0a 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 ... 348 more bytes>,
  mqttjs:client   qos: 1,
  mqttjs:client   retain: undefined,
  mqttjs:client   messageId: 8198,
  mqttjs:client   dup: undefined
  mqttjs:client } +1ms
  mqttjs:client sendPacket :: emitting `packetsend` +0ms
  mqttjs:client sendPacket :: writing to stream +1ms
  mqttjs:client sendPacket :: writeToStream result true +0ms
  mqttjs:client sendPacket :: invoking cb +0ms
  mqttjs:client nop :: undefined +0ms
  mqttjs:client _sendPacket :: (W-Auto-vFOIZbG-identifier) ::  end +1ms
  mqttjs:client (W-Auto-vFOIZbG-identifier)stream :: on close +13ms
  mqttjs:client flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function +0ms
  mqttjs:client stream: emit close to MqttClient +1ms
  mqttjs:client close :: connected set to `false` +0ms
  mqttjs:client close :: clearing connackTimer +0ms
  mqttjs:client close :: clearing ping timer +0ms
  mqttjs:client close :: calling _setupReconnect +1ms
  mqttjs:client _setupReconnect :: emit `offline` state +0ms
  mqttjs:client _setupReconnect :: set `reconnecting` to `true` +0ms
  mqttjs:client _setupReconnect :: setting reconnectTimer for 1000 ms +1ms
  mqttjs:client reconnectTimer :: reconnect triggered! +1s
  mqttjs:client _reconnect: emitting reconnect to client +2ms
  mqttjs:client _reconnect: calling _setupStream +3ms
  mqttjs:client _setupStream :: calling method to clear reconnect +2ms
  mqttjs:client _clearReconnect : clearing reconnect timer +1ms
  mqttjs:client _setupStream :: using streamBuilder provided to client to create stream +1ms
  mqttjs calling streambuilder for mqtts +1s
  mqttjs:tls port 8883 host mqtt.myserver.com rejectUnauthorized %b false +1s
  mqttjs:client _setupStream :: pipe stream to writable stream +4ms
  mqttjs:client _setupStream: sending packet `connect` +1ms
  mqttjs:client sendPacket :: packet: { cmd: 'connect' } +0ms
  mqttjs:client sendPacket :: emitting `packetsend` +1ms
  mqttjs:client sendPacket :: writing to stream +3ms
  mqttjs:client sendPacket :: writeToStream result true +1ms
  mqttjs:client writable stream :: parsing buffer +110ms
  mqttjs:client parser :: on packet push to packets array. +1ms
  mqttjs:client work :: getting next packet in queue +2ms
  mqttjs:client work :: packet pulled from queue +0ms
  mqttjs:client _handlePacket :: emitting packetreceive +1ms
  mqttjs:client _handleConnack +0ms
  mqttjs:client _setupPingTimer :: keepalive 30 (seconds) +1ms
  mqttjs:client _sendPacket :: (W-Auto-vFOIZbG-identifier) ::  start +1ms
  mqttjs:client storeAndSend :: store packet with cmd publish to outgoingStore +0ms
  mqttjs:client nop :: undefined +0ms
  mqttjs:client sendPacket :: packet: {
  mqttjs:client   cmd: 'publish',
  mqttjs:client   topic: 'xx_BAD_TOPIC',
  mqttjs:client   payload: <Buffer 7b 0d 0a 20 20 20 20 22 66 6f 6f 2d 6d 65 73 73 61 67 65 22 3a 20 20 7b 0d 0a 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 ... 348 more bytes>,
  mqttjs:client   qos: 1,
  mqttjs:client   retain: undefined,
  mqttjs:client   messageId: 8198,
  mqttjs:client   dup: undefined
  mqttjs:client } +1ms
  mqttjs:client sendPacket :: emitting `packetsend` +1ms
  mqttjs:client sendPacket :: writing to stream +0ms
  mqttjs:client sendPacket :: writeToStream result true +1ms
  mqttjs:client sendPacket :: invoking cb +0ms
  mqttjs:client nop :: undefined +0ms
  mqttjs:client _sendPacket :: (W-Auto-vFOIZbG-identifier) ::  end +1ms
```

ysfscream commented 1 month ago

Thank you for your feedback. We have indeed reproduced this issue and found that we did not handle the reconnect in the pub command. We will optimize this in the next version.
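
As a rough sketch of what handling the reconnect could look like (an assumption for illustration, not the code that shipped in the fix): MQTT.js emits a `reconnect` event before each attempt, which is visible in the log above as `_reconnect: emitting reconnect to client`. A publish command could count those events and end the client once a limit is exceeded. `ReconnectingClient`, `limitReconnects`, and `onGiveUp` are hypothetical names; the interface is a structural subset of the MQTT.js client, so the sketch has no dependency on the library.

```typescript
// Structural subset of an MQTT.js client: only the two members this sketch touches.
interface ReconnectingClient {
  on(event: 'reconnect', listener: () => void): unknown
  end(force?: boolean): unknown
}

// Hypothetical helper: allow at most `maxReconnects` reconnect attempts.
// On the next attempt, force-close the client and report how many attempts were allowed.
function limitReconnects(
  client: ReconnectingClient,
  maxReconnects: number,
  onGiveUp: (attempts: number) => void,
): void {
  let attempts = 0
  let stopped = false
  client.on('reconnect', () => {
    if (stopped) return // ignore any events that arrive after giving up
    attempts += 1
    if (attempts > maxReconnects) {
      stopped = true
      client.end(true) // force-close without waiting for in-flight messages
      onGiveUp(attempts - 1)
    }
  })
}
```

With a real MQTT.js client, this might be wired up as `limitReconnects(client, 1, () => process.exit(1))`, so that a publish rejected by the broker fails after a single retry.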

ysfscream commented 4 weeks ago

Updated to https://github.com/emqx/MQTTX/releases/tag/v1.10.0. Thanks for your feedback.