hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0

[Persistent Sessions] Broker sends message before re-subscribing and client acknowledges those messages #539

Closed Blafasel3 closed 1 year ago

Blafasel3 commented 2 years ago

Expected behavior

Subscribing should be possible before connecting the client, as described in https://github.com/hivemq/hivemq-mqtt-client/issues/521.

Actual behavior

Subscribing before connecting blocks forever.

To Reproduce

Steps

  1. We have many QoS 2 messages on different topics.
  2. The client reconnects with cleanStart = false, because we must not lose those messages.
  3. Accordingly, sessionExpiryInterval is set to 6000 (seconds).
  4. From the broker's side, the existing subscriptions are therefore still valid right after the reconnect.
  5. Connecting before subscribing produces the error message "No publish flow registered..." (MqttIncomingPublishService) for every message the broker sends before the client has processed its subscribes.
  6. We lose those messages, because the following snippet in MqttIncomingPublishService acknowledges a message even if no publish flow is registered:
    @CallByThread("Netty EventLoop")
    void drain() {
        runIndex++;
        blockingFlowCount = 0;
        qos1Or2It.reset();
        while (qos1Or2It.hasNext()) {
            final MqttStatefulPublishWithFlows publishWithFlows = qos1Or2It.next();
            emit(publishWithFlows);
            if ((qos1Or2It.getIterated() == 1) && publishWithFlows.isEmpty() && publishWithFlows.areAcknowledged()) {
                qos1Or2It.remove();
                incomingQosHandler.ack(publishWithFlows); // reported problem: also reached when no publish flow was registered
            } else if (blockingFlowCount == referencedFlowCount) {
                return;
            }
        }
        // ....
    }
  7. That's why we tried to make sure, with the following code, that all subscriptions happen before the connect.
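Steps 5 and 6 can be illustrated with a toy model. This is a stdlib-only Java sketch, not HiveMQ's code: `ToyInbound`, `register`, `deliver`, and `simulate` are hypothetical names, and the class only mimics the behaviour reported above, namely that a publish arriving while no flow is registered is still acknowledged and is therefore lost to the application.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Toy model (NOT HiveMQ's implementation) of the reported behaviour: a publish that
 * arrives while no publish flow is registered is acknowledged anyway, so it is gone.
 */
class ToyInbound {
    private final List<Consumer<String>> flows = new ArrayList<>();
    final List<String> acked = new ArrayList<>();   // confirmed to the broker
    final List<String> dropped = new ArrayList<>(); // confirmed but never seen by application code

    void register(Consumer<String> flow) {
        flows.add(flow);
    }

    // Called for each publish the broker sends from the resumed session.
    void deliver(String message) {
        if (flows.isEmpty()) {
            dropped.add(message); // the reporter sees "No publish flow registered..." at this point
        }
        for (Consumer<String> flow : flows) {
            flow.accept(message);
        }
        acked.add(message); // acknowledged either way, as in the reported drain() path
    }

    /** Replays the same session backlog with the handler registered before or after it arrives. */
    static List<String> simulate(boolean registerFirst) {
        ToyInbound inbound = new ToyInbound();
        List<String> received = new ArrayList<>();
        List<String> backlog = List.of("m1", "m2", "m3"); // queued by the broker while offline
        if (registerFirst) {
            inbound.register(received::add);
        }
        // Session resumed: the broker immediately starts sending the backlog.
        backlog.forEach(inbound::deliver);
        if (!registerFirst) {
            inbound.register(received::add); // subscription processed too late
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("registered late:  " + simulate(false)); // []
        System.out.println("registered first: " + simulate(true));  // [m1, m2, m3]
    }
}
```

In this model every message is acknowledged in both runs; only the order of registration decides whether the application ever sees it.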

Reproducer code

Subscribing to messages before the connect does not work. The Single.timer below delays the subscribe to force the race condition we observed.

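import io.reactivex.Single;
import java.util.concurrent.TimeUnit;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe;

// mqttClient is (assumed to be) a Mqtt5RxClient; LOGGER is the application's logger.
var connectBuilder = mqttClient.connectWith()
        .sessionExpiryInterval(6000) // seconds
        .cleanStart(false)           // resume the existing session
        .keepAlive(3600);
// Delay the subscribe by 20 s so the resumed session's messages arrive first.
Single.timer(20, TimeUnit.SECONDS)
        .flatMap(it -> {
            var subscribeMessage = Mqtt5Subscribe.builder()
                    .topicFilter("topic")
                    .qos(MqttQos.EXACTLY_ONCE)
                    .build();
            return mqttClient
                    .subscribe(subscribeMessage)
                    .doOnSuccess(subscribeAck -> LOGGER.info("MQTT-Subscription ACK {}", subscribeAck));
        })
        .ignoreElement()
        .subscribe();
// The Rx Single is lazy: CONNECT is only sent once the Single is subscribed to.
connectBuilder.applyConnect().subscribe();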

The same behavior occurs with subscribeSingleFuture (subscriptions is just a config POJO we use to configure the topics):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import com.hivemq.client.mqtt.datatypes.MqttQos;

// Assumption: builder is the application's connect builder, obtained from mqttClient.connectWith().
var connectBuilder = builder.cleanStart(false);
var completableFutures = subscriptions.stream()
        .map(subscription -> {
            MqttQos hivemqQos = ITCS_QOS_TO_HIVE_MQ_QOS.get(subscription.getQos());
            return mqttClient.subscribePublishesWith()
                    .topicFilter(subscription.getTopic().getTopicName())
                    .qos(hivemqQos)
                    .applySubscribe()
                    .subscribeSingleFuture(message -> handleMessage(subscription, message))
                    .exceptionally(throwable -> {
                        LOGGER.error(
                                "MQTT subscription failed for topic {}",
                                subscription.getTopic().getTopicName(),
                                throwable
                        );
                        throw new CompletionException(throwable);
                    })
                    .thenApply(mqtt5SubAck -> handleMqtt5SubAck(subscription, mqtt5SubAck));
        })
        .toArray(CompletableFuture[]::new);
// Waits for every SUBACK before the connect below is issued.
CompletableFuture.allOf(completableFutures).join();
// Uses connectBuilder so that cleanStart(false) actually applies to this connect.
var connected = connectBuilder.applyConnect()
        .subscribe(
                success -> LOGGER.info(
                        "Connect scenario result: clientId = {}; reason = {}",
                        getClientId(),
                        success.getReasonString()
                ),
                ex -> LOGGER.warn("Connect scenario error", ex)
        );
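One possible reading of why the second snippet blocks: `CompletableFuture.allOf(...).join()` runs before the connect, and in MQTT a SUBACK can only come back after the client has connected, so the join has nothing that could complete it. The stdlib-only sketch below models that ordering under this assumption; `ConnectOrderDemo`, `ToyClient`, `subscribe`, `connect`, and `subAcksComplete` are hypothetical stand-ins, not HiveMQ API, and a bounded `get` replaces `join()` so the demo terminates.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy model, not HiveMQ API: a "SUBACK" can only exist after the client has connected. */
class ConnectOrderDemo {

    static final class ToyClient {
        private final CompletableFuture<Void> connected = new CompletableFuture<>();

        // Queues a subscription; its "SUBACK" completes only once connect() has run.
        CompletableFuture<String> subscribe(String topic) {
            return connected.thenApply(ignored -> "SUBACK " + topic);
        }

        void connect() {
            connected.complete(null);
        }
    }

    /** Returns true if all SUBACK futures completed within the timeout. */
    static boolean subAcksComplete(boolean connectBeforeWaiting) {
        ToyClient client = new ToyClient();
        CompletableFuture<?>[] subAcks = {client.subscribe("a"), client.subscribe("b")};
        if (connectBeforeWaiting) {
            client.connect();
        }
        try {
            // Bounded wait instead of join(), so the demo terminates either way.
            CompletableFuture.allOf(subAcks).get(500, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // an unbounded join() would block here forever
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("wait before connect: " + subAcksComplete(false)); // false
        System.out.println("connect, then wait:  " + subAcksComplete(true));  // true
    }
}
```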


pglombardo commented 1 year ago

Hi @Blafasel3,

I've also seen this hang before the connect call a few times, and I'll investigate it soon.

In the meantime, you can work around this issue with a different pattern: register the message handler before the connect call, as follows:

client.toAsync().publishes(MqttGlobalPublishFilter.SUBSCRIBED, mqtt5Publish -> {
    System.out.println("received message: " + mqtt5Publish);
});

So the full code flow would be:

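import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;

Mqtt5AsyncClient client = Mqtt5Client.builder()
        .identifier("mqttConsumer")
        .serverHost(host)
        .serverPort(port)
        // <snip>
        .buildAsync();

// Register the global callback first, so publishes from the resumed session have a consumer.
client.toAsync().publishes(MqttGlobalPublishFilter.SUBSCRIBED, mqtt5Publish -> {
    System.out.println("received message: " + mqtt5Publish);
});

final Mqtt5ConnAck connAck = client.toBlocking().connectWith()
        .cleanStart(false)           // resume a previous session
        .sessionExpiryInterval(300)  // keep session state for 300 s
        .send();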
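One consequence of this workaround: a single `MqttGlobalPublishFilter.SUBSCRIBED` callback receives the publishes for every subscription, so per-subscription handling like the reporter's `handleMessage(subscription, message)` has to be routed by topic inside that callback. The stdlib-only sketch below shows one way to do that; `TopicRouter`, `route`, `dispatch`, and `matches` are hypothetical names, and the wildcard matching is simplified (no special handling of `$`-prefixed topics). In the real callback, the topic would presumably come from `mqtt5Publish.getTopic().toString()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of the HiveMQ client): routes publishes received by one
 * global callback to per-topic-filter handlers. Simplified MQTT matching: '+' matches one
 * level, a trailing '#' matches the parent level and everything below it.
 */
class TopicRouter {
    private final Map<String, Consumer<String>> routes = new LinkedHashMap<>();

    void route(String topicFilter, Consumer<String> handler) {
        routes.put(topicFilter, handler);
    }

    /** Passes the payload to every handler whose filter matches; returns how many matched. */
    int dispatch(String topic, String payload) {
        int matched = 0;
        for (Map.Entry<String, Consumer<String>> entry : routes.entrySet()) {
            if (matches(entry.getKey(), topic)) {
                entry.getValue().accept(payload);
                matched++;
            }
        }
        return matched;
    }

    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1); // -1 keeps empty levels such as "a/"
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // multi-level wildcard: everything from here down matches
            }
            if (i >= t.length || (!f[i].equals("+") && !f[i].equals(t[i]))) {
                return false;
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        TopicRouter router = new TopicRouter();
        List<String> log = new ArrayList<>();
        router.route("plant/+/temp", payload -> log.add("temp:" + payload));
        router.route("plant/#", payload -> log.add("all:" + payload));
        router.dispatch("plant/7/temp", "21");
        router.dispatch("plant/7/pressure", "1013");
        System.out.println(log); // [temp:21, all:21, all:1013]
    }
}
```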

I hope this helps. Please let me know the latest status.

pglombardo commented 1 year ago

I believe my previous post resolves the subscribe issue, so I'm going to close this issue. If anything remains, please feel free to reopen it or file another issue. We'd be very happy to help out.