hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0

Message callback is not called anymore after doing disconnect + connect for an existing client instance #434

Closed larsgx closed 4 years ago

larsgx commented 4 years ago

Hi, for a fault-tolerance scenario we want to ensure that MQTT messages are re-delivered in case the application is temporarily not able to handle (store) a received message. For that we want to use manual acknowledgements and reconnect the MQTT client after a delay, so that messages are re-delivered until the consuming application can store the message (and will then acknowledge it).

Re-Delivery Scenario and expected Behaviour

Actual Behaviour

subscribeWith is done before connectWith. This ensures that a message handler is registered with the MQTT client at the time the connection is established. This works fine when the MQTT client instance is created freshly (e.g. after an application restart): if a message was stored in the persistent broker session, it is delivered to the client and processed by the message callback handler.

But when calling disconnect followed by connect (with the same configuration as the initial connect) on an existing client instance, messages are no longer forwarded to the message handler callback. We can see in the Mosquitto logs that the messages are published to the client, but they are not forwarded to the handler anymore. Just for testing, we added client.publishes(MqttGlobalPublishFilter.ALL, ...), which also does not show any received message.

Note, we also tested disconnecting the old client instance and then creating + connecting a completely new client instance. In our target environment we are currently using MQTT 3 and Payara 4.1, and this led in some cases (3/10) to strange behaviour: after some reconnects, messages were consumed by different threads in parallel (even though nettyThreads was fixed to "1"), which led to ordering problems. That was the reason to just reconnect an existing client instance.

Reproducer

Start Mosquitto: docker run -p 1883:1883 eclipse-mosquitto:1.6.10

import static com.hivemq.client.mqtt.datatypes.MqttQos.AT_LEAST_ONCE;

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;

public class HiveMqClientReconnectTest {

    public static void main(String[] args) {
        Mqtt5AsyncClient client = MqttClient.builder()
                .identifier("client1")
                .serverHost("localhost")
                .serverPort(1883)
                .useMqttVersion5()
                .executorConfig()
                .nettyThreads(1)
                .applyExecutorConfig()
                .automaticReconnectWithDefaultConfig()
                .buildAsync();

        subscribeAndConnect(client);
    }

    private static void subscribeAndConnect(final Mqtt5AsyncClient client) {
        client
                .subscribeWith()
                .topicFilter("testTopic")
                .qos(AT_LEAST_ONCE)
                .callback(mqtt5Publish -> onMessage(mqtt5Publish, client))
                .manualAcknowledgement(true)
                .send()
                .whenComplete((subAck, throwable) -> {
                    System.out.println("Subscribed: " + subAck + ", throwable: " + throwable);
                });

        client.connectWith()
                .cleanStart(false)
                .noSessionExpiry()
                .restrictions()
                .receiveMaximum(1)
                .applyRestrictions()
                .send()
                .whenComplete((mqtt5ConnAck, throwable) -> {
                    System.out.println("Connected: " + mqtt5ConnAck + ", throwable: " + throwable);
                });
    }

    private static void onMessage(final Mqtt5Publish mqtt5Publish, final Mqtt5AsyncClient client) {
        System.out.println("Received message: " + mqtt5Publish);
        // Just for test: Do not acknowledge the message and reconnect the client delayed, so that messages will be re-delivered
        // Note: Reconnect could be in the same or in a separate thread -> same result
        new Thread(() -> {
            sleep(5000);
            client
                    .disconnect()
                    .whenComplete((aVoid, throwable) -> subscribeAndConnect(client));
        }).start();
    }

    private static void sleep(long duration) {
        try {
            Thread.sleep(duration);
        } catch (final InterruptedException interruptedException) {} // just for test
    }
}

MQTT Client Logs

Connected: MqttConnAck{reasonCode=SUCCESS, sessionPresent=false, restrictions=MqttConnAckRestrictions{receiveMaximum=65535, maximumPacketSize=268435460, topicAliasMaximum=10, maximumQos=EXACTLY_ONCE, retainAvailable=true, wildcardSubscriptionAvailable=true, sharedSubscriptionAvailable=true, subscriptionIdentifiersAvailable=true}}, throwable: null
Subscribed: MqttSubAck{reasonCodes=[GRANTED_QOS_1], packetIdentifier=65526}, throwable: null

// message was sent via MQTT publisher tool (QoS1)

Received message: MqttPublish{topic=testTopic, payload=87byte, qos=AT_LEAST_ONCE, retain=false}

// reconnect
Connected: MqttConnAck{reasonCode=SUCCESS, sessionPresent=true, restrictions=MqttConnAckRestrictions{receiveMaximum=65535, maximumPacketSize=268435460, topicAliasMaximum=10, maximumQos=EXACTLY_ONCE, retainAvailable=true, wildcardSubscriptionAvailable=true, sharedSubscriptionAvailable=true, subscriptionIdentifiersAvailable=true}}, throwable: null
Subscribed: MqttSubAck{reasonCodes=[GRANTED_QOS_1], packetIdentifier=65526}, throwable: null

Mosquitto LogMessages

1596377446: New connection from x.x.0.1 on port 1883.
1596377446: New client connected from x.x.0.1 as client1 (p5, c0, k60).
1596377446: No will message specified.
1596377446: Sending CONNACK to client1 (0, 0)
1596377446: Received SUBSCRIBE from client1
1596377446: testTopic (QoS 1)
1596377446: client1 1 testTopic
1596377446: Sending SUBACK to client1

// test message published
1596377469: Received PUBLISH from MQTT_FX_Client (d0, q1, r0, m1, 'testTopic', ... (87 bytes))
1596377469: Sending PUBACK to MQTT_FX_Client (m1, rc0)

// initial publish to consumer
1596377469: Sending PUBLISH to client1 (d0, q1, r0, m1, 'testTopic', ... (87 bytes))

// re-connect procedure
1596377474: Received DISCONNECT from client1
1596377474: Client client1 disconnected.
1596377474: New connection from x.x.0.1 on port 1883.
1596377474: New client connected from x.x.0.1 as client1 (p5, c0, k60).
1596377474: No will message specified.
1596377474: Sending CONNACK to client1 (1, 0)

// Mosquitto publishes the message again, which is not forwarded to the handler callback
1596377474: Sending PUBLISH to client1 (d1, q1, r0, m1, 'testTopic', ... (87 bytes))
1596377474: Received SUBSCRIBE from client1
1596377474: testTopic (QoS 1)
1596377474: client1 1 testTopic
1596377474: Sending SUBACK to client1

Environment

fraschbi commented 4 years ago

Hi @LarsGielsok,

Thank you for reaching out with your concern and providing an outstandingly well-written ticket! I used your reproducer and can tell you that the behaviour is consistent with the MQTT specification. Here is an excerpt from a trace recording I made with HiveMQ.

2020-08-05 15:26:12,637 - [client1] - Received CONNECT message (mqtt version: MQTTv5, client identifier: client1, clean start: false, keep alive: 60, session expiry interval: 4294967295, topic alias maximum: 0, receive maximum: 1, maximum packet size: 268435460, request response information: false, request problem information: true, auth method: none, username: none, password: none)
2020-08-05 15:26:12,675 - [client1] - Sent CONNACK message (session present: false, retain available: true, shared subscriptions available: true, subscription identifier available: true, wildcard subscriptions available: true, maximum packet size: 268435460, server receive maximum: 10, topic alias maximum: 5, reason code: 0, maximum QoS: EXACTLY_ONCE)
2020-08-05 15:26:12,733 - [client1] - Received SUBSCRIBE message (packet identifier: 65526, topics: [topic: testTopic, QoS: 1, retain handling: 0, no local: false, retain as published: false], subscription identifier: 1)
2020-08-05 15:26:12,756 - [client1] - Sent SUBACK message (packet identifier: 65526, reason codes: [1])
2020-08-05 15:26:50,519 - [client1] - Sent PUBLISH message (packet identifier: 51, topic: testTopic, QoS: 1, retain: false, duplicate delivery: false, subscription identifiers: [1])
2020-08-05 15:26:55,545 - [client1] - Received DISCONNECT message (reason code: 0)
2020-08-05 15:26:55,568 - [client1] - Received CONNECT message (mqtt version: MQTTv5, client identifier: client1, clean start: false, keep alive: 60, session expiry interval: 4294967295, receive maximum: 1, request response information: false, request problem information: true, auth method: none, username: none, password: none)
2020-08-05 15:26:55,570 - [client1] - Sent CONNACK message (session present: true, retain available: true, shared subscriptions available: true, subscription identifier available: true, wildcard subscriptions available: true, maximum packet size: 268435460, server receive maximum: 10, topic alias maximum: 5, reason code: 0, maximum QoS: EXACTLY_ONCE)
2020-08-05 15:26:55,572 - [client1] - Received SUBSCRIBE message (packet identifier: 65526, topics: [topic: testTopic, QoS: 1, retain handling: 0, no local: false, retain as published: false], subscription identifier: 2)
2020-08-05 15:26:55,573 - [client1] - Sent PUBLISH message (packet identifier: 51, topic: testTopic, QoS: 1, retain: false, duplicate delivery: true, subscription identifiers: [1])
2020-08-05 15:26:55,577 - [client1] - Sent SUBACK message (packet identifier: 65526, reason codes: [1])

This has to be read from the view of the broker. You can see that the broker sends a PUBLISH and never receives a PUBACK before the client disconnects again. As defined in the MQTT specification, as soon as the client re-connects, this undelivered PUBLISH is re-sent (with the duplicate flag set to true). As you can see in the CONNECT packet, the receive maximum is set to 1, so HiveMQ must not send any more messages until a PUBACK is received. Due to your implementation the PUBACK is never sent; instead the client keeps disconnecting and subscribing again.

Messages are properly queued for the client; as soon as the PUBACK is received for the first message, the others will be delivered.
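The flow control described above can be sketched as a tiny, self-contained simulation (illustrative names, not the HiveMQ or Mosquitto API): the broker keeps at most receive-maximum messages in flight and only dequeues the next one when a PUBACK arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class ReceiveMaximumSketch {

    static final int RECEIVE_MAXIMUM = 1;   // from the client's CONNECT

    final Queue<String> queued = new ArrayDeque<>();
    final List<String> inFlight = new ArrayList<>();

    void publish(String msg) {
        queued.add(msg);
        drain();
    }

    void puback(String msg) {
        inFlight.remove(msg);   // the in-flight slot is free again
        drain();
    }

    // Send while the in-flight window has room.
    private void drain() {
        while (inFlight.size() < RECEIVE_MAXIMUM && !queued.isEmpty()) {
            inFlight.add(queued.remove());
        }
    }

    public static void main(String[] args) {
        ReceiveMaximumSketch broker = new ReceiveMaximumSketch();
        broker.publish("m1");
        broker.publish("m2");                 // queued: window of 1 is full
        System.out.println(broker.inFlight);  // [m1]
        broker.puback("m1");                  // PUBACK frees the slot, m2 goes out
        System.out.println(broker.inFlight);  // [m2]
    }
}
```

With receive maximum 1, a never-sent PUBACK therefore permanently blocks the window, which is exactly the situation in the trace above.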

I hope this helps with clarification.

Best regards, Florian

larsgx commented 4 years ago

Hi @fraschbi, thanks for the quick feedback. I agree with what you are writing. The broker side is fine from my point of view: the broker is sending PUBLISHes for the not-yet-acknowledged messages every time the client connects. That's fine, and it's the same broker behaviour we are seeing with Mosquitto.

The problem is: the callback method of the MQTT client instance is only called once, for the first PUBLISH (d=0) of the message. When the client is then re-connected, the Mosquitto broker publishes the message again, but it never reaches the callback method; thus, the callback method is never called again. You can see this behaviour in that "Received message: MqttPublish" is only logged once. At least, this is what we observe when using the HiveMQ client library with Mosquitto.

So, what I would expect to see on the client side would be:

Connected: MqttConnAck...
Subscribed: ...
Received message: MqttPublish (d=0)

// first reconnect
Connected: MqttConnAck...
Subscribed: MqttSubAck...
Received message: MqttPublish (d=1)

// second reconnect
Connected: MqttConnAck...
Subscribed: MqttSubAck...
Received message: MqttPublish (d=2)

But what we see is:

Connected: MqttConnAck...
Subscribed: ...
Received message: MqttPublish (d=0)

// first reconnect
Connected: MqttConnAck...
Subscribed: MqttSubAck...

// second reconnect
Connected: MqttConnAck...
Subscribed: MqttSubAck...

Best Regards Lars

SgtSilvio commented 4 years ago

Hi @LarsGielsok

To avoid some misunderstanding, I want to clarify a few things:

// Just for test: Do not acknowledge the message and reconnect the client delayed, so that messages will be re-delivered

Am I correct that when you receive a message that you do not want to process, you never acknowledge the message? When using manualAcknowledgement, you always have to acknowledge every message. The only thing that does not matter is how long you delay the acknowledgement.

But when calling disconnect followed by connect (with same configs as the initial connect) for an existing client instance, messages are not forwarded to the message handler callback anymore.

The cause is then most likely that you never acknowledge the previous message. This means the PUBACK is never sent to the broker, so the broker eventually stops delivering new messages.

for a fault tolerance scenario, we want to ensure, that MQTT messages are re-delivered in case the application is temporarily not able to handle (store) a received message. For that we want to use manual acknowledgements and reconnect the MQTT client after a delay, so that messages are re-delivered until the consuming application can store the message (and will acknowledge the message then).

Am I correct that you want to disconnect - wait - reconnect when you receive too many messages? This is not a great solution in my opinion. Instead, you can specify a lower receive maximum in the CONNECT message. This limits the maximum number of concurrent messages the broker is allowed to send to the client. Delaying a manual acknowledgement also delays sending the PUBACK message, so just delay the manual acknowledgement when "the application is temporarily not able to handle (store) a received message".
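The delayed-acknowledgement pattern suggested here can be sketched without the HiveMQ API, to keep the example self-contained: the message callback does not reconnect the client, it just retries storing the message and only then invokes the acknowledgement handle (with the real client this would be mqtt5Publish.acknowledge()). All names below (AckHandle, tryStore) are illustrative stand-ins.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DelayedAckSketch {

    // Illustrative stand-in for mqtt5Publish.acknowledge() of the real client.
    interface AckHandle { void acknowledge(); }

    static final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    static final AtomicInteger attempts = new AtomicInteger();

    // Retry storing the message; acknowledge (send the PUBACK) only after storing succeeded.
    static void onMessage(byte[] payload, AckHandle ack, CountDownLatch done) {
        if (tryStore(payload)) {
            ack.acknowledge();  // PUBACK goes out, the broker may now send the next message
            done.countDown();
        } else {
            // Storage temporarily unavailable: do NOT reconnect, just retry later.
            scheduler.schedule(() -> onMessage(payload, ack, done), 100, TimeUnit.MILLISECONDS);
        }
    }

    // Simulated store that fails twice before succeeding.
    static boolean tryStore(byte[] payload) {
        return attempts.incrementAndGet() > 2;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        onMessage("hello".getBytes(),
                () -> System.out.println("acknowledged after " + attempts.get() + " attempts"),
                done);
        done.await();
        scheduler.shutdown();
        // prints: acknowledged after 3 attempts
    }
}
```

The broker's redelivery machinery is never involved: the unacknowledged message simply stays in flight until the application is ready.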

I hope this helps solving your issue. Please comment if I misunderstood or missed something.

Best regards Silvio

larsgx commented 4 years ago

Hi @SgtSilvio,

thanks for the reply and sorry for the delay.

// Just for test: Do not acknowledge the message and reconnect the client delayed, so that messages will be re-delivered

Am I correct that when you receive a message that you do not want to process, you never acknowledge the message? When using manualAcknowledgement, you always have to acknowledge every message. The only thing that does not matter is how long you delay the acknowledgement.

But when calling disconnect followed by connect (with same configs as the initial connect) for an existing client instance, messages are not forwarded to the message handler callback anymore.

The cause is then most likely that you never acknowledge the previous message. This means the PUBACK is never sent to the broker, so the broker eventually stops delivering new messages.

No, never acknowledging unwanted messages is not our intention. In the real scenario, every message will be acknowledged once it could be processed. The never-acknowledging reproducer is really just meant to show the core issue: when there is a message that is not yet acknowledged and then (before acknowledging it) disconnect + connect is called on an existing client instance, the Mosquitto broker logs show that this not-yet-acknowledged message is published again by the broker to the client (as expected, with d=1) - but the onMessage callback is never called again. And that the callback is not called for this d=1 message seems to be incorrect.

The complete flow as an example:

  1. Client connects + subscribes
  2. A QoS1 message is sent: Mosquitto broker sends a PUBLISH to the client (with d=0)
  3. Client receives the message and the onMessage callback is called (you can see this, when "Received message" is logged), the PUBACK is not yet sent by the client
  4. client.disconnect + client.connectWith...
  5. Mosquitto broker sends a second PUBLISH to the client, now with d=1 (as expected according to the MQTT spec)
  6. The onMessage callback is not called anymore for the second PUBLISH in 5), but I would expect it to be called
  7. Only when creating and connecting a completely new MQTT client instance is the callback called again for the message in 5)

for a fault tolerance scenario, we want to ensure, that MQTT messages are re-delivered in case the application is temporarily not able to handle (store) a received message. For that we want to use manual acknowledgements and reconnect the MQTT client after a delay, so that messages are re-delivered until the consuming application can store the message (and will acknowledge the message then).

Am I correct that you want to disconnect - wait - reconnect when you receive too many messages? This is not a great solution in my opinion. Instead, you can specify a lower receive maximum in the CONNECT message. This limits the maximum number of concurrent messages the broker is allowed to send to the client. Delaying a manual acknowledgement also delays sending the PUBACK message, so just delay the manual acknowledgement when "the application is temporarily not able to handle (store) a received message".

No, preventing the client from receiving too many messages is not our intention. As you can see, the receive maximum is already set to "1" in the reproducer, so this is not an issue. The intention was just to have a message re-delivered later, in case the application could not store it. In the meantime, we found another way to handle this failure situation robustly: as you propose, the application no longer reconnects the client but only delays the PUBACK until it can handle (store) the message. So, from this point of view, we found a solution which solves our problem, and that's fine for us.

Anyway, what I wanted to address with this issue - the callback not being called for redelivered messages after disconnect + connect - seems to be incorrect behaviour of the client.

SgtSilvio commented 4 years ago

Hi @LarsGielsok

Glad to hear that you could solve your issue.

Anyway, what I wanted to address with this issue - callback not called for redelivered messages after disconnect+connect - seems to be not a correct behaviour of the client.

This is not an issue because the client just deduplicated the same QoS 1 message. As long as the same client instance is used, the application is still alive, which means the callback has already been called. But you are correct that the client cannot be 100% sure that the resent QoS 1 message is the same message. With version 1.2.1 we improved the QoS handling to never deduplicate QoS 1 messages. Users should use QoS 2 if they really need deduplication.
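Why a QoS 1 receiver must not deduplicate can be illustrated without the client library: packet identifiers are reused once a message is acknowledged, so a receiver that drops anything with a previously seen packet id would also drop a genuinely new message that happens to reuse that id. The Publish record and naiveDedup method below are illustrative, not part of the HiveMQ API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class QoS1DedupSketch {

    // Minimal stand-in for an inbound QoS 1 PUBLISH.
    record Publish(int packetId, boolean dup, String payload) {}

    // Naive receiver: drops any packet id it has already seen.
    static List<String> naiveDedup(List<Publish> inbound) {
        Set<Integer> seen = new HashSet<>();
        List<String> delivered = new ArrayList<>();
        for (Publish p : inbound) {
            if (seen.add(p.packetId())) {
                delivered.add(p.payload());
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // The broker resends packet id 51 after a reconnect (dup=true), then later
        // reuses id 51 for a brand-new message once the first one was acknowledged.
        List<Publish> inbound = List.of(
                new Publish(51, false, "first"),
                new Publish(51, true, "first"),   // redelivery: same message
                new Publish(51, false, "second")  // id reused: a different message!
        );
        System.out.println(naiveDedup(inbound)); // prints [first]: "second" is wrongly dropped
    }
}
```

This is why, since 1.2.1, the client delivers every QoS 1 PUBLISH to the callback and leaves true exactly-once semantics to QoS 2.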

larsgx commented 4 years ago

Hi @SgtSilvio, thanks for the update. This behaviour is a bit surprising to us...but good to know. Kind Regards Lars