quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

MQTT connector duplicates messages when multiple connectors have matching topics #32462

Closed andreas-eberle closed 1 year ago

andreas-eberle commented 1 year ago

Describe the bug

I have a service that connects to an MQTT broker through multiple connectors, each with a different topic pattern. The patterns differ but overlap. If the broker receives a message on a topic that matches several of these connectors, each of those connectors receives the message once per matching connector.

For example, I have the following configuration in application.properties (reproducer below):

# Consumer1
mp.messaging.incoming.consumer1.connector=smallrye-mqtt
mp.messaging.incoming.consumer1.host=public.mqtthq.com
mp.messaging.incoming.consumer1.port=1883
mp.messaging.incoming.consumer1.topic=arconsis/#

# Consumer2
mp.messaging.incoming.consumer2.connector=smallrye-mqtt
mp.messaging.incoming.consumer2.host=public.mqtthq.com
mp.messaging.incoming.consumer2.port=1883
mp.messaging.incoming.consumer2.topic=arconsis/test/#

# Consumer3
mp.messaging.incoming.consumer3.connector=smallrye-mqtt
mp.messaging.incoming.consumer3.host=public.mqtthq.com
mp.messaging.incoming.consumer3.port=1883
mp.messaging.incoming.consumer3.topic=arconsis/test/quarkus

So there are 3 connectors: the first has the most generic topic and the last has the most specific one. If we send a message to the most specific topic, each of these connectors receives the message 3 times. Adding more connectors with a matching pattern makes each connector receive the message even more often.
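To see which of the three configured filters overlap for a given topic, the matching rules can be sketched in a few lines of Java. This is a simplified matcher written for illustration only (the class and method names are hypothetical, and topics starting with `$` are not handled specially); it is not the code used by the connector or by any broker.

```java
import java.util.List;

class TopicFilterDemo {

    /**
     * Simplified MQTT topic-filter match: '+' matches exactly one level,
     * '#' matches the parent level and everything below it.
     * Topics starting with '$' are not treated specially here.
     */
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // multi-level wildcard swallows the rest
            }
            if (i >= t.length) {
                return false; // filter is longer than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level mismatch
            }
        }
        return f.length == t.length; // no topic levels may be left unmatched
    }

    /** Counts how many of the given filters match the topic. */
    static long countMatches(List<String> filters, String topic) {
        return filters.stream().filter(f -> matches(f, topic)).count();
    }

    public static void main(String[] args) {
        // The three filters from the configuration above.
        List<String> filters = List.of("arconsis/#", "arconsis/test/#", "arconsis/test/quarkus");
        System.out.println(countMatches(filters, "arconsis/test/quarkus")); // prints 3
        System.out.println(countMatches(filters, "arconsis/test"));         // prints 2
    }
}
```

The counts line up with the number of duplicates reported in this issue: three matching filters for arconsis/test/quarkus and two for arconsis/test.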

Expected behavior

Every connector should receive the message only once.

Actual behavior

Every connector receives the message multiple times.

How to Reproduce?

Reproducer: 2023-04-06-mqtt-multi-subscriptions.zip

  1. Download and unzip the reproducer.
  2. Start the application with ./gradlew quarkusDev
  3. Go to the configured broker at https://mqtthq.com/client or configure your own broker.
  4. Send any message to the topic arconsis/test/quarkus
  5. Observe that the single receiver for consumer1 receives the message 3 times.
  6. If you send a message to arconsis/test, the message is received only 2 times.

Output of uname -a or ver

No response

Output of java -version

JDK17

GraalVM version (if different from Java)

No response

Quarkus version or git rev

At least since 2.14.1 up to 2.16.6

Build tool (ie. output of mvnw --version or gradlew --version)

No response

Additional information

No response

quarkus-bot[bot] commented 1 year ago

/cc @cescoffier (reactive-messaging), @ozangunalp (reactive-messaging)

cescoffier commented 1 year ago

I can't reproduce it with reactive messaging:

package io.smallrye.reactive.messaging.mqtt;

import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.weld.environment.se.Weld;
import org.jboss.weld.environment.se.WeldContainer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import static org.awaitility.Awaitility.await;

/**
 * Reproduce <a href="https://github.com/quarkusio/quarkus/issues/32462">#32462</a>.
 */
public class MultipleConnectorTest extends MqttTestBase {

    private WeldContainer container;

    @AfterEach
    public void cleanup() {
        if (container != null) {
            container.close();
        }
        Clients.clear();
    }

    @Test
    void test() {
        Weld weld = baseWeld(multiConnectorConfig());
        weld.addBeanClass(MyConsumers.class);
        container = weld.initialize();

        usage.produceStrings("arconsis/test/quarkus", 1, null, () -> {
            return "hello";
        });

        MyConsumers consumers = container.select(MyConsumers.class).get();
        await()
                .pollDelay(Duration.ofSeconds(1))
                .until(() -> consumers.getListFromConsumer1().size() == 1);
        await()
                .pollDelay(Duration.ofSeconds(1))
                .until(() -> consumers.getListFromConsumer2().size() == 1);
        await()
                .pollDelay(Duration.ofSeconds(1))
                .until(() -> consumers.getListFromConsumer3().size() == 1);

        // Make sure we didn't get duplicates
        await()
                .pollDelay(Duration.ofSeconds(1))
                .until(() -> consumers.getListFromConsumer1().size() == 1);
        await()
                .pollDelay(Duration.ofSeconds(1))
                .until(() -> consumers.getListFromConsumer2().size() == 1);
        await()
                .pollDelay(Duration.ofSeconds(1))
                .until(() -> consumers.getListFromConsumer3().size() == 1);
    }

    private MapBasedConfig multiConnectorConfig() {
        var config = new MapBasedConfig();
        //# Consumer1
        String prefix = "mp.messaging.incoming.consumer1.";
        config.with(prefix, "connector", MqttConnector.CONNECTOR_NAME)
                .with(prefix, "host", System.getProperty("mqtt-host"))
                .with(prefix, "port", Integer.valueOf(System.getProperty("mqtt-port")))
                .with(prefix, "topic", "arconsis/#");

        if (System.getProperty("mqtt-user") != null) {
            config.with(prefix, "username", System.getProperty("mqtt-user"));
            config.with(prefix, "password", System.getProperty("mqtt-pwd"));
        }

        //# Consumer2
        prefix = "mp.messaging.incoming.consumer2";
        config.with(prefix, "connector", MqttConnector.CONNECTOR_NAME)
                .with(prefix, "host", System.getProperty("mqtt-host"))
                .with(prefix, "port", Integer.valueOf(System.getProperty("mqtt-port")))
                .with(prefix, "topic", "arconsis/test/#");

        if (System.getProperty("mqtt-user") != null) {
            config.with(prefix, "username", System.getProperty("mqtt-user"));
            config.with(prefix, "password", System.getProperty("mqtt-pwd"));
        }

        //# Consumer3
        prefix = "mp.messaging.incoming.consumer3";
        config.with(prefix, "connector", MqttConnector.CONNECTOR_NAME)
                .with(prefix, "host", System.getProperty("mqtt-host"))
                .with(prefix, "port", Integer.valueOf(System.getProperty("mqtt-port")))
                .with(prefix, "topic", "arconsis/test/quarkus");

        if (System.getProperty("mqtt-user") != null) {
            config.with(prefix, "username", System.getProperty("mqtt-user"));
            config.with(prefix, "password", System.getProperty("mqtt-pwd"));
        }

        return config;
    }

    @ApplicationScoped
    public static class MyConsumers {

        private final List<String> listFromConsumer1 = new CopyOnWriteArrayList<>();
        private final List<String> listFromConsumer2 = new CopyOnWriteArrayList<>();
        private final List<String> listFromConsumer3 = new CopyOnWriteArrayList<>();

        @Incoming("consumer1")
        public void consume1(String s) {
            System.out.println("1 >> " + s);
            listFromConsumer1.add(s);
        }

        @Incoming("consumer2")
        public void consume2(String s) {
            System.out.println("2 >> " + s);
            listFromConsumer2.add(s);
        }

        @Incoming("consumer3")
        public void consume3(String s) {
            System.out.println("3 >> " + s);
            listFromConsumer3.add(s);
        }

        public List<String> getListFromConsumer1() {
            return listFromConsumer1;
        }

        public List<String> getListFromConsumer2() {
            return listFromConsumer2;
        }

        public List<String> getListFromConsumer3() {
            return listFromConsumer3;
        }
    }

}

@andreas-eberle can you spot a difference?

cescoffier commented 1 year ago

I'm afraid the problem comes from https://mqtthq.com/client. I cannot reproduce it with another MQTT server (I tried Mosquitto and HiveMQ).

I guess it's because that broker associates subscriptions with the connection IP rather than with each individual connection.
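A toy model can make this guess concrete. The sketch below compares a broker that tracks subscriptions per connection with a purely hypothetical broker that pools all subscriptions coming from one IP and delivers one copy per matching filter to every connection from that IP. Nothing here is known about how mqtthq.com actually works; the class, method names, and pooling rule are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SubscriptionScopeModel {

    /** Simplified MQTT filter match ('+' and '#' only, no '$' topic handling). */
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true;
            }
            if (i >= t.length || (!f[i].equals("+") && !f[i].equals(t[i]))) {
                return false;
            }
        }
        return f.length == t.length;
    }

    /** Subscriptions scoped per connection: each connection sees only its own filter. */
    static Map<String, Integer> perConnection(Map<String, String> filterByConnection, String topic) {
        Map<String, Integer> copies = new LinkedHashMap<>();
        filterByConnection.forEach((connection, filter) ->
                copies.put(connection, matches(filter, topic) ? 1 : 0));
        return copies;
    }

    /** Hypothetical broker: filters from one IP are pooled, one copy per matching filter. */
    static Map<String, Integer> pooledByIp(Map<String, String> filterByConnection, String topic) {
        int matching = 0;
        for (String filter : filterByConnection.values()) {
            if (matches(filter, topic)) {
                matching++;
            }
        }
        Map<String, Integer> copies = new LinkedHashMap<>();
        for (String connection : filterByConnection.keySet()) {
            copies.put(connection, matching); // every connection gets every copy
        }
        return copies;
    }

    public static void main(String[] args) {
        // Same three connectors as in the report, all assumed to share one client IP.
        Map<String, String> filters = new LinkedHashMap<>();
        filters.put("consumer1", "arconsis/#");
        filters.put("consumer2", "arconsis/test/#");
        filters.put("consumer3", "arconsis/test/quarkus");

        System.out.println(perConnection(filters, "arconsis/test/quarkus"));
        // prints {consumer1=1, consumer2=1, consumer3=1}
        System.out.println(pooledByIp(filters, "arconsis/test/quarkus"));
        // prints {consumer1=3, consumer2=3, consumer3=3}
        System.out.println(pooledByIp(filters, "arconsis/test"));
        // prints {consumer1=2, consumer2=2, consumer3=2}
    }
}
```

Under the pooled model the counts match what the report describes (3 copies for arconsis/test/quarkus, 2 for arconsis/test), while the per-connection model matches what the test above sees against other brokers. That is consistent with the guess but does not prove it.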

Please re-open once you have verified this with another MQTT server.