eclipse / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho
Other
2.12k stars 884 forks source link

MqttClient doesn't receive leftover messages after being offline #1005

Closed f0lg0 closed 1 year ago

f0lg0 commented 1 year ago

Please fill out the form below before submitting, thank you!

If this is a bug regarding the Android Service, please raise the bug here instead: https://github.com/eclipse/paho.mqtt.android/issues/new

I have the following code for a simple subscriber v5 client which connects to the HiveMq Public Broker:

import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MqttDefaultFilePersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

public class Subscriber {
  private final String topic        = "test";
  private final int qos             = 2;
  private final String broker       = "tcp://broker.hivemq.com";
  private final String clientId     = "myuniqueclientid";
  private final MqttDefaultFilePersistence storage = new MqttDefaultFilePersistence("storage");

  public void run() {
    try {
      final MqttConnectionOptions options = new MqttConnectionOptions();
      options.setAutomaticReconnect(true);
      options.setCleanStart(false);
      options.setSessionExpiryInterval(null);
      options.setKeepAliveInterval(60);

      final MqttClient client = new MqttClient(this.broker, this.clientId, this.storage, null);

      client.setCallback(new MqttCallback() {
        @Override
        public void disconnected(final MqttDisconnectResponse disconnectResponse) {
          System.out.println(disconnectResponse.toString());
        }

        @Override
        public void mqttErrorOccurred(final MqttException exception) {
          System.out.println(exception.toString());
        }

        @Override
        public void messageArrived(final String topic, final MqttMessage message) throws Exception {
          System.out.println(topic + ": " + new String(message.getPayload()));
        }

        @Override
        public void deliveryComplete(final IMqttToken token) {
          System.out.println("delivery complete");
        }

        @Override
        public void connectComplete(final boolean reconnect, final String serverURI) {
          System.out.println("connect complete");
        }

        @Override
        public void authPacketArrived(final int reasonCode, final MqttProperties properties) {
          System.out.println("auth packet arrived");
        }
      });

      client.connect(options);
      client.subscribe(this.topic, this.qos);
    } catch (final Exception e) {
      e.printStackTrace();
    }
  }
}

Since I want to get a persistent session I've set the cleanStart flag to false and I've given the client a static id, while also using QoS=2 messages. When I run the client with the following main:


public class Main {
  public static void main(final String[] args) {
    final Subscriber sub = new Subscriber();
    sub.run();
  }
}

I am able to receive messages (sent using this) while being connected (as expected):

connect complete
test: testing
Jun 13, 2023 3:48:55 PM org.eclipse.paho.mqttv5.client.internal.ClientState handleInboundPubRel
INFO: myuniqueclientid: Creating MqttPubComp: MqttPubComp [returnCode=0, properties=MqttProperties [validProperties=[31, 38]]]

image

But If I ctrl-c from the client and send other messages using the same tool, when I reconnect I don't see the messages that had been sent while the client was offline, while I should be able to given the persistent session.

I've also tried with a Python client using the same library (paho mqtt python) and I'm able to stream messages that had been sent while a given client was offline, using cleanSession=False and a static id, alongside QoS=2 messages.

In the example above I've also configured a DefaultFilePersistentStorage that does its job when the messageArrived callback crashes (I'm able to retrieve messages that were being processed before a crash), but it doesn't interfere with the bug stated above.

Am I missing a flag? Hope to hear some feedback. Thanks

UPDATE: Check my comment below, the bug doesn't persist if a value is passed to the setSessionExpiryInterval() method

f0lg0 commented 1 year ago

UPDATE The bug doesn't persist by passing a value to the options.setSessionExpiryInterval(null); call, even though null should get us a session without an expiry date.

f0lg0 commented 1 year ago

Duplicate of 909 and 873. Docs need to be updated to reflect correct behavior. image

ref