eclipse-paho / paho.mqtt.golang

Timeout subscribing to particular topic #679

Closed n1mda closed 5 months ago

n1mda commented 5 months ago

Hi,

I'm using paho.mqtt.golang to subscribe to topics in eclipse-mosquitto:2.0 that are published by zigbee2mqtt:1.37.1

This is my function to subscribe to a topic:

func (c *MQTTClient) Subscribe(topic string, callback MessageHandler) error {
    if _, ok := c.subscriptions[topic]; ok {
        return nil
    }

    c.callbacks[topic] = callback

    cb := func(client paho.Client, message paho.Message) {
        c.env.Logger.Info("Callback invoked", "topic", topic)
        c.callbacks[topic](*c, message)
    }

    token := c.client.Subscribe(topic, 0, cb)
    if token.WaitTimeout(5*time.Second) && token.Error() != nil {
        c.env.Logger.Error("Error subscribing to topic", "topic", topic, "error", token.Error())
        return token.Error()
    } else if !token.WaitTimeout(5 * time.Second) {
        c.env.Logger.Error("Timeout subscribing to topic", "topic", topic)
        return fmt.Errorf("timeout subscribing to topic")
    }

    c.subscriptions[topic] = true

    return nil
}

I subscribe to end devices in zigbee2mqtt; a topic name is typically zigbee2mqtt/<name of device>. This works for most topics, but one of the topics I subscribe to times out:

msg="Timeout subscribing to topic" topic=zigbee2mqtt/0x0015bc0031010443

All other topics work as expected.

However, once it has timed out I still receive messages from that topic so it's not really an error. If I use Wait() instead of WaitTimeout() it waits indefinitely.

If I subscribe to the topic using a cli mqtt client it subscribes successfully immediately:

$ mqtt sub -t zigbee2mqtt/0x0015bc0031010443
{"battery":100,"battery_low":false,"linkquality":116,"restore_reports":true,"smoke":false,"supervision_reports":true,"test":false,"update":{"installed_version":262152,"latest_version":262152,"state":"idle"},"voltage":3000}

Logs from paho:

[DEBUG] [client]   Connect()
[DEBUG] [store]    memorystore initialized
[DEBUG] [client]   about to write new connect msg
[DEBUG] [client]   socket connected to broker
[DEBUG] [client]   Using MQTT 3.1.1 protocol
[DEBUG] [net]      connect started
[DEBUG] [net]      received connack
[DEBUG] [client]   startCommsWorkers called
[DEBUG] [client]   client is connected/reconnected
[DEBUG] [net]      incoming started
[DEBUG] [net]      startIncomingComms started
[DEBUG] [net]      outgoing started
[DEBUG] [net]      startComms started
[DEBUG] [client]   startCommsWorkers done
[WARN]  [store]    memorystore wiped
[DEBUG] [client]   exit startClient
[DEBUG] [client]   enter Subscribe
[DEBUG] [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 1 topics: [zigbee2mqtt/bridge/devices]
[DEBUG] [client]   sending subscribe message, topic: zigbee2mqtt/bridge/devices
[DEBUG] [pinger]   keepalive starting
[DEBUG] [client]   exit Subscribe
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: inboundFromStore complete
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      obound priority msg to write, type *packets.SubscribePacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[WARN]  [store]    memorystore del: message 1 not found
[DEBUG] [net]      startIncomingComms: received suback, id: 1
[DEBUG] [net]      startIncomingComms: granted qoss [0]
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [client]   enter Subscribe
[DEBUG] [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 2 topics: [zigbee2mqtt/bridge/event]
[DEBUG] [client]   sending subscribe message, topic: zigbee2mqtt/bridge/event
[DEBUG] [client]   exit Subscribe
[DEBUG] [net]      obound priority msg to write, type *packets.SubscribePacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received publish, msgId: 0
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: got msg on ibound
[WARN]  [store]    memorystore del: message 2 not found
[DEBUG] [net]      startIncomingComms: received suback, id: 2
[DEBUG] [net]      startIncomingComms: granted qoss [0]
[DEBUG] [net]      logic waiting for msg on ibound
msg="Callback invoked" topic=zigbee2mqtt/bridge/devices
[DEBUG] [client]   enter Subscribe
[DEBUG] [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 3 topics: [zigbee2mqtt/0x54ef441000a76f17]
[DEBUG] [client]   sending subscribe message, topic: zigbee2mqtt/0x54ef441000a76f17
[DEBUG] [client]   exit Subscribe
[DEBUG] [net]      obound priority msg to write, type *packets.SubscribePacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[WARN]  [store]    memorystore del: message 3 not found
[DEBUG] [net]      startIncomingComms: received suback, id: 3
[DEBUG] [net]      startIncomingComms: granted qoss [0]
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [client]   enter Subscribe
[DEBUG] [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 4 topics: [zigbee2mqtt/0x0015bc001e013ebc]
[DEBUG] [client]   sending subscribe message, topic: zigbee2mqtt/0x0015bc001e013ebc
[DEBUG] [client]   exit Subscribe
[DEBUG] [net]      obound priority msg to write, type *packets.SubscribePacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received publish, msgId: 0
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[WARN]  [store]    memorystore del: message 4 not found
[DEBUG] [net]      startIncomingComms: received suback, id: 4
[DEBUG] [net]      startIncomingComms: granted qoss [0]
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received publish, msgId: 0
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [client]   enter Subscribe
[DEBUG] [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 5 topics: [zigbee2mqtt/0x54ef441000a322e1]
[DEBUG] [client]   sending subscribe message, topic: zigbee2mqtt/0x54ef441000a322e1
[DEBUG] [client]   exit Subscribe
[DEBUG] [net]      obound priority msg to write, type *packets.SubscribePacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[WARN]  [store]    memorystore del: message 5 not found
[DEBUG] [net]      startIncomingComms: received suback, id: 5
[DEBUG] [net]      startIncomingComms: granted qoss [0]
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [client]   enter Subscribe
[DEBUG] [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 6 topics: [zigbee2mqtt/0x0015bc0031010443]
[DEBUG] [client]   sending subscribe message, topic: zigbee2mqtt/0x0015bc0031010443
[DEBUG] [client]   exit Subscribe
[DEBUG] [net]      obound priority msg to write, type *packets.SubscribePacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received publish, msgId: 0
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [pinger]   ping check 4.84409225
[DEBUG] [pinger]   ping check 9.844058791
msg="Timeout subscribing to topic" topic=zigbee2mqtt/0x0015bc0031010443
msg="Callback invoked" topic=zigbee2mqtt/0x54ef441000a76f17
msg="Callback invoked" topic=zigbee2mqtt/0x0015bc001e013ebc
msg="Callback invoked" topic=zigbee2mqtt/0x54ef441000a322e1
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: got msg on ibound
[WARN]  [store]    memorystore del: message 6 not found
[DEBUG] [net]      startIncomingComms: received suback, id: 6
[DEBUG] [net]      startIncomingComms: granted qoss [0]
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received publish, msgId: 0
[DEBUG] [net]      logic waiting for msg on ibound
msg="Callback invoked" topic=zigbee2mqtt/0x0015bc0031010443
[DEBUG] [pinger]   ping check 14.844015875
[DEBUG] [pinger]   ping check 19.844011
[DEBUG] [pinger]   ping check 24.844261375
[DEBUG] [pinger]   ping check 29.844297916
[DEBUG] [pinger]   ping check 34.843765708
[DEBUG] [pinger]   keepalive sending ping
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received pingresp
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [pinger]   ping check 4.999668125

My Connect function:

func NewClient(env *config.Env, broker string) (MQTTClient, error) {
    options := paho.NewClientOptions()
    options.AddBroker(broker)
    options.SetClientID("my-mqtt-client")

    client := paho.NewClient(options)
    mqttClient := MQTTClient{
        env:           env,
        client:        client,
        subscriptions: make(map[string]bool),
        callbacks:     make(map[string]MessageHandler),
    }

    options.SetDefaultPublishHandler(mqttClient.publishHandler)
    options.OnConnect = mqttClient.onConnectHandler
    options.OnConnectionLost = mqttClient.onConnectionLostHandler
    options.SetOrderMatters(false)

    paho.ERROR = log.New(os.Stdout, "[ERROR] ", 0)
    paho.CRITICAL = log.New(os.Stdout, "[CRIT] ", 0)
    paho.WARN = log.New(os.Stdout, "[WARN]  ", 0)
    paho.DEBUG = log.New(os.Stdout, "[DEBUG] ", 0)

    if token := client.Connect(); token.Wait() && token.Error() != nil {
        return MQTTClient{}, token.Error()
    }

    return mqttClient, nil
}

I'm unsure where the issue is coming from, but since my application code can subscribe to other topics, and mqtt cli can subscribe to the problematic topic I think there is an issue in the paho library.

MattBrittan commented 5 months ago

but since my application code can subscribe to other topics, and mqtt cli can subscribe to the problematic topic I think there is an issue in the paho library

At this point I think there is insufficient evidence to say either way; it looks like no SUBACK is being received (or, if it is being received, it's not being processed successfully). Unfortunately without logs or a way to reproduce the issue (see reporting bugs in the readme) it's not really possible to help.

n1mda commented 5 months ago

Thank you @MattBrittan for taking the time to respond. I have updated the issue with logs and my Connect function. I'm not sure if you will be able to reproduce the issue since it's unclear (to me at least) why this specific topic times out. You will need a Zigbee adapter and a paired device to subscribe to a device topic. Here is my docker-compose.yml file to run mosquitto and zigbee2mqtt:

version: "3.8"
services:
  mqtt:
    container_name: mqtt
    restart: unless-stopped
    image: eclipse-mosquitto:2.0
    volumes:
      - "./mosquitto-data:/mosquitto"
    ports:
      - "1883:1883"
      - "9001:9001"
    command: "mosquitto -c /mosquitto-no-auth.conf"

  zigbee2mqtt:
    container_name: zigbee2mqtt
    restart: unless-stopped
    image: koenkk/zigbee2mqtt:1.37.1
    volumes:
      - ./zigbee2mqtt-data:/app/data
      - /run/udev:/run/udev:ro
    ports:
      - 8080:8080
    environment:
      - TZ=Europe/Stockholm
    devices:
      - /dev/ttyACM0:/dev/ttyACM0

Here is a minimal go file to reproduce:

package demo

import (
    "fmt"
    "log"
    "os"
    "time"

    paho "github.com/eclipse/paho.mqtt.golang"
)

type MQTTClient struct {
    client        paho.Client
    subscriptions map[string]bool
    callbacks     map[string]MessageHandler
}

type MessageHandler func(MQTTClient, paho.Message)

func NewClient(broker string) (MQTTClient, error) {
    options := paho.NewClientOptions()
    options.AddBroker(broker)
    options.SetClientID("my-mqtt-client")

    client := paho.NewClient(options)
    mqttClient := MQTTClient{
        client:        client,
        subscriptions: make(map[string]bool),
        callbacks:     make(map[string]MessageHandler),
    }

    options.SetDefaultPublishHandler(mqttClient.publishHandler)
    options.OnConnect = mqttClient.onConnectHandler
    options.OnConnectionLost = mqttClient.onConnectionLostHandler
    options.SetOrderMatters(false)

    paho.ERROR = log.New(os.Stdout, "[ERROR] ", 0)
    paho.CRITICAL = log.New(os.Stdout, "[CRIT] ", 0)
    paho.WARN = log.New(os.Stdout, "[WARN]  ", 0)
    paho.DEBUG = log.New(os.Stdout, "[DEBUG] ", 0)

    if token := client.Connect(); token.Wait() && token.Error() != nil {
        return MQTTClient{}, token.Error()
    }

    // Test subscribe here
    if err := mqttClient.Subscribe("zigbee2mqtt/<device name>", func(client MQTTClient, message paho.Message) {
        fmt.Println("Message received", "topic", message.Topic(), "payload", string(message.Payload()))
    }); err != nil {
        fmt.Println("Kaboom")
    }

    return mqttClient, nil
}

func (c *MQTTClient) Subscribe(topic string, callback MessageHandler) error {
    if _, ok := c.subscriptions[topic]; ok {
        return nil
    }

    c.callbacks[topic] = callback

    cb := func(client paho.Client, message paho.Message) {
        fmt.Println("Callback invoked")
        c.callbacks[topic](*c, message)
    }

    token := c.client.Subscribe(topic, 0, cb)
    if token.WaitTimeout(5*time.Second) && token.Error() != nil {
        fmt.Println("Error subscribing to topic")
        return token.Error()
    } else if !token.WaitTimeout(5 * time.Second) {
        fmt.Println("Timeout subscribing to topic")
        return fmt.Errorf("timeout subscribing to topic")
    }

    c.subscriptions[topic] = true

    return nil
}

func (c *MQTTClient) publishHandler(client paho.Client, msg paho.Message) {
    fmt.Println("Received message on default handler")
}

func (c *MQTTClient) onConnectHandler(client paho.Client) {
    fmt.Println("Connected to MQTT broker")
}

func (c *MQTTClient) onConnectionLostHandler(client paho.Client, err error) {
    fmt.Println("Connection to MQTT broker lost")
}

MattBrittan commented 5 months ago

I'm not sure if you will be able to reproduce the issue since it's unclear (to me at least) why this specific topic times out.

This should, in theory, be irrelevant. Unless I'm misunderstanding you, the Go app is connecting to Mosquitto (and only Mosquitto). This means that it's Mosquitto that will be responding to the SUBSCRIBE request (and, in theory, the issue would occur whether a separate publisher was sending messages or not).

The relevant section of the logs is here:

[DEBUG] [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 6 topics: [zigbee2mqtt/0x0015bc0031010443]
[DEBUG] [client]   sending subscribe message, topic: zigbee2mqtt/0x0015bc0031010443
[DEBUG] [client]   exit Subscribe
[DEBUG] [net]      obound priority msg to write, type *packets.SubscribePacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received publish, msgId: 0
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [pinger]   ping check 4.84409225
[DEBUG] [pinger]   ping check 9.844058791
msg="Timeout subscribing to topic" topic=zigbee2mqtt/0x0015bc0031010443
msg="Callback invoked" topic=zigbee2mqtt/0x54ef441000a76f17
msg="Callback invoked" topic=zigbee2mqtt/0x0015bc001e013ebc
msg="Callback invoked" topic=zigbee2mqtt/0x54ef441000a322e1
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: got msg on ibound
[WARN]  [store]    memorystore del: message 6 not found
[DEBUG] [net]      startIncomingComms: received suback, id: 6

So, it appears that the call to Subscribe is timing out (msg="Timeout subscribing to topic" topic=zigbee2mqtt/0x0015bc0031010443) and the SUBACK is received after this ([DEBUG] [net] startIncomingComms: received suback, id: 6).

The interesting thing here is that you received three messages in this period (the 3 x msg="Callback invoked"), so my theory would be that a callback is blocking, which is holding things up. Now you may say "but I've called SetOrderMatters(false) so this should not matter"; however, effectively, that is not what you have done:

client := paho.NewClient(options)
mqttClient := MQTTClient{
    client:        client,
    subscriptions: make(map[string]bool),
    callbacks:     make(map[string]MessageHandler),
}
options.SetDefaultPublishHandler(mqttClient.publishHandler)
options.OnConnect = mqttClient.onConnectHandler
options.OnConnectionLost = mqttClient.onConnectionLostHandler
options.SetOrderMatters(false)

paho.NewClient(options) takes a copy of the options passed in, so any changes you make to options after that point will have no impact on the client; please reorder this code so that every option is set before NewClient is called and see if that helps (I'm guessing the issue is that your callback is holding up processing).
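
To illustrate why the ordering matters, here is a minimal stdlib-only sketch of the copy behaviour described above. It is a model, not paho code: Options, Client and NewClient are hypothetical stand-ins, and the only assumption carried over from the library is that the constructor copies the options by value and that ordering is enabled by default.

```go
package main

import "fmt"

// Options is a hypothetical stand-in for a client options struct.
type Options struct {
	OrderMatters bool
}

// Client keeps its own copy of the options it was built with.
type Client struct {
	opts Options
}

// NewOptions returns options with ordering enabled, modelling the default.
func NewOptions() *Options {
	return &Options{OrderMatters: true}
}

// NewClient copies *o by value, so later changes to o are not seen by the client.
func NewClient(o *Options) *Client {
	return &Client{opts: *o}
}

func main() {
	// Option changed after construction: the client never sees it.
	late := NewOptions()
	c1 := NewClient(late)
	late.OrderMatters = false
	fmt.Println("changed after NewClient, client sees OrderMatters =", c1.opts.OrderMatters)

	// Option changed before construction: the client picks it up.
	early := NewOptions()
	early.OrderMatters = false
	c2 := NewClient(early)
	fmt.Println("changed before NewClient, client sees OrderMatters =", c2.opts.OrderMatters)
}
```

Applied to the code in this issue, the equivalent change would be to move the SetDefaultPublishHandler, OnConnect, OnConnectionLost and SetOrderMatters(false) lines above the call to paho.NewClient(options).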

Note that the following code is confusing:

token := c.client.Subscribe(topic, 0, cb)
if token.WaitTimeout(5*time.Second) && token.Error() != nil {
    fmt.Println("Error subscribing to topic")
    return token.Error()
} else if !token.WaitTimeout(5 * time.Second) {
    fmt.Println("Timeout subscribing to topic")
    return fmt.Errorf("timeout subscribing to topic")
}

This will effectively wait up to 10 seconds before timing out (and ignore any error returned during the second 5 seconds); call token.WaitTimeout once and then act on the result.

n1mda commented 5 months ago

Thank you @MattBrittan

I can confirm that reordering the options, and specifically setting options.SetOrderMatters(false) was the solution to this.