eclipse / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho
Other
2.1k stars 880 forks source link

Topic filter does not match shared subscription, listeners are not notified on actual topic #965

Open Adam-Grzenda opened 1 year ago

Adam-Grzenda commented 1 year ago

Please fill out the form below before submitting, thank you!

TLDR: TopicValidator does not match shared subscription topics with actual message topics. i.e. $share/GROUP-NAME/some-topic is not matched when a message comes from some-topic.

https://github.com/eclipse/paho.mqtt.java/blob/f4e0db802a4433645ef011e711646a09ec9fae89/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/common/util/MqttTopicValidator.java#L151

Long description: I've encountered an issue using the MQTTv5 feature - shared subscriptions. In order to do a broker-side load-balancing between running application instances I have to subscribe to a topic with a prefix: $share/GROUP-NAME/

That way I can assure that my messages are distributed between instances according to my broker settings - unfortunately even though messages are delivered to my Paho client (which I can see by running a generic callback for all messages), a listener registered while subscribing is not invoked.

This is due to the messages having $share/GROUP-NAME/ prefix stripped - I've subscribed to$share/GROUP-NAME/some-topic, and incoming messages come from just some-topic.

As far as I can understand it is a part of the standard: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250

At the moment I've been forced to use a generic callback to manually invoke my listener - however this does feel like a hack.

Minimal code to reproduce

Expected standard out: Message arrived in listener, topic: someTopic/AAA-BBB-CCC, message: Actual standard out: Message arrived in callback, topic: someTopic/AAA-BBB-CCC, message:


public class MyApplication {

    public static void main(String[] args) throws MqttException, InterruptedException {
        MqttClient client = new MqttClient("tcp://localhost:1883", "myClientId", new MemoryPersistence());
        client.setCallback(new MqttCallbackImpl());
        client.connect();

        MqttSubscription subscription = new MqttSubscription("$share/GROUP-NAME/someTopic/#", 2);
        client.subscribe(new MqttSubscription[]{subscription}, new IMqttMessageListener[]{new MqttListener()})
                .waitForCompletion();

        while (true) {
            client.publish("someTopic/AAA-BBB-CCC", new MqttMessage());
            Thread.sleep(1000);
        }
    }

    public static class MqttListener implements IMqttMessageListener {
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println("Message arrived in listener, topic: " + topic + " message: " + message);
        }
    }

    public static class MqttCallbackImpl implements MqttCallback {
        @Override 
        public void messageArrived(String topic, MqttMessage message) {
            System.out.println("Message arrived in callback, topic: " + topic + " message: " + message);
        }
        @Override public void deliveryComplete(IMqttToken token) {}
        @Override public void connectComplete(boolean reconnect, String serverURI) {}
        @Override public void authPacketArrived(int reasonCode, MqttProperties properties) {}
        @Override public void disconnected(MqttDisconnectResponse disconnectResponse) {}
        @Override public void mqttErrorOccurred(MqttException exception) {}
    }

} 
SpikeFJ commented 7 months ago

I have also encountered the same issue as you, and I suspect that the problem lies in the deliverMessage method of CommsCallback, where the MqttTopicValidator.isMatched method is not taking into account the prefix "$share" for shared subscription topics.

nwest1 commented 6 months ago

Related https://github.com/eclipse/paho.mqtt.java/pull/911