hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0

Message arrival callback to broker #560

Closed by marcelofilho 1 year ago

marcelofilho commented 1 year ago

❓ Question

How can I know if a published message actually arrived at the broker?

Is it possible to have a callback using publishWith or something like that?

📎 Additional context

I'm having trouble publishing messages to the broker. I use Mqtt3AsyncClient, configured with a port and URL, and I publish messages with publishWith like this:

if (client.state.isConnectedOrReconnect) {
    client.publishWith()
        .topic("${Server.TOPIC_HISTORICAL}${data.serialNumber}")
        .payload(data.data)
        .qos(MqttQos.AT_LEAST_ONCE)
        .retain(false)
        .send()
}

The problem is that my internet connection fluctuates a lot and is very slow. The client does not disconnect and continues to send data, so I lose a lot of data during my session. Is there any way for me to know whether a message actually arrived at the broker? That way I would only send the next message once the current one had arrived successfully. I looked in the documentation and found nothing.

Note: QoS AT_LEAST_ONCE for some reason does not seem to guarantee the arrival of the packet. I think this is because the connection is not strong enough and the application does not stop sending packets.

pglombardo commented 1 year ago

Hi @marcelofilho,

QoS AT_LEAST_ONCE actually does guarantee that the message arrives at the broker. That level requires the MQTT broker to respond with a PubAck message. Only then is the publish operation complete.

For reference/comparison, there is a good explanation of all the Quality of Service ins and outs here.

QoS level 1 guarantees that a message is delivered at least one time to the receiver. The sender stores the message until it gets a PUBACK packet from the receiver that acknowledges receipt of the message. It is possible for a message to be sent or delivered multiple times.
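To make the "at least once" wording concrete, here is a toy model in plain Java. It is only a sketch: `LossyReceiver` and `sendAtLeastOnce` are hypothetical names, nothing here uses the HiveMQ client, and the retry loop compresses what in MQTT typically happens after a reconnect. It shows that a lost PUBACK leads to a resend, so the receiver can end up with duplicates.

```java
import java.util.ArrayList;
import java.util.List;

final class AtLeastOnceSketch {

    // Hypothetical receiver: records every copy it gets and "loses" its first few PUBACKs.
    static final class LossyReceiver {
        final List<String> delivered = new ArrayList<>();
        private int acksToDrop;

        LossyReceiver(int acksToDrop) {
            this.acksToDrop = acksToDrop;
        }

        // Returns true only if the PUBACK makes it back to the sender.
        boolean receive(String message) {
            delivered.add(message);
            if (acksToDrop > 0) {
                acksToDrop--;
                return false;
            }
            return true;
        }
    }

    // Keeps the message and resends it until a PUBACK arrives; returns the number of transmissions.
    static int sendAtLeastOnce(String message, LossyReceiver receiver, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (receiver.receive(message)) {
                return attempt;
            }
        }
        throw new IllegalStateException("no PUBACK after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        LossyReceiver receiver = new LossyReceiver(2);
        int transmissions = sendAtLeastOnce("reading-1", receiver, 5);
        System.out.println("transmissions: " + transmissions);
        System.out.println("copies delivered: " + receiver.delivered.size());
    }
}
```

The receiver got the message on the first transmission, yet the sender cannot know that until a PUBACK comes back, which is why duplicates are possible at this QoS level.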

publishWith()...send() returns a CompletableFuture. You need to wait for that future to complete to know that the publish was acknowledged by the broker. That is, when the future completes, the MQTT broker has responded with the PubAck acknowledgement.
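As a rough illustration of waiting on such a future, the sketch below uses only java.util.concurrent. `fakeSend` is a hypothetical stand-in for the client's send() call, and the delays and timeouts are made-up values. It waits for the acknowledgement with a timeout, so that a missing acknowledgement is noticed instead of silently ignored.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AckWaitSketch {

    // Hypothetical stand-in for client.publishWith()...send(). The returned future is
    // completed from another thread after delayMillis, imitating a PUBACK arriving later.
    // A negative delay means the acknowledgement never arrives.
    static CompletableFuture<String> fakeSend(String payload, long delayMillis) {
        CompletableFuture<String> ack = new CompletableFuture<>();
        if (delayMillis >= 0) {
            Thread broker = new Thread(() -> {
                try {
                    Thread.sleep(delayMillis);
                    ack.complete(payload);
                } catch (InterruptedException e) {
                    ack.completeExceptionally(e);
                }
            });
            broker.setDaemon(true);
            broker.start();
        }
        return ack;
    }

    // Waits up to timeoutMillis for the acknowledgement and reports what happened.
    static String awaitAck(CompletableFuture<String> ack, long timeoutMillis) {
        try {
            return "acknowledged: " + ack.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "no acknowledgement yet";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitAck(fakeSend("reading-1", 50), 2000));
        System.out.println(awaitAck(fakeSend("reading-2", -1), 200));
    }
}
```

With a real client the same pattern would apply to the future returned by send(); how long to wait before treating a publish as unconfirmed is an application decision.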

Here's an example that publishes after connecting and disconnects only once the publish has been acknowledged.

client.connect()
        .thenCompose(connAck -> client.publishWith()
                    .topic("${Server.TOPIC_HISTORICAL}${data.serialNumber}")
                    .payload(data.data)
                    .qos(MqttQos.AT_LEAST_ONCE)
                    .retain(false)
                    .send())
        .thenCompose(publishResult -> client.disconnect());

Let me know if this helps.

marcelofilho commented 1 year ago

Good afternoon, and thank you very much for your reply. I implemented something based on your tips but still had no success. Am I doing something wrong?

 try {
            if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
                client.connect()
                    .thenCompose<Mqtt3Publish> { connAck: Mqtt3ConnAck? ->
                        client.publishWith()
                            .topic("\${Server.TOPIC_HISTORICAL}\${data.serialNumber}")
                            .payload(data.data)
                            .qos(MqttQos.AT_LEAST_ONCE)
                            .retain(false)
                            .send().whenComplete { mqtt3Publish, throwable ->
                                if (throwable != null)
                                    Logs().setLog("Not received")
                            }
                    }.thenCompose { publishResult: Mqtt3Publish? ->
                        publishResult
                        Logs().setLog("Success")
                        client.disconnect()
                    }.apply {
                        Logs().setLog("Success apply")
                        emit(true)
                    }
            }
        } catch (e: Exception) {
            Logs().setLog("Error publishMessage {${e.message}} ")
            if (BuildConfig.DEBUG)
                Log.e("MQTT", e.message, e)
            emit(false)
        }

I tried in every way to get some feedback that the data was received by the broker, and I can't do that. The internet here is really bad and clearly the data was not received but there are no exceptions, this is very strange.

I think I'm not understanding how CompletableFuture works. Thank you very much if you can help me, and thanks again for your answer.

pglombardo commented 1 year ago

Apologies... I didn't post the best example. Take a look at the example here.

client.connect()
        .thenAccept(connAck -> System.out.println("connected " + connAck))
        .thenCompose(v -> client.publishWith().topic("demo/topic/b").qos(MqttQos.AT_LEAST_ONCE).send())
        .thenAccept(publishResult -> System.out.println("published " + publishResult))
        .thenCompose(v -> client.disconnect())
        .thenAccept(v -> System.out.println("disconnected"));

Try pasting this block into your code with modifications. It should do the job.

Or alternatively, to debug, you could switch to the blocking client:

MqttPublishResult publishResult = (MqttPublishResult) client.toBlocking().publishWith()
                .topic("demo/topic/a")
                .qos(MqttQos.AT_LEAST_ONCE)
                .payload("payload".getBytes())
                .retain(true)
                .contentType("text/plain")  // our payload is text
                .messageExpiryInterval(120) // not so important, expire message after 2min if can not be delivered
                .send();

System.out.println("published " + publishResult);

This blocking client example can be found here.

I tried in every way to get some feedback that the data was received by the broker, and I can't do that. The internet here is really bad and clearly the data was not received but there are no exceptions, this is very strange.

Are your consumers receiving the data?

I think I'm not understanding how CompletableFuture works. Thank you very much if you can help me, and thanks again for your answer.

Async can be a bit complex. Futures are scheduled to run on an event loop - the result is only available after execution is complete.
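A small plain-Java sketch of that ordering, with a hypothetical `pendingPublish` future standing in for an unacknowledged publish: code written after the chain runs immediately, while the callback runs only when the future completes. This may be why a log line placed after the chain can report success even though nothing has been acknowledged yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class FutureOrderingSketch {

    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Hypothetical pending publish: nothing has acknowledged it yet.
        CompletableFuture<String> pendingPublish = new CompletableFuture<>();

        // Registering a callback does not wait; it only says what to do later.
        CompletableFuture<Void> chain =
                pendingPublish.thenAccept(result -> events.add("callback: published " + result));

        // This line runs immediately, before the callback above.
        events.add("after building the chain, done=" + chain.isDone());

        // Simulate the broker acknowledgement arriving; the callback runs at this point.
        pendingPublish.complete("demo/topic/b");
        events.add("after acknowledgement, done=" + chain.isDone());
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Anything that must happen only after the acknowledgement belongs inside a callback such as thenAccept or whenComplete, not after the chain.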

Hopefully this gets you on the right path. Let me know.

pglombardo commented 1 year ago

Hi @marcelofilho - have you had any updates on this issue?

marcelofilho commented 1 year ago

Hey @pglombardo, apologies for the delay. I don't even know how to thank you, but it worked. Apparently toBlocking() blocks the next message until the broker confirms receipt of the previous one. Thank you very much.
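For readers who want the same one-at-a-time behaviour spelled out, here is a minimal plain-Java sketch. `fakeSend` is a hypothetical stand-in for a publish that completes on acknowledgement, and no HiveMQ classes are used. Each message is sent only after the previous future has completed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class SequentialPublishSketch {

    // Hypothetical stand-in for an asynchronous publish: logs the send, then
    // completes on another thread once the simulated acknowledgement is logged.
    static CompletableFuture<String> fakeSend(String payload, List<String> events) {
        events.add("sent " + payload);
        return CompletableFuture.supplyAsync(() -> {
            events.add("acked " + payload);
            return payload;
        });
    }

    // Sends the payloads one at a time; join() blocks until the current one is acknowledged.
    static List<String> publishInOrder(List<String> payloads) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        for (String payload : payloads) {
            fakeSend(payload, events).join();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        publishInOrder(Arrays.asList("m1", "m2", "m3")).forEach(System.out::println);
    }
}
```

Blocking keeps the ordering simple at the cost of throughput, which seems a reasonable trade on a slow, unreliable link like the one described in this thread.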