eclipse / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho
Other
2.08k stars 879 forks source link

Reconnecting not working for MqttAsyncClient w/ file persistence, offline buffering, and QOS 2 publishes #1025

Open benWSw opened 7 months ago

benWSw commented 7 months ago

My use case requires a publishing MQTT client with file based persistence and offline buffering. The client publishes QOS 2 messages to a broker on the network. When the connection is disrupted (broker goes down, network disconnect, etc.), the client reconnects, attempts to publish a messages, and then disconnects again. This behavior cycles, with none of the messages in the buffer getting cleared. Below is a client setup that was able to replicate the behavior:

package mqtt_test;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;

public class Main {
    public static void main(String[] args) {
        String broker = "tcp://10.102.11.26:1883";
        String clientId = UUID.randomUUID().toString();
        try {
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setAutomaticReconnect(true);
            mqttConnectOptions.setMaxInflight(500);
            mqttConnectOptions.setCleanSession(false);
            mqttConnectOptions.setConnectionTimeout(5);

            DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
            disconnectedBufferOptions.setBufferEnabled(true);
            disconnectedBufferOptions.setBufferSize(5000);
            disconnectedBufferOptions.setPersistBuffer(true);
            disconnectedBufferOptions.setDeleteOldestMessages(true);

            MqttDefaultFilePersistence persistence = new MqttDefaultFilePersistence("C:\\Users\\user\\IdeaProjects\\mqtt_test\\src");

            MqttAsyncClient client = new MqttAsyncClient(broker, clientId, persistence);

            client.setBufferOpts(disconnectedBufferOptions);
            client.connect(mqttConnectOptions);
            for(int i = 0; i < 10000; i++) {
                TimeUnit.MILLISECONDS.sleep(100);
                byte[] payload = "test".getBytes("UTF-16");
                client.publish("topic1", payload, 2, false);
            }
        }
        catch(Exception e) {
            System.out.println(e.toString());
        }
    }

}

The following error is printed to the console: SEVERE: null: Error occurred attempting to publish buffered message due to disconnect. Exception: 32,102:Client is currently disconnecting. Dec 08, 2023 2:38:42 PM org.eclipse.paho.client.mqttv3.internal.DisconnectedMessageBuffer run SEVERE: null: Error occurred attempting to publish buffered message due to disconnect. Exception: 32,104:Client is not connected. Dec 08, 2023 2:38:43 PM org.eclipse.paho.client.mqttv3.internal.DisconnectedMessageBuffer run

On the broker side, I am using the Aedes broker for Node-RED v0.11.0. From the broker I get the following error: "Error: no such packet"