eclipse / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho
Other
2.12k stars 884 forks source link

Connect already in progress, #1035

Open marinesky opened 5 months ago

marinesky commented 5 months ago

Hi,we are using the Paho MQTT Java client analog pressure test, There is a certain probability when the client connects:Connect already in progress,

Here is My Code

public class Mqtt3Test {
    public static void main(String[] args)  {
        Integer begin =0;
        Integer end =10000;
        String defaultServer ="127.0.0.1:1883";
        try {
            CountDownLatch latch = new CountDownLatch(1);
            for (int i=begin;i<end;i++){

                Mqtt3 mqtt = new Mqtt3(defaultServer, i+"");

                mqtt.start();
                Thread.sleep(100);
            }

            latch.await();
        } catch (Exception e){
            System.out.println(" ChairTest error "+e.getMessage());
        }
    } }

`

public class Mqtt3 {

private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

public MqttClient client;
private MqttConnectOptions options;

private String serverURL;
private String id;

public Mqtt3(String serverURL, String id) {
    this.id=id;
    this.serverURL =serverURL;
    init();
}

private void init() {
    try {
        this.client = new MqttClient("tcp://" + serverURL, id, new MemoryPersistence());
        client.setTimeToWait(10*1000);
        this.options = new MqttConnectOptions();
        this.options.setCleanSession(true);
        this.options.setConnectionTimeout(10);
        this.options.setAutomaticReconnect(false);
        this.options.setKeepAliveInterval(20);
        this.client.setCallback((MqttCallback)new MqttCallbackExtended() {
            public void connectComplete(boolean reconnect, String serverURI) {

            }

            public void connectionLost(Throwable cause) {

            }

            public void messageArrived(String topic, MqttMessage message) throws Exception {

            }

            public void deliveryComplete(IMqttDeliveryToken token) {}
        });
    } catch (Exception e) {
        System.out.println("initException:" + e.getMessage());
    }
}

public void shutdown() {
    try {
        client.disconnectForcibly();
    } catch (Exception e) {
        System.out.println("clientId:"+id+",disconnectException:" + e.getMessage());
    }
    try {
        client.close(true);
    } catch (Exception e) {
        System.out.println("clientId:"+id+",closeException:" + e.getMessage());
    }

}

public void start() {
    try {
        startAutoConnectCheck();
        client.connect(options);

    } catch (Exception e) {
        System.out.println("clientId:"+id+",startException:" + e.getMessage());
    }
}

public boolean isShutDown() {
    if (!client.isConnected())
        return true;
    return false;
}

public void startAutoConnectCheck() {
    executorService.scheduleAtFixedRate(this::checkConnection, 60, 60, TimeUnit.SECONDS);
}

private void checkConnection() {
    if (isShutDown()){
        try {
            client.connect(options);
        } catch (Exception e) {
            System.out.println( "clientId:" + id  + " checkConnectException " + e.getMessage());
            shutdown();
            client=null;
            options=null;
            init();
        }
    }
}

In the code above, I use connection checking to ensure that each client connects successfully, at best.However, the following results are extremely rare, causing the connection to not be completely released

clientId:8000,startException:Timed out waiting for a response from the server clientId:8000 checkConnectException Connect already in progress clientId:8000 ,closeException:Connect already in progress

Mqtt Server always can recevie Client Ping,and tcpdump confirm conack also send

"MQTT Ping: 8000 " #24144 prio=5 os_prio=0 tid=0x00007f7e3057a800 nid=0xb3f9c in Object.wait() [0x00007f78f53de000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.util.TimerThread.mainLoop(Timer.java:552)

"MQTT Call: 8000 " #24140 prio=5 os_prio=0 tid=0x00007f7e2c58a800 nid=0xb3f98 in Object.wait() [0x00007f78f50db000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:198)

"MQTT Snd: 8000 " #24139 prio=5 os_prio=0 tid=0x00007f7e2c588800 nid=0xb3f97 in Object.wait() [0x00007f78f51dc000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at org.eclipse.paho.client.mqttv3.internal.ClientState.get(ClientState.java:836)

"MQTT Rec: 8000 " #24138 prio=5 os_prio=0 tid=0x00007f7e2c586800 nid=0xb3f96 runnable [0x00007f78f52dd000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:171) at java.net.SocketInputStream.read(SocketInputStream.java:141) at java.net.SocketInputStream.read(SocketInputStream.java:224) at java.io.DataInputStream.readByte(DataInputStream.java:265) at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:160) at java.lang.Thread.run(Thread.java:748)

I hope you can help me solve this problem,thank