hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0
847 stars 158 forks source link

Java Hive Client does not receive messages on reconnect #528

Closed manojrawat650 closed 1 year ago

manojrawat650 commented 2 years ago

Expected behavior

Hive mqtt client should receive all the messages(that were published to the broker while the consumer was offline) on reconnection with clean_session=false with no session expiry. The client is build in java using hive mqtt client version 1.3.0. The broker is emqx version 4.4.3.

Actual behavior

On reconnection, the client does not receive messages published by the publishers. for example: If publishers sent n messages while client was offline, on reconnection no messages are received.

To Reproduce

Start emqx broker using the below docker command: docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:4.4.3 Start the hive client java application. Client subscribes on qos2. Publishers are publishing on qos1(also tested with qos2).

Steps

Start hivemq client application. Publish messages from 4 different publishers. All messages are received and logged. Now turn off the client application. Publish messages from all the publishers again. Now turn on the client application again. No messages are received.

Reproducer code

Client code: //Create the client Mqtt5AsyncClient subClient = Mqtt5Client.builder().identifier("mqttConsumer") .serverHost("localhost") .serverPort(1883) .automaticReconnect( MqttClientAutoReconnect.builder() .initialDelay(3000, TimeUnit.MILLISECONDS) .maxDelay(10000, TimeUnit.MILLISECONDS).build()) .buildAsync();

//Connect the client subClient.connectWith() .cleanStart(false) .sessionExpiryInterval(3652460*60l) .send() .get(10000, TimeUnit.MILLISECONDS);

//Subscribe to the topic subClient.toAsync() .subscribeWith() .topicFilter("topic1") .qos(MqttQos.EXACTLY_ONCE) .callback(mqtt5Publish -> doSomething(mqtt5Publish)) .send();

//print the received message on callback private static void doSomething(Mqtt5Publish mqtt5Publish) { System.out.println(StandardCharsets.UTF_8.decode(mqtt5Publish.getPayload().get()).toString()); }

Details

pglombardo commented 1 year ago

Hi @manojrawat650,

There are a few variables that can cause this:

Now turn off the client application.

If you are calling disconnect on shutdown, missed messages won't be received.

.sessionExpiryInterval(3652460*60l)

UINT_MAX == session does not expire; 0 == session expires when network connection is closed

subClient.connectWith()

To diagnose, you can check the return value for sessionPresent. This will tell you if the session was maintained or lost.

It's been a while since this issue has been filed. Let me know if this is still an issue for you or if the above helps out.

pglombardo commented 1 year ago

Hi @manojrawat650 - since I haven't heard back I'll close out this issue. If it still exists for you, let us know. We'd love to help out. If this is the case, feel free to re-open or file another issue. Happy MQTT'ing!