eclipse / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho
Other
2.08k stars 879 forks source link

Connect waitForCompletion hangs when broker disconnects quickly after accepting #1013

Open fnordian opened 10 months ago

fnordian commented 10 months ago

There is a race condition that causes waitForCompletion to hang forever if the broker closes a connection quickly after accepting it.

The code below can reproduce it, but because it is a race condition, it does not occur on every run. The bug is demonstrated when the test hangs.

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * Reproduction test for a connect race in the Paho client.
 *
 * <p>The test starts a minimal fake MQTT broker on an ephemeral local port. For every accepted
 * connection the broker reads the beginning of the client's request, answers with a fixed
 * four-byte CONNACK and then closes the socket. While {@code disconnectInterval} is at most 20 the
 * socket is closed right away; afterwards the broker sleeps for that many milliseconds first. The
 * interval is doubled after every served connection.
 *
 * <p>The bug is considered reproduced when {@link #testPahoReconnect()} never returns because
 * {@code waitForCompletion()} blocks. Since this depends on timing, a passing run does not prove
 * the absence of the bug.
 */
@Slf4j
public class PahoReconnectTest {

    /** Fixed header 0x20 (CONNACK), remaining length 2, no flags, return code 0 (accepted). */
    private static final byte[] CONNACK_ACCEPTED = {0x20, 0x02, 0x00, 0x00};

    /** Listening socket of the fake broker; bound to an OS-assigned free port. */
    private final ServerSocket serverSocket = new ServerSocket(0);

    /** Port the fake broker listens on; fixed once the server socket is bound. */
    private final int port = serverSocket.getLocalPort();

    /**
     * Delay in milliseconds before the broker closes an accepted connection. Values of 20 or less
     * mean "close immediately". Doubled by the broker after each served connection.
     */
    @Getter
    @Setter
    private volatile long disconnectInterval = 0;

    /**
     * Binds the fake broker and starts its accept loop on a background thread.
     *
     * @throws IOException if the server socket cannot be opened
     */
    public PahoReconnectTest() throws IOException {
        new Thread(() -> {
            try {
                dummyMqttBroker();
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();

    }

    /**
     * Accept loop of the fake broker: hands every accepted connection to its own thread.
     *
     * @throws IOException if accepting a connection fails
     * @throws InterruptedException declared for the caller's catch clause; not thrown by the loop itself
     */
    private void dummyMqttBroker() throws IOException, InterruptedException {

        // Must be non-zero before the first connection is served, otherwise doubling keeps it at 0.
        disconnectInterval = 1;

        while (!Thread.currentThread().isInterrupted()) {
            log.info("accepting");
            Socket clientSocket = serverSocket.accept();

            new Thread(() -> serveClient(clientSocket)).start();
        }
    }

    /**
     * Serves one client: reads the start of its request, replies with CONNACK, optionally waits,
     * doubles the disconnect interval and finally closes the socket.
     *
     * @param clientSocket the accepted connection; always closed before this method returns
     */
    private void serveClient(Socket clientSocket) {
        try {
            log.info("accepted new client");
            // Reads up to 22 bytes of the client's request; the content is not inspected.
            clientSocket.getInputStream().read(new byte[22], 0, 22);
            // Write the packet as raw bytes: a character Writer would run the binary data
            // through the platform charset encoder.
            clientSocket.getOutputStream().write(CONNACK_ACCEPTED);
            clientSocket.getOutputStream().flush();
            if (disconnectInterval > 20) {
                Thread.sleep(disconnectInterval);
            }
            // NOTE(review): read-modify-write on a volatile field is not atomic; concurrent
            // handler threads could lose a doubling. Presumably harmless for this test - confirm.
            disconnectInterval *= 2;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            try {
                clientSocket.close();
            } catch (IOException e) {
                log.error("unable to close client socket", e);
            }
        }
    }

    /**
     * Connects 30 fresh clients, one after another, to the fake broker. Each iteration blocks in
     * {@code waitForCompletion()}; a hang there demonstrates the bug.
     *
     * @throws MqttException declared for the test signature; failures inside the loop are logged
     * @throws InterruptedException if the test thread is interrupted while sleeping
     */
    @Test
    public void testPahoReconnect() throws MqttException, InterruptedException {
        String uri = "tcp://127.0.0.1:" + port;
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setMaxReconnectDelay(10);

        for (int i = 0; i < 30; i++) {
            try {
                log.info("creating new client");

                IMqttAsyncClient client = new MqttAsyncClient(uri, "test", null);
                client.connect(mqttConnectOptions).waitForCompletion();

                Thread.sleep(30);
                // NOTE(review): close() may itself throw if the client is still connected -
                // confirm against the Paho version in use.
                client.close();
            } catch (MqttException mqttException) {
                // Best effort: a failed connect/close must not abort the remaining iterations,
                // but it should leave a trace instead of vanishing silently.
                log.warn("client iteration " + i + " failed", mqttException);
            }

        }
    }
}