hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0

CompletableFuture returned from subscribe on a CONNECTING async client never completes if the client does not connect #612

Open jfontsaballs opened 8 months ago


🐛 Bug Report

The CompletableFuture returned from subscribe on an async client in the CONNECTING state never completes if the client does not connect. In addition, after this happens the client cannot be stopped, and reconnection attempts appear to stop.

🔬 How To Reproduce

Run the code sample below with no broker listening on localhost. The behavior occurs only when the client never manages to connect to the broker.

Similar behavior seems to occur when subscribe is called while the client is reconnecting, although I have not been able to analyze that case properly.

Code sample

Kotlin:

import com.hivemq.client.mqtt.lifecycle.MqttClientAutoReconnect
import java.util.UUID
import java.util.concurrent.TimeUnit

fun main() {
    var shouldStop = false
    val client = com.hivemq.client.mqtt.MqttClient.builder()
        .identifier(UUID.randomUUID().toString())
        .serverHost("localhost")
        .automaticReconnect(
            MqttClientAutoReconnect.builder()
                .initialDelay(500, TimeUnit.MILLISECONDS)
                .maxDelay(5000, TimeUnit.MILLISECONDS)
                .build()
        )
        .addDisconnectedListener { context ->
            println("Disconnected")
            if (shouldStop)
                context.reconnector.reconnect(false)
        }
        .useMqttVersion5()
        .buildAsync()

    // Start connection but don't wait for it to complete
    val connectionFuture = client.connectWith()
        .cleanStart(true)
        .keepAlive(10 /*seconds*/)
        .send()

    println(client.state) // CONNECTING

    // Publish fails as expected because the client is not connected
    try {
        client.publishWith()
            .topic("something")
            .payload("whatever".toByteArray())
            .send().join()
    } catch (e: Throwable) {
        println(e) //com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.
    }

    val subscribeFuture = client.subscribeWith()
        .topicFilter("something")
        .send()
    Thread.sleep(5000)
    // subscribeFuture.join() would block forever here: the future never completes
    println(subscribeFuture)

    // Second attempt: the same subscribe, with a 2-second timeout on the returned future
    try {
        client.subscribeWith()
            .topicFilter("something")
            .send()
            .orTimeout(2, TimeUnit.SECONDS)
            .join()
    } catch (e: Throwable) {
        println(e)
    }

    shouldStop = true
    connectionFuture.cancel(true)
    try {
        client.disconnect().join()
    } catch (e: Throwable) {
        println(e)
    }
    println("END")
    // After this, the application never exits, nor does it try to reconnect to the broker
}

Output

CONNECTING
java.util.concurrent.CompletionException: com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.
Disconnected
Disconnected
Disconnected
com.hivemq.client.internal.rx.RxFutureConverter$RxSingleFuture@238d68ff[Not completed]
Disconnected
java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
java.util.concurrent.CompletionException: com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.
END

Environment

Where are you running/using this client?

Hardware or Device? Laptop with Intel i7 Processor

What version of this client are you using? 1.3.0

JVM version? Java 21, Gradle 8.5

Operating System? Windows 10

Which MQTT protocol version is being used? 5

Which MQTT broker (name and version)? None

📈 Expected behavior

Subscribe should fail (the returned future should complete exceptionally) if the request cannot get through, as publish does.
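
To make the expected contract concrete, here is a minimal sketch of the fail-fast behavior on the caller side. It uses only java.util.concurrent, so it runs without the client library. The helper name failFast, its parameters, and the IllegalStateException are hypothetical and not part of the HiveMQ API; the empty CompletableFuture stands in for a subscribe future that never completes.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical helper (not part of the HiveMQ client): reject immediately when
// the connection check fails, otherwise bound the wait with a timeout.
fun <T> failFast(
    isConnected: () -> Boolean,
    timeoutMs: Long,
    send: () -> CompletableFuture<T>
): CompletableFuture<T> {
    if (!isConnected()) {
        // Same kind of outcome publish gives: an exceptionally completed future.
        return CompletableFuture.failedFuture(
            IllegalStateException("MQTT client is not connected.")
        )
    }
    // orTimeout (Java 9+) completes the future with a TimeoutException
    // if nothing else has completed it in time.
    return send().orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
}

fun main() {
    // Not connected: the request is rejected without being sent.
    val rejected = failFast({ false }, 200) { CompletableFuture<String>() }
    println(rejected.isCompletedExceptionally) // true

    // "Connected", but the stand-in future never completes: the timeout fires.
    val timedOut = failFast({ true }, 200) { CompletableFuture<String>() }
    try {
        timedOut.join()
    } catch (e: CompletionException) {
        println(e.cause is TimeoutException) // true
    }
}
```

With the real client, the check would presumably be something like `{ client.state.isConnected }` and the send lambda would wrap `client.subscribeWith()...send()`; I have not verified that combination. Note that this only unblocks the caller. As the output above shows, a timeout on the returned future does not fix the client itself, which still fails to stop or reconnect afterwards.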

📎 Additional context

Bug found while analyzing the behavior of the client under communication difficulties with the broker.