eclipse / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho
Other
2.12k stars 883 forks source link

Paho (Java) does not specify topic when re-publishing a message due to missing ACK #916

Open StefanoBerlato opened 2 years ago

StefanoBerlato commented 2 years ago

Hi! I am using Paho (Java) 1.2.5 as an MQTT client to communicate with Mosquitto 2.0.14 on Ubuntu 20.04.4. I am experiencing an issue when Paho has to re-publish a message due to the lack of an ACK from the broker after having been disconnected. More specifically, when I send a specific MQTT message with QoS 1/2 to the topic related to the DynSec plugin of Mosquitto, the broker disconnects the client without sending an ACK for the message (please refer to this issue for further information on why this happens).

The (potential) issue I would like to report does not involve the fact that Mosquitto disconnects the client without sending an ACK (obviously), but instead it is related to how Paho (Java) handles this situation. In detail, in my scenario:

  1. Paho connects to Mosquitto with the cleanStart flag to false (to retain the session) and automaticReconnect to true;
  2. Paho publishes a message to the DynSec topic with QoS 1/2, receiving a disconnect packet instead of an ACK (again, this is probably an issue/intended behaviour of Mosquitto, as discussed in the other issue);
  3. Paho automatically reconnects to Mosquitto;
  4. Paho tries to re-publish the message (since it did not receive the ACK). However, the message causes Mosquitto to reply with a disconnect packet with reason code 130 (i.e., protocol error).

At this point, it seems that either Paho (Java) or Mosquitto is doing something wrong. However, by investigating the packets flow with Wireshark, I noticed that the re-published message in step 4 by Paho does not include the topic name (i.e., the topic length is 0). Most probably, this is why Mosquitto replies with reason code 130.

Wireshark2

I highlight that this problem does not happen with Paho (Python), as the re-published message contains the topic name (as shown in the other issue). Therefore, it seems to be an issue of Paho (Java).

You can find in this .zip file the configuration of Mosquitto and DynSec, the dump of Wireshark packets and a mock Kotlin code of the MQTT client to reproduce the behaviour. For your convenience, I report also below the log of the Mosquitto broker, the mosquitto.conf file content, the dynamic-security.json file content and the Kotlin code.

Thanks in advance! Stefano




The Mosquitto broker log:

- 1647012876: mosquitto version 2.0.14 starting
- 1647012876: Config loaded from /mosquitto/config/mosquitto2.conf.
- 1647012876: Loading plugin: ./usr/lib/mosquitto_dynamic_security.so
- 1647012876: Opening ipv4 listen socket on port 1883.
- 1647012876: Opening ipv6 listen socket on port 1883.
- 1647012876: mosquitto version 2.0.14 running
- 1647012887: New connection from 10.1.0.1:56788 on port 1883.
- 1647012887: New client connected from 10.1.0.1:56788 as randomClientID (p5, c0, k120, u'admin').
- 1647012887: No will message specified.
- 1647012887: Sending CONNACK to randomClientID (0, 0)
- 1647012888: Received PUBLISH from randomClientID (d0, q1, r0, m1, '$CONTROL/dynamic-security/v1', ... (178 bytes))
- 1647012888: dynsec: randomClientID/admin | createRole | rolename=roleName1
- 1647012888: Sending PUBACK to randomClientID (m1, rc0)
- 1647012889: Received PUBLISH from randomClientID (d0, q1, r0, m2, '$CONTROL/dynamic-security/v1', ... (222 bytes))
- 1647012889: Client randomClientID been disconnected by administrative action.
- 1647012889: dynsec: (null)/admin | addClientRole | username=admin | rolename=roleName1 | priority=-1
- 1647012889: Sending PUBACK to null (m2, rc0)
- 1647012889: Client <unknown> disconnected due to out of memory.
- 1647012890: New connection from 10.1.0.1:56790 on port 1883.
- 1647012891: New client connected from 10.1.0.1:56790 as randomClientID (p5, c0, k120, u'admin').
- 1647012891: No will message specified.
- 1647012891: Sending CONNACK to randomClientID (0, 0)
- 1647012891: Client randomClientID disconnected due to protocol error.

The mosquitto.conf file content:

log_type all

per_listener_settings false

# HTTP
listener 1883
plugin ./usr/lib/mosquitto_dynamic_security.so
plugin_opt_config_file /mosquitto/config/dynamic-security.json
persistent_client_expiration 1d

The dynamic-security.json file content:

{
    "clients":  [{
            "username": "admin",
            "textName": "Dynsec admin user",
            "password": "o5Ves8UtkxWh+ELZ5O8L3BCRe1tHJBE+capQBZfoXpNo6ohyaPxjpPz2wX3VvuJbKZxVDyrGogqGKO++fdayMg==",
            "salt": "Rl3CKxTVy8M+63sA",
            "iterations":   101,
            "roles":    [{
                    "rolename": "admin"
                }]
        }],
    "roles":    [{
            "rolename": "admin",
            "acls": [{
                    "acltype":  "publishClientSend",
                    "topic":    "$CONTROL/#",
                    "allow":    true
                }, {
                    "acltype":  "publishClientReceive",
                    "topic":    "$CONTROL/#",
                    "allow":    true
                }, {
                    "acltype":  "subscribePattern",
                    "topic":    "$CONTROL/#",
                    "allow":    true
                }, {
                    "acltype":  "publishClientReceive",
                    "topic":    "$SYS/#",
                    "allow":    true
                }, {
                    "acltype":  "subscribePattern",
                    "topic":    "$SYS/#",
                    "allow":    true
                }, {
                    "acltype":  "publishClientSend",
                    "topic":    "#",
                    "allow":    true
                }, {
                    "acltype":  "publishClientReceive",
                    "topic":    "#",
                    "allow":    true
                }, {
                    "acltype":  "subscribePattern",
                    "topic":    "#",
                    "allow":    true
                }, {
                    "acltype":  "unsubscribePattern",
                    "topic":    "#",
                    "allow":    true
                }]
        }],
    "defaultACLAccess": {
        "publishClientSend":    false,
        "publishClientReceive": true,
        "subscribe":    false,
        "unsubscribe":  true
    }
}

The Java code:

package eu.fbk.st.cryptoac.implementation.dm

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.eclipse.paho.mqttv5.client.*
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence
import org.eclipse.paho.mqttv5.common.MqttException
import org.eclipse.paho.mqttv5.common.MqttMessage
import org.eclipse.paho.mqttv5.common.packet.MqttProperties

fun main() {
    TestClass().simulateScenario()
}

class TestClass : MqttCallback {

    fun simulateScenario() {

        /**
         * The "dynamic-security.json" file creates a
         * user with name "admin" and password "password"
         */

        val connOpts = MqttConnectionOptions()
        connOpts.isCleanStart = false
        connOpts.keepAliveInterval = 120
        connOpts.isAutomaticReconnect = true
        connOpts.userName = "admin"
        connOpts.password = "password".toByteArray()
        val client = MqttClient(
            "tcp://10.1.0.8:1883",
            "randomClientID",
            MemoryPersistence()
        )
        client.connect(connOpts)
        client.setCallback(this)

        runBlocking { delay(1000) }

        /** 1. Create a new role, everything is fine */
        val createRole = """
            { "commands": [
                {
                    "command": "createRole",
                    "rolename": "roleName1"
                }
            ]}
         """
        val createRoleMessage = MqttMessage(createRole.toByteArray())
        createRoleMessage.qos = 1
        client.publish(
            "\$CONTROL/dynamic-security/v1",
            createRoleMessage
        )

        runBlocking { delay(1000) }

        /** 2. Assign the user admin to the new role => Disconnect 152 */
        val assignClientToRole = """
            { "commands": [
                {
                    "command": "addClientRole",
                    "username": "admin",
                    "rolename": "roleName1"
                }
            ]}
         """
        val assignMessage = MqttMessage(assignClientToRole.toByteArray())
        assignMessage.qos = 1
        try {
            client.publish(
                "\$CONTROL/dynamic-security/v1",
                assignMessage
            )
        } catch (e: MqttException) {
            if (e.message?.contains("Disconnect RC: 152") == true) {
                println("We were disconnected by Administrative action (see below stack trace)")
                e.printStackTrace()
                println(
                    "Now the client automatically reconnects and tries to re-publish the second \n" +
                    "message, as the broker directly sent a disconnect request and not the ACK. \n" +
                    "However, the client sends a message that triggers a 'Disconnect RC: 130' \n" +
                    "response from the broker (probably due to the missing topic in the message)"
                )
                runBlocking { delay(1000) }
            } else {
                throw e
            }
        }
    }

    override fun messageArrived(topic: String, message: MqttMessage) {
        println("TEST: MQTT message arrived, topic $topic, payload ${message.payload}")
    }

    override fun disconnected(disconnectResponse: MqttDisconnectResponse?) {
        println("MQTT client was disconnected: ${disconnectResponse.toString()}")
    }

    override fun authPacketArrived(reasonCode: Int, properties: MqttProperties?) {
        println("TEST: authPacketArrived")
    }

    override fun connectComplete(reconnect: Boolean, serverURI: String?) {
        println("TEST: connectComplete")
    }

    override fun deliveryComplete(token: IMqttToken?) {
        println("TEST: deliveryComplete")
    }

    override fun mqttErrorOccurred(exception: MqttException?) {
        println("TEST: mqttErrorOccurred")
    }
}