hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0
847 stars 158 forks source link

Java Hive Client does not receive all the messages on session reconnect #527

Closed manojrawat650 closed 1 year ago

manojrawat650 commented 2 years ago

Expected behavior

Hive mqtt client should receive all the messages(that were published to the broker while the consumer was offline) on reconnection with clean_session=false with no session expiry. The client is build in java using hive mqtt client version 1.3.0. The broker is hivemq version 4.7.6

Actual behavior

On reconnection, the client does not receive messages published by all the publishers. for example: If 4 different publishers sent 12 messages(3 messages each), on reconnection only 9 messages will be received. 3 messages sent by one of the publishers is not received.

To Reproduce

Start hivemq broker using the below docker command: docker run -p 8080:8080 -p 1883:1883 hivemq/hivemq4 Start the hive client java application. Client subscribes on qos2. Publishers are publishing on qos1(also tested with qos2).

Steps

  1. Start hivemq client application.
  2. Publish messages from 4 different publishers.
  3. All messages are received and logged.
  4. Now turn off the client application.
  5. Publish messages from only 1 publisher.
  6. Now turn on the client application again. No messages are received.
  7. Turn off the client application again.
  8. Publish messages from all 4 publishers.
  9. Now turn on the client application again. All the messages are received and logged except messages from one of the publishers.

Reproducer code

Client code: //Create the client Mqtt5AsyncClient subClient = Mqtt5Client.builder().identifier("mqttConsumer") .serverHost("localhost") .serverPort(1883) .automaticReconnect( MqttClientAutoReconnect.builder() .initialDelay(3000, TimeUnit.MILLISECONDS) .maxDelay(10000, TimeUnit.MILLISECONDS).build()) .buildAsync();

//Connect the client subClient.connectWith() .cleanStart(false) .sessionExpiryInterval(3652460*60l) .send() .get(10000, TimeUnit.MILLISECONDS);

//Subscribe to the topic subClient.toAsync() .subscribeWith() .topicFilter("topic1") .qos(MqttQos.EXACTLY_ONCE) .callback(mqtt5Publish -> doSomething(mqtt5Publish)) .send();

//print the received message on callback private static void doSomething(Mqtt5Publish mqtt5Publish) { System.out.println(StandardCharsets.UTF_8.decode(mqtt5Publish.getPayload().get()).toString()); }

Details

rohit5ram commented 2 years ago

I face the same issue on 1.3.0 I downgraded to 1.2.2, and it works as expected.

manojrawat650 commented 2 years ago

I downgraded to 1.2.2, and it works as expected

@rohit5ram Thanks, I tried 1.2.2 and it works fine.

pglombardo commented 1 year ago

Hi @rohit5ram @manojrawat650 - thanks for pointing this out and apologies for the late response. I'm trying to replicate what you reported here. I'll report back soon once I have something.

pglombardo commented 1 year ago

After some digging, I was able to reproduce this with both 1.3.0 and 1.2.2. The actual cause is because the broker is delivering the QoS 1/2 messages before the subscribe call is executed which results in messages being dropped on the floor.

To fix this, you can preconfigure the subClient to handle messages before sending the connect (and subscribe).

subClient.toAsync().publishes(MqttGlobalPublishFilter.SUBSCRIBED, mqtt5Publish -> {
            System.out.println("received message: " + mqtt5Publish);
    });

Take a look at the result:

client-preconfiguration

So with QoS 1 & 2, some special handling has to be done to guarantee that messages are correctly consumed.

This could be documented better which I'll look to do sometime soon.

Could you let me know if this is still an issue or if this helps at all?

SgtSilvio commented 1 year ago

So with QoS 1 & 2, some special handling has to be done to guarantee that messages are correctly consumed.

Special handling is actually not required. Instead just call subscribe before connect:

client.toAsync().subscribeWith()
    .subscribeWith()
    .topicFilter("topic1")
    .callback(publish -> doSomething(publish))
    .send();
// do not block on the future here
client.toBlocking().connectWith()
    .cleanStart(false)
    .sessionExpiryInterval(3652460*60l)
    .send();
// here you could block on the future returned by subscribe
pglombardo commented 1 year ago

Considering the age of this issue and two solutions are now provided, I'll close out this issue.

I will look to possibly add a "Best Practices" section to the documentation soon that will cover the resubscribe topic.

If any issues remain, feel free to open another issue anytime or contact us in our community forum.