hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0
872 stars 161 forks source link

Manual Acknowledgement doesn't work #627

Open Wenn-x opened 7 months ago

Wenn-x commented 7 months ago

I expected to receive repeated messages when publish.acknowledge() is not invoked, but only one was actually received, and I wonder why it didn't work.

var receiver = Mqtt5Client.builder()
        .identifier("receiver")
        .serverHost("localhost")
        .serverPort(1883)
        .buildBlocking();
receiver.connect();
receiver.toAsync().subscribeWith()
        .topicFilter("test")
        .callback(publish -> {
            log.info("Receive message: " + new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8));
            boolean success = false;

            // Some logic & conditions

            if (success) {
                publish.acknowledge();  // Conditionally acknowledge the message
            }
        })
        .manualAcknowledgement(true)
        .send();

var sender = Mqtt5Client.builder()
        .identifier("sender")
        .serverHost("localhost")
        .serverPort(1883)
        .buildBlocking();
sender.connect();
sender.publishWith().topic("test")
        .payload("this is a test message".getBytes())
        .send();

Thread.sleep(180 * 1000);

I test with:

pglombardo commented 7 months ago

Hi @Wenn-x,

Message acknowledgements are only valid for QoS 1 + 2 messages. From the code you posted, you are only handling QoS 0 messages. Could this be the issue?

The QoS docs if you need them: https://hivemq.github.io/hivemq-mqtt-client/docs/mqtt-operations/publish/#quality-of-service-qos

Wenn-x commented 7 months ago

Hi @pglombardo, thanks for your reply. Actually, I tried Qos 1 or 2 once, but got an error in the log which is considered maybe a bug of emqx in https://github.com/hivemq/hivemq-mqtt-client/issues/491. So I ignored it. How can I solve this?

QoS 2 PUBLISH (MqttStatefulPublish{stateless=MqttPublish{topic=test, payload=22byte, qos=EXACTLY_ONCE, retain=false}, packetIdentifier=1, dup=false, topicAlias=0, subscriptionIdentifiers=[1]}) must not be resent (MqttStatefulPublish{stateless=MqttPublish{topic=test, payload=22byte, qos=EXACTLY_ONCE, retain=false}, packetIdentifier=1, dup=true, topicAlias=0, subscriptionIdentifiers=[1]}) during the same connection
pglombardo commented 7 months ago

Ugh I see - I wasn't aware of that issue in EMQX. They are referring to the first paragraph in this section of the specification which makes sense.

Publishes are not allowed to be resent on the same connection until reconnect with cleanStart = false.

I think the only way to resolve this is to contact EMQX or switch to another broker. Beyond this specific instance, being non-compliant with the protocol spec risks many secondary and longer term issues for users.

If I can help with anything else, please let me know.

Wenn-x commented 7 months ago

Hi @pglombardo, maybe I didn't express it clearly. The error message was from the source code: com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingQosHandler#readPublishQos2. I think this has nothing to do with emqx. What I don't understand is whether this constraint conflicts with receiving duplicate messages without manual confirmation. Is there anything else I need to do?

image

pglombardo commented 7 months ago

Hi @Wenn-x - that's correct. This Java client correctly throws that error when the protocol is violated by the EMQ broker.

I'll try to explain more thoroughly:

Messages that are not acknowledged, are not allowed to be resent on the same connection. Those unacknowledged messages can only be sent on reconnect. When the broker you are using resends the messages on the same connection, this client correctly throws this error because it is a protocol violation.

It seems like a small distinction but these are the rules of the MQTT protocol. Strict adherence to the protocol specification guarantees predictable behavior & reliability when deploying to millions of devices and processing millions of messages per day (or more).

A couple notes:

  1. Mosquitto used to incorrectly resend unacknowledged publishes as well but this was fixed in 2018.
  2. The HiveMQ broker also follows the protocol spec and does not resend unacknowledged messages until reconnect.

My advice would be to try another broker and see the difference in behavior. Then with that knowledge, you can make the right decision on which path to pursue.

I hope I explained this well enough. Please let me know if not.

Wenn-x commented 7 months ago

Hi @pglombardo, thanks a lot for your explanation. My requirement scenario is that when some messages cannot be processed correctly in program, I hope to receive them again. I tried disconnect and reconnect the MQTT client with cleanStart = false, but still got nothing. Is the correct way to use manual acknowledgement or I should find other ways?

pglombardo commented 1 month ago

HI @Wenn-x - did you ever make any progress on this issue? If the messages aren't being resent with a reconnect/cleanStart = false - then that is the likely fault of the EMQ broker. Did you try the behaviour with another broker?