eclipse-paho / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho
Other
2.14k stars 886 forks source link

addTopic->subscribe - Adding a topic while the client is active always yields an exception #984

Open OpenSrcerer opened 1 year ago

OpenSrcerer commented 1 year ago

Please fill out the form below before submitting, thank you!


I am building a Spring application where I have to subscribe to topics during runtime. Using the Spring integration's Mqttv5PahoMessageDrivenChannelAdapter I am calling addTopics(), which subsequently ends up calling subscribe() on MqttAsyncClient. The problem arises when adding a subscription when the client is already active. Here is a trace of the chain of events:

Caused by: java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64) ~[na:na]
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70) ~[na:na]
    at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:266) ~[na:na]
    at java.base/java.util.Objects.checkIndex(Objects.java:359) ~[na:na]
    at java.base/java.util.ArrayList.get(ArrayList.java:427) ~[na:na]
    at org.eclipse.paho.mqttv5.client.MqttAsyncClient.subscribe(MqttAsyncClient.java:1276) ~[org.eclipse.paho.mqttv5.client-1.2.5.jar:na]
    at org.eclipse.paho.mqttv5.client.MqttAsyncClient.subscribe(MqttAsyncClient.java:1205) ~[org.eclipse.paho.mqttv5.client-1.2.5.jar:na]
    at org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter.addTopic(Mqttv5PahoMessageDrivenChannelAdapter.java:257) ~[spring-integration-mqtt-6.0.2.jar:6.0.2]
    at org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter.addTopics(AbstractMqttMessageDrivenChannelAdapter.java:301) ~[spring-integration-mqtt-6.0.2.jar:6.0.2]

It seems that the issue arises from the fact that subscribe is called with a freshly constructed MqttProperties -> MqttAsyncClient.java:1205. These properties are evaluated in MqttAsyncClient.java:1276:

        @Override
    public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext, MqttActionListener callback,
            IMqttMessageListener messageListener, MqttProperties subscriptionProperties) throws MqttException {

        int subId = subscriptionProperties.getSubscriptionIdentifiers().get(0); // <- Issue is here, properties are new so the ArrayList is not populated

This obviously throws an exception and prevents the client from adding any topics at runtime.