hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0
860 stars 159 forks source link

Callback is not triggered after async client re-connection. #499

Closed frankyfish closed 1 year ago

frankyfish commented 3 years ago

Expected behavior

After performing re-connect and re-subscription to MQTT broker callback is fired each time a new message arrives to the topic the client is subscribed to.

Actual behavior

Whenever connection to MQTT broker is lost and then re-stablished, subscription to the same topic is made (using same client instance), callback is not triggered until asyncClient.publishes() is executed once again for it.

To Reproduce

Steps

  1. Create async client and connect to the broker
  2. Register global consumer for MqttGlobalPublishFilter.SUBSCRIBED
  3. Subscribe
  4. Disconnect the client from step 1 (either via broker tools or connection interruption)
  5. Perform reconnection with the client from step 1
  6. Subscribe to same topic as in step 3
  7. Publish something on the topic to which the client is subscribed to.

Reproducer code

Consider that an MQTT Broker is running on the same machine, locally. For reproducing the problem I've used HiveMQ3 running in Docker. Whenever the code below had been connected and subscribed to the broker I then disconnected it via HiveMQ's web ui to trigger re-connection & re-subscription logic of the test.

    @Test
    public void reconnectSameCallback() throws ExecutionException, InterruptedException {
        final @NotNull Mqtt3BlockingClient blockingClient
            = MqttClient.builder()
                        .useMqttVersion3()
                        .identifier("test-clientId")
                        .addDisconnectedListener((MqttClientDisconnectedContext context) -> {
                            log.warn("Disconnected! ", context.getCause());
                        })
                        .serverHost("127.0.0.1")
                        .serverPort(1883)
                        .buildBlocking();
        Mqtt3AsyncClient asyncClient = blockingClient.toAsync();

        final Mqtt3Subscribe subscription =
            Mqtt3Subscribe.builder()
                          .addSubscription()
                          .topicFilter("test/1")
                          .qos(MqttQos.AT_LEAST_ONCE)
                          .applySubscription()
                          .build();
        Consumer<Mqtt3Publish> subCallback = (Mqtt3Publish publish) -> {
            System.out.println("Sub COMMON callback: " + publish.getTopic() + " " + publish.getPayload());
        };
        asyncClient.connect().get(); // waiting

        // Single callback registration as suggested here:
        //  https://github.com/hivemq/hivemq-mqtt-client/issues/454
        asyncClient.publishes(MqttGlobalPublishFilter.SUBSCRIBED, subCallback);
        asyncClient.subscribe(subscription);

        //simulating running app with re-connection executed by some custom code
        while (true) {
            boolean connected = asyncClient.getConfig().getConnectionConfig().isPresent();
            if (!connected) {
                log.warn("Reconnecting...");
                try {
                    asyncClient.connect().get();// waiting
                    log.info("Re-Connected!");
                    asyncClient.subscribe(subscription);
                    log.info("Re-Sub!");
                    //asyncClient.publishes(MqttGlobalPublishFilter.SUBSCRIBED, subCallback); // this will fix the problem
                } catch (ExecutionException e) {
                    log.warn(e.getMessage());
                }
            }
        }
    }

Details

I am a bit confused with the behaviour of publishes(), I don't expect that I have to execute it each time i establish a connection, in my understanding whenever I register a consumer for mqtt client instance it should be working unless I destroy the instance of the client.

fblampe commented 2 years ago

@frankyfish Did you ever find a solution to this problem? I'm seeing something similar in an app I'm developing.

pglombardo commented 1 year ago

Hi @fblampe & @frankyfish ,

When the client reconnects, the broker will send down pending messages immediately and this may happen between the call to connect and publishes.

Here is an example I have with QoS 2 messages:

client-preconfiguration

To avoid this, move the publishes call above the connect call:

asyncClient.publishes(MqttGlobalPublishFilter.SUBSCRIBED, subCallback);
asyncClient.connect().get();
asyncClient.subscribe(subscription);

I believe this should resolve the issue. Please let me know if this helps!

pglombardo commented 1 year ago

Another solution is to call subscribe before connect as described here.

pglombardo commented 1 year ago

Considering the age of this issue and two solutions are now provided, I'll close out this issue. If any issues remain, feel free to open another issue anytime or contact us in our community forum.

Thanks for pointing this out!