hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0

How to see the full error returned from the broker? #642

Closed leomuko closed 1 week ago

leomuko commented 2 weeks ago

I am experiencing frequent disconnections from the MQTT broker, where the server disconnects the client after a few seconds of being connected. I am trying to debug this issue, but I'm only able to see limited information in the disconnection event.

Currently, I'm using the following code in .addDisconnectedListener:
.addDisconnectedListener {
    Timber.tag(TAG).e("Hive Mqtt Disconnection")
    Timber.tag(TAG).e(it.cause)
    Timber.tag(TAG).e(it.source.name)
    Timber.tag(TAG).d(it.toString())
}

However, this only provides a cause and a source name. No additional reason or detailed error code is returned to explain why the server disconnected the client.

My Setup:

I can publish messages and intermittently receive messages when connected, but the server repeatedly disconnects the client without a clear reason. Without specific reason codes or error messages, it’s challenging to diagnose and resolve the disconnections.

Initialization Code:

fun initializeClient(serverUrl: String, cfPubSubToken: String, clientId: String): Mqtt5BlockingClient {
    client = Mqtt5Client.builder()
        .identifier(clientId)
        .serverHost(serverUrl)
        .serverPort(8883)
        .sslWithDefaultConfig()
        .simpleAuth()
        .password(cfPubSubToken.toByteArray())
        .applySimpleAuth()
        .addConnectedListener {
            Timber.tag(TAG).d("Hive Mqtt Connected")
            Timber.tag(TAG).d("Connected Listener ${it.clientConfig.state}")
        }
        .addDisconnectedListener {
            Timber.tag(TAG).e("Hive Mqtt Disconnection")
            Timber.tag(TAG).e(it.cause)
            Timber.tag(TAG).e(it.source.name)
            Timber.tag(TAG).d(it.toString())
        }
        .buildBlocking() // Builds a blocking client (buildAsync() would build an async one)

    return client as Mqtt5BlockingClient
}

Connection Code:

suspend fun connectClient(client: Mqtt5BlockingClient): String = withContext(Dispatchers.IO) {
    try {
        // Check if the client is already connected
        if (client.state.isConnected) {
            Timber.tag(TAG).d("Client is already connected. No reconnection attempt needed.")
            return@withContext HiveConnectLocalState.ALREADY_CONNECTED.name
        }

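        // On a blocking client, send() waits for the CONNACK and throws
        // (for example Mqtt5ConnAckException) if the broker rejects the connection,
        // so no coroutine bridge is needed; withContext(Dispatchers.IO) already
        // keeps this blocking call off the main thread.
        val connAck: Mqtt5ConnAck = client.connectWith()
            .keepAlive(10)
            .sessionExpiryInterval(86400)
            .cleanStart(false) // Do not start a clean session
            .send()

        Timber.tag(TAG).d("ConnAck received: $connAck")

        // Connection succeeded
        HiveConnectLocalState.SUCCESS.name
    } catch (e: Mqtt5ConnAckException) {
        // The rejecting CONNACK carries the broker's reason code
        Timber.tag(TAG).e("Connack exception: ${e.mqttMessage.reasonCode} ${e.message}")
        HiveConnectLocalState.INVALID_TOKEN.name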
    } catch (e: Exception) {
        Timber.tag(TAG).e(e, "Connection error")
        HiveConnectLocalState.UNKNOWN_ERROR.name
    }
}
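For context on the disconnects themselves: the connection code above sets keepAlive(10). Under the MQTT 5.0 specification, a broker that receives no packet from the client within one and a half times the keep-alive period closes the connection, and a Server Keep Alive value in the CONNACK replaces the value the client requested. The HiveMQ client sends PINGREQ packets on its own, so a keep-alive timeout may point at pings not getting through (for example while the app is suspended). The sketch below only computes that deadline; the function name is hypothetical, and reading the server value from the CONNACK (presumably connAck.serverKeepAlive, an OptionalInt, in the HiveMQ API) is an assumption worth checking against the Javadoc.

```kotlin
/**
 * Hypothetical helper: seconds of client silence after which an MQTT 5 broker
 * is expected to close the connection, or null when keep alive is disabled.
 *
 * @param clientKeepAlive the Keep Alive sent in CONNECT (keepAlive(10) above)
 * @param serverKeepAlive the Server Keep Alive from CONNACK, if the broker sent one
 */
fun keepAliveDeadlineSeconds(clientKeepAlive: Int, serverKeepAlive: Int? = null): Double? {
    // A Server Keep Alive in the CONNACK overrides the client's requested value.
    val effective = serverKeepAlive ?: clientKeepAlive
    require(effective in 0..65535) { "Keep Alive is a two-byte integer: $effective" }
    // Zero switches the keep-alive mechanism off entirely.
    if (effective == 0) return null
    // The broker waits one and a half keep-alive periods before closing.
    return effective * 1.5
}
```

With keepAlive(10) and no server override, keepAliveDeadlineSeconds(10) returns 15.0, so a connection that stays silent for about 15 seconds would be dropped by the broker.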

Subscription Code:

suspend fun subscribeToHiveMqttTopic(
    topic: String
): Boolean = withContext(Dispatchers.IO) {
    suspendCancellableCoroutine { continuation ->
        try {
            if (client.state.isConnectedOrReconnect) {
                client.toAsync().subscribeWith()
                    .topicFilter(topic)
                    .qos(MqttQos.AT_MOST_ONCE)
                    .callback { publish: Mqtt5Publish ->
                        Timber.tag(TAG).d("Callback triggered for topic: ${publish.topic}")
                        publish.payload.ifPresent {
                            val message = StandardCharsets.UTF_8.decode(it.asReadOnlyBuffer()).toString()
                            Timber.tag(TAG).d("Received message: $message")
                        }
                    }
                    .send()
                    .whenComplete { subAck, error ->
                        if (error != null) {
                            // Handle subscription error
                            Timber.tag(TAG).e(error, "Subscription failed for topic: $topic")
                            if (!continuation.isCompleted) continuation.resume(false)
                        } else {
                            // Handle successful subscription
                            Timber.tag(TAG)
                                .d("Subscribed to topic: $topic with reason codes: $subAck")
                            if (!continuation.isCompleted) continuation.resume(true)
                        }
                    }
                // No join(): the whenComplete callback resumes the coroutine, so the
                // thread is not blocked and a failed future cannot trigger a second
                // resume() from the catch block below.
            } else {
                Timber.tag(TAG).d("Subscription failed: Client is not connected")
                continuation.resume(false)
            }
        } catch (e: Exception) {
            Timber.tag(TAG).d("Subscription failed: ${e.message}")
            Timber.tag(TAG).e(e)
            continuation.resume(false)
        }
    }
}
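A SUBACK can arrive without an exception and still reject individual topic filters: in MQTT 5.0, SUBACK reason codes below 0x80 grant a QoS, and any code of 0x80 or above is a failure (for example 0x87, Not authorized). The sketch below pairs topic filters with their reason codes as plain integers; the helper name is hypothetical, and with the HiveMQ client the codes would presumably come from subAck.reasonCodes, where each Mqtt5SubAckReasonCode exposes its code and an isError flag.

```kotlin
/**
 * Hypothetical helper: returns the topic filters whose SUBACK reason code
 * signals failure. Reason codes match topic filters by position, in the
 * order the filters appeared in the SUBSCRIBE packet.
 */
fun rejectedTopicFilters(topicFilters: List<String>, reasonCodes: List<Int>): List<String> {
    require(topicFilters.size == reasonCodes.size) {
        "A SUBACK carries one reason code per topic filter"
    }
    // 0x00..0x02 grant QoS 0..2; any code >= 0x80 is an error.
    return topicFilters.zip(reasonCodes)
        .filter { (_, code) -> code >= 0x80 }
        .map { (filter, _) -> filter }
}
```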

Publish Message Code:

suspend fun publishMessage(
    topic: String,
    message: String,
    qos: MqttQos = MqttQos.AT_MOST_ONCE // Defaults to QoS 0
): Boolean = withContext(Dispatchers.IO) {
    try {
        // Construct the MQTT publish message
        val publishMessage = Mqtt5Publish.builder()
            .topic(topic)
            .qos(qos) // Caller's QoS (AT_MOST_ONCE unless overridden)
            .payload(message.toByteArray(StandardCharsets.UTF_8))
            .build()

        // Publish the message asynchronously
        val publishResult: Mqtt5PublishResult = suspendCancellableCoroutine { continuation ->
            val publishFuture = client.toAsync().publish(publishMessage)

            // Handle the publish result
            publishFuture.whenComplete { result, throwable ->
                if (throwable != null) {
                    // Publish failed
                    Timber.tag(TAG).e(throwable, "Publish failed for topic: $topic")
                    continuation.resumeWith(Result.failure(throwable))
                } else {
                    // Publish succeeded
                    Timber.tag(TAG).d("Publish succeeded for topic: $topic, result: $result")
                    continuation.resumeWith(Result.success(result))
                }
            }

            // Handle cancellation
            continuation.invokeOnCancellation {
                Timber.tag(TAG).e("Publish Cancellation")
                publishFuture.cancel(true)
            }
        }

        Timber.tag(TAG).d("Publish Result is $publishResult")
        // If publish was successful, return true
        true
    } catch (e: Exception) {
        // Log the exception
        Timber.tag(TAG).e(e, "Error during publish to topic: $topic")
        false
    }
}

Request for Guidance: Could you please guide me on how to obtain the full error or reason code the broker sends when it disconnects the client? I am not looking for detailed HiveMQ client logs; I specifically want to capture and understand the error or reason codes from the broker itself. Any advice or additional debugging steps would be greatly appreciated.

pglombardo commented 1 week ago

Hi @leomuko,

If you have the MQTT Disconnect reason code, the definitions are here.
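To make a numeric reason code readable in logs, a small lookup table can help. The values below are the DISCONNECT reason codes defined in the MQTT 5.0 specification; the map and function names are hypothetical. HiveMQ's Mqtt5DisconnectReasonCode enum already carries the same information, so a table like this is mainly useful when the code comes from elsewhere, such as a packet capture.

```kotlin
/** DISCONNECT reason codes as listed in the MQTT 5.0 specification. */
val disconnectReasonCodes: Map<Int, String> = mapOf(
    0x00 to "Normal disconnection",
    0x04 to "Disconnect with Will Message",
    0x80 to "Unspecified error",
    0x81 to "Malformed Packet",
    0x82 to "Protocol Error",
    0x83 to "Implementation specific error",
    0x87 to "Not authorized",
    0x89 to "Server busy",
    0x8B to "Server shutting down",
    0x8D to "Keep Alive timeout",
    0x8E to "Session taken over",
    0x8F to "Topic Filter invalid",
    0x90 to "Topic Name invalid",
    0x93 to "Receive Maximum exceeded",
    0x94 to "Topic Alias invalid",
    0x95 to "Packet too large",
    0x96 to "Message rate too high",
    0x97 to "Quota exceeded",
    0x98 to "Administrative action",
    0x99 to "Payload format invalid",
    0x9A to "Retain not supported",
    0x9B to "QoS not supported",
    0x9C to "Use another server",
    0x9D to "Server moved",
    0x9E to "Shared Subscriptions not supported",
    0x9F to "Connection rate exceeded",
    0xA0 to "Maximum connect time",
    0xA1 to "Subscription Identifiers not supported",
    0xA2 to "Wildcard Subscriptions not supported",
)

/** Hypothetical helper: readable name for a DISCONNECT reason code. */
fun describeDisconnectReason(code: Int): String =
    disconnectReasonCodes[code] ?: "Unknown reason code 0x%02X".format(code)
```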

If you are using the HiveMQ broker, you also have the option of running a Trace Recording to see the exact behavior and cause.

Questions:

  1. Which broker are you using?
  2. What is the output from this code?
Timber.tag(TAG).e(it.cause)
Timber.tag(TAG).e(it.source.name)
Timber.tag(TAG).d(it.toString())

leomuko commented 1 week ago

Hello @pglombardo,

I am using the Cloudflare Pub/Sub service (https://developers.cloudflare.com/pub-sub/).

The output from the code is as follows:

Timber.tag(TAG).e(it.cause) outputs:

com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5DisconnectException: Server sent DISCONNECT.

Timber.tag(TAG).e(it.source.name) outputs: SERVER

Let me know if you need any additional details!

pglombardo commented 1 week ago

Ok thanks. You can use this code to get the Disconnect packet details. It should output the DISCONNECT packet reason code.
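A minimal sketch of reading those details inside the disconnected listener, assuming the HiveMQ client's Mqtt5DisconnectException.mqttMessage accessor for the received DISCONNECT packet. The helper names formatDisconnect and describeDisconnect are hypothetical, and the block needs com.hivemq:hivemq-mqtt-client on the classpath. The reason string and user properties are optional in MQTT 5.0, so a broker may send only the code.

```kotlin
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5DisconnectException

/** Hypothetical helper: renders the fields of a DISCONNECT packet as one log line. */
fun formatDisconnect(
    code: Int,
    name: String,
    reasonString: String?,
    userProperties: List<Pair<String, String>>,
): String = buildString {
    append("DISCONNECT reasonCode=0x%02X (%s)".format(code, name))
    if (reasonString != null) append(", reasonString=\"$reasonString\"")
    if (userProperties.isNotEmpty()) append(", userProperties=$userProperties")
}

/** Hypothetical helper: describes why the client was disconnected. */
fun describeDisconnect(context: MqttClientDisconnectedContext): String {
    val cause = context.cause
    // A broker-initiated disconnect wraps the DISCONNECT packet the broker sent.
    if (cause is Mqtt5DisconnectException) {
        val disconnect = cause.mqttMessage
        return formatDisconnect(
            code = disconnect.reasonCode.code,
            name = disconnect.reasonCode.name,
            reasonString = disconnect.reasonString.map { it.toString() }.orElse(null),
            userProperties = disconnect.userProperties.asList()
                .map { it.name.toString() to it.value.toString() },
        )
    }
    // Any other cause (closed socket, TLS failure, client-initiated disconnect, ...)
    return "source=${context.source}, cause=$cause"
}
```

In the listener from the issue, this could be used as .addDisconnectedListener { Timber.tag(TAG).e(describeDisconnect(it)) }.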

Alternatively, if Cloudflare doesn't offer any kind of logging visibility, another option may be to use Wireshark to monitor the MQTT traffic: watch for the DISCONNECT packets and read the reason codes from them.
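Two caveats for the capture route: traffic on port 8883 is TLS-encrypted, so Wireshark only shows MQTT fields once the TLS session can be decrypted, and the display filter mqtt.msgtype == 14 should then isolate DISCONNECT packets (packet type 14, so the first byte is 0xE0). In the MQTT 5.0 layout, the reason code is the first byte after the fixed header, and a Remaining Length of 0 means the code was omitted and 0x00 (Normal disconnection) applies. The sketch below, with a hypothetical function name, decodes the reason code from the raw bytes of one DISCONNECT packet.

```kotlin
/**
 * Hypothetical helper: extracts the reason code from the raw bytes of an
 * MQTT 5 DISCONNECT packet (fixed header included).
 */
fun disconnectReasonCode(packet: ByteArray): Int {
    require(packet.isNotEmpty()) { "empty packet" }
    // The high nibble of the first byte is the packet type; DISCONNECT is 14.
    val type = (packet[0].toInt() and 0xF0) ushr 4
    require(type == 14) { "not a DISCONNECT packet (type=$type)" }

    // Decode the Remaining Length variable byte integer (1 to 4 bytes).
    var index = 1
    var multiplier = 1
    var remaining = 0
    while (true) {
        require(index < packet.size) { "truncated Remaining Length" }
        val b = packet[index++].toInt() and 0xFF
        remaining += (b and 0x7F) * multiplier
        if ((b and 0x80) == 0) break
        multiplier *= 128
        require(multiplier <= 128 * 128 * 128) { "malformed Remaining Length" }
    }

    // No variable header means the reason code was omitted: Normal disconnection.
    if (remaining == 0) return 0x00
    require(index < packet.size) { "truncated variable header" }
    return packet[index].toInt() and 0xFF
}
```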

leomuko commented 1 week ago

Thanks, this has been helpful