hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0
832 stars 153 forks source link

MQTT client receives same message multiple times == number of times it got reconnected successfully. #552

Closed vshirol closed 1 year ago

vshirol commented 1 year ago

Expected behavior

We expect MQTT client should receive only one message upon reconnect and subscribe.

Actual behavior

But client receives same message multiple times == number of times it got reconnected successfully.

To Reproduce

Below is sample client code to connect and subscribe. To reproduce after client connected successfully, stop the VMQ broker and then we see client tries to reconnect and start the VMQ broker after few mins/seconds then eventually client connects back to broker and subscribes to topic. Now publish msg only once to topic to which our client subscribed, in my tests have used mosquito_pub client as its simple to use. Once after publishing msg we see, message received multiple times == number of times it got reconnected successfully.

If you stop the client and restart, then it will be good. But issue repeats if it gets disconnected and reconnected again. Below is console output from the code, after receiving message:

Thread[RxComputationThreadPool-1,5,main] : SUBSCRIBE(): received - testMsg Thread[RxComputationThreadPool-3,5,main] : SUBSCRIBE(): received - testMsg

Steps

Explained above

Reproducer code

public void connect(ConnectInfo connectInfo) {

        Mqtt5UserPropertiesBuilder userPropertiesBuilder = Mqtt5UserProperties.builder();
        userPropertiesBuilder.add("usp-endpoint-id", connectInfo.getEndpointID());

        Mqtt5Connect mqtt5Connect = Mqtt5Connect.builder()
                .cleanStart(false)
                .keepAlive(Mqtt5Connect.DEFAULT_KEEP_ALIVE) // in seconds
                .simpleAuth()
                .username(connectInfo.getUsername()).password(connectInfo.getPassword().getBytes())
                .applySimpleAuth()
                .restrictions()
                .requestResponseInformation(true)
                .applyRestrictions()
                .build();
        mqtt5AsyncClient = Mqtt5Client.builder()
                .identifier(connectInfo.getEndpointID())
                .serverHost(connectInfo.getAddress())
                .serverPort(connectInfo.getPort())
                .addDisconnectedListener(mqttClientDisconnectedContext -> {
                    if(mqttClientDisconnectedContext.getCause() instanceof ConnectionFailedException) {
                        ConnectionFailedException connectionFailedException = (ConnectionFailedException) mqttClientDisconnectedContext.getCause();
                        System.out.println(System.currentTimeMillis()+" : CONNECT(): retry - failed to connect: " + connectInfo.getEndpointID() + " reason -" + connectionFailedException.getMessage());
                    } else if(mqttClientDisconnectedContext.getCause() instanceof Mqtt5ConnAckException){
                        Mqtt5ConnAckException mqtt5ConnAckException = (Mqtt5ConnAckException) mqttClientDisconnectedContext.getCause();
                        System.out.println(System.currentTimeMillis()+" : CONNECT(): retry - failed to connect: " + connectInfo.getEndpointID() + " reason -" + mqtt5ConnAckException.getMqttMessage().getReasonCode());
                    }
                    final Mqtt5ClientDisconnectedContext context = (Mqtt5ClientDisconnectedContext) mqttClientDisconnectedContext;

                    context.getReconnector()
                            .reconnect(true)
                            .delay(5000 * context.getReconnector().getAttempts(), TimeUnit.MILLISECONDS)
                            .connect(mqtt5Connect);

                })
                .addConnectedListener(mqttClientConnectedContext -> {
                    Mqtt5ClientConnectedContext context = (Mqtt5ClientConnectedContext) mqttClientConnectedContext;
                    System.out.println("CONNECT(): ConnectedListener - successfully connected clientId {} and reason -"+connectInfo.getEndpointID());
                    SUBSCRIBE(connectInfo.getSubscribeTo());
                    //handleUserPropAndSubscribe(context.getConnAck().getUserProperties());
                })
                .buildAsync();

        mqtt5AsyncClient.connect(mqtt5Connect);

    }

    private void SUBSCRIBE(String topic) {

        System.out.println("SUBSCRIBE(): on-topic -"+topic);
        mqtt5AsyncClient.subscribeWith().topicFilter(topic).qos(MqttQos.AT_LEAST_ONCE).noLocal(true).callback(mqtt5Publish -> {
            if (mqtt5Publish != null) {
                System.out.println( Thread.currentThread().toString() + " : SUBSCRIBE(): received - "+new String(mqtt5Publish.getPayloadAsBytes()));

            }
        }).send().whenComplete((mqtt5SubAck, throwable) -> {
            if (throwable != null)
                System.err.println("couldn't subscribe to error: -"+throwable);
            else
                System.out.println("successfully subscribed to topic -"+topic);
        });

    }

Details

pglombardo commented 1 year ago

Hi @vshirol - thanks for pointing this out. To be thorough could you also include the mosquito_pub command line that you used?

vshirol commented 1 year ago

Hi @vshirol - thanks for pointing this out. To be thorough could you also include the mosquito_pub command line that you used?

Hi @pglombardo, below is the command for publishing message from mosquitto_pub mosquitto_pub -h 127.0.0.1 -p 1893 -u username -P password -t test/duplicate/msg/ --protocol-version mqttv5 --id mosq_client-1 -m testMsg

Steps:

  1. Start Mqtt broker
  2. Use attached java class(zip) which will create mqtt client connecting to broker and subscribe to topic.
  3. restart mqtt broker
  4. client will reattempt and eventually connects back successfully to broker and subscribes.
  5. Now by using mosquitto_pub command, publish a message (refer above sample command)
  6. On console of java program, we will see multiple msg (2 in this case).
  7. Repeat from step 3 again, to see how many times it receives same message == number of times it has reconnected successfully MqttClientDuplicateMsg.zip

Let me know if you need any other details from me.

pglombardo commented 1 year ago

Perfect/thanks @vshirol. I'm traveling for the next few days but I'll try to get in some time to try this out myself soon.

In the meantime you could try this out against HiveMQ cloud to potentially exclude the 'VMQ' (assuming this means VerneMQ) broker from being the issue. If you don't have time, I'll do this myself once I get back to my laptop.

I'll post back soon.

vshirol commented 1 year ago

Perfect/thanks @vshirol. I'm traveling for the next few days but I'll try to get in some time to try this out myself soon.

In the meantime you could try this out against HiveMQ cloud to potentially exclude the 'VMQ' (assuming this means VerneMQ) broker from being the issue. If you don't have time, I'll do this myself once I get back to my laptop.

I'll post back soon.

Yes, VMQ refers to VerneMQ here. We can rule out broker role in this issue, as we captured packets on broker port and analyzed. Let me know if you want, we can share the same.

pglombardo commented 1 year ago

Hi @vshirol - the issue seems to be that on each reconnect, the code is adding a new callback:

 mqtt5AsyncClient.subscribeWith().topicFilter<...snip...>.callback(mqtt5Publish -> {
    if (mqtt5Publish != null) {
        System.out.println(...);
    }

So when a message is received, it's delivered to each callback (which is equal to the number of reconnects).

One option to fix is this to check the session state on reconnect with mqttClientConnectedContext.getConnAck().isSessionPresent().

If the session is already present, no need to call subscribe again...

vshirol commented 1 year ago

Hi @vshirol - the issue seems to be that on each reconnect, the code is adding a new callback:

 mqtt5AsyncClient.subscribeWith().topicFilter<...snip...>.callback(mqtt5Publish -> {
    if (mqtt5Publish != null) {
        System.out.println(...);
    }

So when a message is received, it's delivered to each callback (which is equal to the number of reconnects).

One option to fix is this to check the session state on reconnect with mqttClientConnectedContext.getConnAck().isSessionPresent().

If the session is already present, no need to call subscribe again...

Thanks @pglombardo. I will try with the proposed solution and let you know.

pglombardo commented 1 year ago

Hi @vshirol - any updates on this?

pglombardo commented 1 year ago

Hi @vshirol - since I haven't heard back I'll close out this issue. But if you're still having issues, let me know. We can re-open if needed. Best!

vshirol commented 1 year ago

threadump_before_conn_reset.zip threadump_after_2times_conn_reset.zip Hi @pglombardo The above-mentioned issue is solved with your proposed code change. Thanks. But we do see similar problem in different scenario, that is when MQTT broker terminates client connection by sending Disconnect MQTT packet to client or when connection reset occurs on client side, in this case MQTT broker is still running and not restarted.

So, in these scenarios with above mentioned code change doesn't help, client receives the duplicate message which is equal to number of times its connection reset either by broker or system error or client-side network issues. Let me know if you need any more details from my side.

From thread dump we do see, upon every connection reset there is an increase in RxComputationThreadPool instance. Attaching thread dump of both normal and after 2 times connection

Thanks, Vasant