eclipse / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho

Deadlock when subscribing to many topics #985

Open watta90 opened 1 year ago

watta90 commented 1 year ago

If we call subscribe inside MqttCallbackExtended.connectComplete(boolean arg0, String arg1), we get a deadlock as soon as we try to subscribe to more than 5 topics. I don't know whether this is specific to my use case.

Example to reproduce the error:

public static void main(String[] args) {

    String broker = "tcp://localhost:1883";
    String clientId = "deadlock";

    System.out.println("Starting MQTT Client " + clientId);

    try {
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setAutomaticReconnect(true);
        connOpts.setCleanSession(true);
        MqttClient client = new MqttClient(broker, clientId);
        client.setCallback(new MqttCallbackExtended() {

            @Override
            public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
                System.out.println("new msg: " + arg0 + " " + arg1);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken arg0) {

            }

            @Override
            public void connectionLost(Throwable arg0) {
                System.err.println("Lost connection");
            }

            @Override
            public void connectComplete(boolean arg0, String arg1) {
                try {
                    System.out.println("Subscribing to topics");

                    client.subscribe("mysys/+/topic0/#", 0);
                    client.subscribe("mysys/+/topic1", 0);
                    client.subscribe("mysys/+/topic2/+");
                    client.subscribe("mysys/+/topic3/+", 0);
                    client.subscribe("mysys/+/topic4/#", 0);
                    // If we uncomment the next subscribe, everything locks: none of the
                    // subscriptions above work and no messages arrive.
                    // client.subscribe("mysys/+/topic5/+", 0);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        });
        client.connect(connOpts);
    } catch (MqttException e) {
        e.printStackTrace();
    }
}

Is this the intended behaviour or is it a bug?

To work around this problem, you can run all the subscribe calls on a separate thread in your application, like this:

public static void main(String[] args) {

    String broker = "tcp://localhost:1883";
    String clientId = "deadlock";

    System.out.println("Starting MQTT Client " + clientId);

    try {
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setAutomaticReconnect(true);
        connOpts.setCleanSession(true);
        final MqttClient client = new MqttClient(broker, clientId);
        client.setCallback(new MqttCallbackExtended() {

            @Override
            public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
                System.out.println("new msg: " + arg0 + " " + arg1);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken arg0) {

            }

            @Override
            public void connectionLost(Throwable arg0) {
                System.err.println("Lost connection");
            }

            @Override
            public void connectComplete(boolean arg0, String arg1) {
                try {
                    System.out.println("Subscribing to topics");
                    new Thread(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                client.subscribe("mysys/+/topic0/#", 0);
                                client.subscribe("mysys/+/topic1", 0);
                                client.subscribe("mysys/+/topic2/+");
                                client.subscribe("mysys/+/topic3/+", 0);
                                client.subscribe("mysys/+/topic4/#", 0);
                                client.subscribe("mysys/+/topic5/+", 0); // this works now as well and everything subscribes.
                            } catch (MqttException e) {
                                e.printStackTrace();
                            }

                        }
                    }).start();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        client.connect(connOpts);
    } catch (MqttException e) {
        e.printStackTrace();
    }
}

zlsq commented 8 months ago

Hello, I have also encountered a situation where I cannot receive messages, but I cannot determine if it is caused by a deadlock.

zlsq commented 8 months ago

I reproduced the issue using the MQTT v5 client, stable release 1.2.5, and resolved it by moving the subscribe calls onto a separate thread. Switching from MqttClient to MqttAsyncClient also resolves the issue.
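
For anyone trying to understand why both workarounds help, here is a minimal sketch of one plausible mechanism. It is an assumption, not something confirmed in this thread or taken from the Paho source: it supposes that the thread running connectComplete is also the thread that has to process the acknowledgement a blocking subscribe waits for. The sketch uses only the JDK, no Paho classes. `CallbackDeadlockSketch`, `blockingSubscribe` and `subscribeFromCallback` are hypothetical names, and a timeout stands in for what would otherwise be an indefinite hang.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CallbackDeadlockSketch {

    /**
     * Stand-in for a blocking subscribe (assumption, not Paho code): the
     * acknowledgement is queued on the dispatcher thread, and the caller
     * waits for it. Returns true if the ack arrived within the timeout.
     */
    static boolean blockingSubscribe(ExecutorService dispatcher, long timeoutMs) {
        CountDownLatch ack = new CountDownLatch(1);
        dispatcher.execute(ack::countDown); // only the dispatcher can deliver the ack
        try {
            return ack.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Runs a fake "connectComplete" callback on a single dispatcher thread.
     * With offload == false the callback blocks on the subscribe itself;
     * with offload == true it hands the subscribe to a worker thread and returns.
     */
    static boolean subscribeFromCallback(boolean offload, long timeoutMs) {
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CompletableFuture<Boolean> outcome = new CompletableFuture<>();
        try {
            dispatcher.execute(() -> {
                Runnable subscribe =
                        () -> outcome.complete(blockingSubscribe(dispatcher, timeoutMs));
                if (offload) {
                    worker.execute(subscribe); // callback returns, dispatcher stays free
                } else {
                    subscribe.run();           // callback occupies the dispatcher while waiting
                }
            });
            return outcome.get(timeoutMs * 5 + 1000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            return false;
        } finally {
            dispatcher.shutdownNow();
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("inside callback acknowledged: " + subscribeFromCallback(false, 200));
        System.out.println("on worker thread acknowledged: " + subscribeFromCallback(true, 2000));
    }
}
```

In this model the subscribe issued directly inside the callback never sees its acknowledgement before the timeout, while the one handed to a worker thread does. If Paho behaves similarly, that would be consistent with both workarounds reported above: a separate thread, or a non-blocking MqttAsyncClient call, lets the callback return so the client thread can keep processing incoming packets. The model does not explain why the first five subscriptions succeed.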