hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0
860 stars 159 forks source link

MQTT 5 server without client takeover support and reconnect #497

Closed missourian55 closed 1 year ago

missourian55 commented 3 years ago

I am trying to simulate un-graceful disconnect and client takeover (in this case, the broker doesn't support it). Like to know how to re-establish the connection? I am not sure if this is a bug with HiveMQ client, but would like to disclose what I am testing and appreciate any recommendations.

To Reproduce

Steps

  1. Start the application
  2. Run it for few minutes. Publish a few messages to MQTT 5 broker
  3. Close the laptop for few minutes and let it go to sleep
  4. Open the laptop; application resumes and trying to post a new message to MQTT 5 broker.
  5. Getting below error
    com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.

Reproducer code


@ApplicationScoped
@JBossLog
public class PriceGenerator {

    private final Random random = new Random();

    @Inject
    Mqtt5BlockingClient mqtt5BlockingClient;

    @ConfigProperty(name = "mqtt.topic")
    String topic;

    @Scheduled(every = "45s")
    public void generateMessage() {

        // troubleshooting the problem
        if(!mqtt5BlockingClient.getState().isConnected()) {
            log.info("Mqtt in disconnect state. Re-establishing connection... ");
            try {
                destroy();
            } catch (Exception e) {
                e.printStackTrace(); //MqttClientStateException: MQTT client is not connected.
                log.error("disconnect failure");
            }
            try {
                mqtt5BlockingClient.connect();
            } catch (Exception e) {
                e.printStackTrace(); // MqttClientStateException: MQTT client is already connected or connecting.
                log.error("connect failure");
            }
        }

        mqtt5BlockingClient
            .publishWith()
            .topic(topic)
            .payload(String.valueOf(random.nextInt(100)).getBytes(UTF_8))
            .qos(AT_LEAST_ONCE)
            .messageExpiryInterval(120)
            .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
            .send();
    }

    @PostConstruct
    public void init() {
        log.info("Going to establish a subscription with a Mqtt broker...");
        mqtt5BlockingClient.toAsync()
            .subscribeWith()
            .topicFilter(topic)
            .qos(AT_LEAST_ONCE)
            .callback(publish -> {
                final Optional<ByteBuffer> correlationData = publish.getCorrelationData();
                log.infov("Received message: {0}  ->  {1}", publish.getTopic(), UTF_8.decode(publish.getPayload().get()));
            })
            .send();

    }

    @PreDestroy
    public void destroy() {
        mqtt5BlockingClient.disconnect();
        log.info("Disconnected the Mqtt client...");
    }

}

@Singleton
class AppConfig {

    @ConfigProperty(name = "mqtt.host")
    String host;

    @ConfigProperty(name = "mqtt.port")
    int port;

    @Singleton
    @Produces
    public Mqtt5BlockingClient mqtt5BlockingClient() {

        final Mqtt5BlockingClient client = MqttClient.builder()
            .useMqttVersion5()
            .serverHost(host)
            .serverPort(port)
            .identifier(UUID.randomUUID())
            .automaticReconnectWithDefaultConfig()
            .buildBlocking();

        client.connectWith()
            .simpleAuth()
            .username("test")
            .password(UTF_8.encode("test"))
            .applySimpleAuth()
            .sessionExpiryInterval(3600 * 24)
            .cleanStart(true) 
            .keepAlive(30)
            .send();

        return client;

    }

}

Details

SgtSilvio commented 3 years ago

Hi @missourian55 If you use automaticReconnect, then there is no reason to call if (!isConnected) { disconnect; connect; } yourself. Currently, the client does not support disconnecting and reconnecting yourself when it will automatically reconnect (https://github.com/hivemq/hivemq-mqtt-client/issues/302). So, everything should work and if you remove the if (!isConnected) { disconnect; connect; } code, then you should not get any exceptions logged.

missourian55 commented 3 years ago

@SgtSilvio Thank you! I added this if (!isConnected) { disconnect; connect; } after noticing that client is not automatically reconnecting. Does MQTT server have any special support for client automaticReconnect? In my case MQTT server is Solace

SgtSilvio commented 3 years ago

@missourian55 sorry for the delayed response. I don't know how Solace handles reconnects/client takeovers and so on. To see if the server rejects reconnect attempts, please add a ConnectedListener and DisconnectedListener which log all connect attempts and failures:

MqttClient.builder()
            ...
            .automaticReconnectWithDefaultConfig()
            .addConnectedListener(context -> log.info("connected"))
            .addDisconnectedListener(context -> log.info("disconnected by {}, cause {}", context.getSource(), context.getCause()))
            ...
pglombardo commented 1 year ago

Hi @missourian55 - it's been while since this issue has been updated. I'm going to close it for now but if anything remains, feel free to re-open it again or file another issue. Thanks for using the HiveMQ client!