eclipse / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho
Other
2.12k stars 884 forks source link

Auto assigned subscription id isn't sent to broker #1019

Open TDYJeffreyDevloo opened 1 year ago

TDYJeffreyDevloo commented 1 year ago

Auto assigned subscription identifier isn't sent to the broker

The existence of the subscription identifier when subscribing using the MQTT5 client is

A simple demo that demonstrates the issue:

import com.hivemq.client.mqtt.datatypes.MqttQos;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

public class TestPaho
{

  public static void main(String[] args) throws InterruptedException, MqttException
  {
    MqttAsyncClient serverClient = new MqttAsyncClient("tcp://localhost:1883", "myServer");
    MqttAsyncClient deviceClient = new MqttAsyncClient("tcp://localhost:1883", "myDevice");

    serverClient.connect().waitForCompletion();
    deviceClient.connect().waitForCompletion();

    System.out.println("Connected");

    CountDownLatch receivedLatch = new CountDownLatch(4);

    MqttProperties subProperties1 = new MqttProperties();
    subProperties1.setSubscriptionIdentifiers(List.of(0)); // Bug in paho? This is for publish but subscribe requires it
    # subProperties1.setSubscriptionIdentifier(1);
    serverClient.subscribe(new MqttSubscription("#", MqttQos.AT_LEAST_ONCE.getCode()), null, null, (topic, message) ->
    {
      System.out.println("# - Received message");
      receivedLatch.countDown();
    }, subProperties1);

    MqttProperties subProperties2 = new MqttProperties();
    subProperties2.setSubscriptionIdentifiers(List.of(0));
    # subProperties2.setSubscriptionIdentifier(2);
    serverClient.subscribe(new MqttSubscription("A", MqttQos.AT_LEAST_ONCE.getCode()), null, null, (topic, message) ->
    {
      System.out.println("A - Received message");
      receivedLatch.countDown();
    }, subProperties2);

    System.out.println("A - Sending");
    deviceClient.publish("A", new MqttMessage("1".getBytes(), MqttQos.AT_LEAST_ONCE.getCode(), false, new MqttProperties()))
            .waitForCompletion();

    System.out.println("A - Sent");

    receivedLatch.await();
    deviceClient.close();
    serverClient.close();
  }
}

Any user of the API needs to use MqttProperties#setSubscriptionIdentifier regardless of Automatic Subscription Identifier Assignment. The property it should be checking within a subscribe is the MqttProperties#getSubscriptionIdentifier. Not the list. The doc states:

    /**
     * Subscription Identifiers. (Publish Only)
     * 
     * <p>
     * The Subscription Identifiers are associated with any subscription created or
     * modified as the result of a SUBSCRIBE packet. If a subscription was made with
     * a Subscription Identifier, then any incoming messages that match that
     * subscription will contain the associated subscription identifier, if the
     * incoming message matches multiple subscriptions made by the same client, then
     * it will contain a list of all associated subscription identifiers. This
     * property is ONLY for PUBLISH packets. For a Subscription Identifier sent in a
     * SUBSCRIBE packet, see {@link MqttProperties#getSubscriptionIdentifier()}
     * </p>
     * 
     * @return A {@link List} of Subscription Identifiers.
     */
    public List<Integer> getSubscriptionIdentifiers() {
        return publishSubscriptionIdentifiers;
    }
...

    /**
     * Subscription Identifier. (Subscribe Only)
     * 
     * <p>
     * The Subscription identifier field can be set on a SUBSCRIBE packet and will
     * be returned with any incoming PUBLISH packets that match the associated
     * subscription. This property is ONLY for SUBSCRIBE packets. For Subscription
     * Identifier(s) sent in a PUBLISH packet, see
     * {@link MqttProperties#getSubscriptionIdentifiers()}
     * </p>
     * 
     * @return The Subscription Identifier.
     */
    public Integer getSubscriptionIdentifier() {
        return subscribeSubscriptionIdentifier;
    }

The correct property asside, the newly chosen subId is used internally https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java#L1308 but never set when sending the SUBSCRIBE message https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java#L1314

TDYJeffreyDevloo commented 1 year ago

Patch for the issue:

Index: org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java b/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java
--- a/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java  (revision f4e0db802a4433645ef011e711646a09ec9fae89)
+++ b/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java  (revision 7fcaefef9bf9ed5cfa68af4086f5f77aa47b81cc)
@@ -19,6 +19,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.util.Hashtable;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -1273,12 +1274,8 @@
    public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext, MqttActionListener callback,
            IMqttMessageListener messageListener, MqttProperties subscriptionProperties) throws MqttException {

-       int subId = 0;
-       try {
-           subId = subscriptionProperties.getSubscriptionIdentifiers().get(0);
-       } catch (IndexOutOfBoundsException e) {
-           log.fine(CLASS_NAME, "subscribe", "No sub subscription property(s)");
-       }
+       int subId = Optional.ofNullable(subscriptionProperties.getSubscriptionIdentifier())
+           .orElse(0);
        // Automatic Subscription Identifier Assignment is enabled
        if (connOpts.useSubscriptionIdentifiers() && this.mqttConnection.isSubscriptionIdentifiersAvailable()) {

@@ -1294,9 +1291,15 @@
            } else {
                // Automatically assign new ID and link to callback.
                subId = this.mqttSession.getNextSubscriptionIdentifier();
+               subscriptionProperties.setSubscriptionIdentifier(subId);
+           }
+       } else {
+           if (subId != 0) {
+               log.fine(CLASS_NAME, "subscribe", "Subscription identifiers are not available or disabled");
+               subscriptionProperties.setSubscriptionIdentifier(null);
            }
        }
-       
+
        // add message handlers to the list for this client
        for (MqttSubscription subscription : subscriptions) {
            MqttTopicValidator.validate(subscription.getTopic(),