eclipse / paho.mqtt.golang

Duplicate Callback Execution Issue in MQTT Subscription with Wildcard Subscriptions #642

Closed jame40307 closed 1 year ago

jame40307 commented 1 year ago

I am trying to create a wildcard subscription for a topic. The code looks like this:

package main

import (
    "fmt"
    "strconv"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
    var broker = "broker.emqx.io"
    var port = 1883
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
    opts.SetClientID("go_mqtt_client")
    opts.SetUsername("emqx")
    opts.SetPassword("public")
    opts.SetDefaultPublishHandler(messagePubHandler)
    opts.OnConnect = connectHandler
    opts.OnConnectionLost = connectLostHandler
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    sub(client)
    sub2(client)
    publish(client)

    client.Disconnect(250)
}

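// messagePubHandler is used both as the default handler and as the per-subscription callback.
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

// connectHandler and connectLostHandler were not shown in the original post;
// these minimal versions are placeholders so that the example compiles.
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
    fmt.Println("Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    fmt.Printf("Connection lost: %v\n", err)
}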

func sub(client mqtt.Client) {
    topic := "topic/#"
    token := client.Subscribe(topic, 1, messagePubHandler)
    token.Wait()
    fmt.Printf("Subscribed to topic %s \n", topic)
}

func sub2(client mqtt.Client) {
    topic := "topic/test/1"
    token := client.Subscribe(topic, 1, messagePubHandler)
    token.Wait()
    fmt.Printf("Subscribed to topic %s \n", topic)
}

func publish(client mqtt.Client) {
    text := fmt.Sprintf("Message %d", 1)
    token := client.Publish("topic/test/"+strconv.Itoa(1), 0, false, text)
    token.Wait()
    time.Sleep(time.Second)
}

When I publish a message to the topic "topic/test/1", I expect the messagePubHandler to be called twice. However, it is being called four times. This means that for each subscribed topic, the broker sends the message to the receive channel (2 times), and for each received message, it goes through all the message routes to find a matching handler (also 2 times). This results in a total of 2*2=4 calls.

I would like to know if this behavior is correct. If it is, is there a way to achieve the expected behavior?
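
The 2*2 arithmetic can be reproduced without a broker. The sketch below is only a simplified model and not the library's router: `matches` and `countHandlerCalls` are hypothetical helpers, and it assumes the broker delivers one copy of the message per matching subscription.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether an MQTT topic filter matches a topic name.
// Simplified: handles "+" and "#" only, and ignores "$"-prefixed topics
// and shared subscriptions.
func matches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return true // multi-level wildcard matches everything from here on
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

// countHandlerCalls models a client that checks every received copy of a
// message against every subscribed filter and calls the handler per match.
func countHandlerCalls(filters []string, topic string, copies int) int {
	calls := 0
	for c := 0; c < copies; c++ {
		for _, f := range filters {
			if matches(f, topic) {
				calls++
			}
		}
	}
	return calls
}

func main() {
	filters := []string{"topic/#", "topic/test/1"}
	topic := "topic/test/1"

	// Assumption: the broker sends one copy per matching subscription.
	copies := 0
	for _, f := range filters {
		if matches(f, topic) {
			copies++
		}
	}
	fmt.Println(countHandlerCalls(filters, topic, copies)) // prints 4
}
```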

MattBrittan commented 1 year ago

This is quite possible; the MQTT spec says:

When Clients make subscriptions with Topic Filters that include wildcards, it is possible for a Client’s subscriptions to overlap so that a published message might match multiple filters. In this case the Server MUST deliver the message to the Client respecting the maximum QoS of all the matching subscriptions [MQTT-3.3.5-1]. In addition, the Server MAY deliver further copies of the message, one for each additional matching subscription and respecting the subscription’s QoS in each case.

So whether the broker sends one copy or two is not defined, and what happens depends on your broker (e.g. Mosquitto provides the option allow_duplicate_messages, whereas EMQX sends two messages). The v5 spec addresses this.

Unfortunately the client has no way of knowing this. All it knows is that it has received two messages; each of them matches both subscriptions, so it passes each one to both callbacks.

The only ways around this that I'm aware of are:

  1. Don't use overlapping subscriptions
  2. Partial solution - use SetDefaultPublishHandler and set the callbacks to nil (e.g. client.Subscribe(topic, 1, nil)). The callback will still be called twice (but I guess that is better than four times!)
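
To see why option 2 halves the number of calls, here is a rough model of the dispatch step. It is a sketch under stated assumptions, not the library's code: `route`, `dispatch` and `countCalls` are hypothetical names, a nil handler stands in for `client.Subscribe(topic, 1, nil)`, and the default handler is assumed to run only when no per-subscription callback handled the message.

```go
package main

import (
	"fmt"
	"strings"
)

// route pairs a match predicate with a handler. A nil handler models a
// subscription made without a per-subscription callback.
type route struct {
	match   func(topic string) bool
	handler func(topic string)
}

// dispatch calls every matching non-nil handler, and falls back to def
// only when no route handled the message.
func dispatch(routes []route, def func(string), topic string) {
	handled := false
	for _, r := range routes {
		if r.handler != nil && r.match(topic) {
			r.handler(topic)
			handled = true
		}
	}
	if !handled && def != nil {
		def(topic)
	}
}

// countCalls returns how often the handler runs when the broker delivers
// the given number of copies of a message published to "topic/test/1".
func countCalls(withCallbacks bool, copies int) int {
	calls := 0
	h := func(string) { calls++ }
	routes := []route{
		// Stands in for the "topic/#" subscription.
		{match: func(t string) bool { return strings.HasPrefix(t, "topic/") }},
		// Stands in for the "topic/test/1" subscription.
		{match: func(t string) bool { return t == "topic/test/1" }},
	}
	if withCallbacks {
		for i := range routes {
			routes[i].handler = h
		}
	}
	for i := 0; i < copies; i++ {
		dispatch(routes, h, "topic/test/1")
	}
	return calls
}

func main() {
	fmt.Println(countCalls(true, 2))  // prints 4: two copies, two callbacks each
	fmt.Println(countCalls(false, 2)) // prints 2: two copies, default handler only
}
```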

jame40307 commented 1 year ago

Thank you for your response. It helps a lot in understanding where the issue lies.

Actually, I am providing a wrapper package that includes an MQTT module. I want it to offer different topic options with wildcards for a specific topic (e.g., for "item/10000001/change", users can subscribe with options like "item/#", "item/+/change", and so on). This means that I shouldn't prevent external systems from subscribing with overlapping topics. Some systems using my module may be interested in all items, while others may only be interested in specific item changes, and so on.

I think I will try the following approaches:

  1. The second approach you provided, and raise a warning when checking for overlapping wildcards.
  2. Use a mechanism similar to addRoute, where wildcards are not directly subscribed with the broker but are checked separately when triggering the specific handler function.
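
The overlap warning in approach 1 could be based on a check like the one below. This is a minimal sketch, not something the library provides: `overlaps` is a hypothetical helper, and it compares filters level by level while ignoring "$"-prefixed topics and shared subscriptions.

```go
package main

import (
	"fmt"
	"strings"
)

// overlaps reports whether at least one topic name could match both
// topic filters, i.e. whether subscribing to both may cause duplicates.
func overlaps(a, b string) bool {
	x, y := strings.Split(a, "/"), strings.Split(b, "/")
	for i := 0; i < len(x) || i < len(y); i++ {
		// All earlier levels were compatible, so "#" covers the rest.
		if (i < len(x) && x[i] == "#") || (i < len(y) && y[i] == "#") {
			return true
		}
		// One filter has run out of levels and the other has not.
		if i >= len(x) || i >= len(y) {
			return false
		}
		// Two literal levels must be identical; "+" matches any single level.
		if x[i] != "+" && y[i] != "+" && x[i] != y[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(overlaps("item/#", "item/+/change"))            // prints true
	fmt.Println(overlaps("item/+/change", "item/+/delete"))     // prints false
	fmt.Println(overlaps("topic/#", "topic/test/1"))            // prints true
	fmt.Println(overlaps("item/10000001/change", "item/+/add")) // prints false
}
```

A wrapper could run this check against the already registered filters whenever a new subscription is requested and log a warning when it returns true.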

MattBrittan commented 1 year ago

Great - I'm going to close this issue (as it's not really an issue with the library and it seems you have a solution).