eclipse-paho / paho.mqtt.golang

Unable to Receive Messages on HiveMQ Broker #689

Open neilor-mendes opened 2 months ago

neilor-mendes commented 2 months ago

Hi everyone,

I’m encountering an issue with my code that’s supposed to connect to a HiveMQ Broker. While the connection seems to be established successfully (as indicated by the keepAlive and ping logs), I'm not receiving any messages sent to the topic.

Here’s what I’ve done so far:

I’m unsure what might be missing or incorrectly configured. Any guidance or suggestions would be greatly appreciated!

Thank you in advance for your help.

Version

go 1.23.0

require (
        github.com/eclipse/paho.mqtt.golang v1.5.0
        github.com/golang/glog v1.2.2
)

require (
        github.com/golang/snappy v0.0.4 // indirect
        github.com/gorilla/websocket v1.5.3 // indirect
        github.com/klauspost/compress v1.13.6 // indirect
        github.com/montanaflynn/stats v0.7.1 // indirect
        github.com/pelletier/go-toml v1.9.5 // indirect
        github.com/xdg-go/pbkdf2 v1.0.0 // indirect
        github.com/xdg-go/scram v1.1.2 // indirect
        github.com/xdg-go/stringprep v1.0.4 // indirect
        github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
        go.mongodb.org/mongo-driver v1.16.1 // indirect
        golang.org/x/crypto v0.25.0 // indirect
        golang.org/x/net v0.27.0 // indirect
        golang.org/x/sync v0.7.0 // indirect
        golang.org/x/text v0.16.0 // indirect
)

Code

package main

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, os.Interrupt)
    signal.Notify(sig, syscall.SIGTERM)

    mqtt.ERROR = log.New(os.Stdout, "[ERROR] ", 0)
    mqtt.CRITICAL = log.New(os.Stdout, "[CRIT] ", 0)
    mqtt.WARN = log.New(os.Stdout, "[WARN]  ", 0)
    mqtt.DEBUG = log.New(os.Stdout, "[DEBUG] ", 0)

    var knt int = 0

    // Load CA certificate
    certpool := x509.NewCertPool()
    pemCerts, err := os.ReadFile("mqttutil/cacert.pem")

    if err != nil {
        log.Fatalf("Error loading CA certificate: %v", err)
    }

    if !certpool.AppendCertsFromPEM(pemCerts) {
        log.Fatalf("Failed to append CA certificate")
    }

    tlsConfig := &tls.Config{
        RootCAs:            certpool,
        ClientAuth:         tls.NoClientCert,
        ClientCAs:          nil,
        InsecureSkipVerify: true,
    }

    opts := mqtt.NewClientOptions().
        AddBroker("ssl://broker.hivemq.com:8883").
        SetClientID("mqtt2mongo").
        SetUsername("username").
        SetPassword("password").
        SetTLSConfig(tlsConfig).
        SetAutoReconnect(true).
        SetConnectRetry(true).
        SetOrderMatters(false).
        SetCleanSession(true)

    opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
        fmt.Printf("MSG: %s\n", msg.Payload())
        text := fmt.Sprintf("this is result msg #%d!", knt)
        knt++
        token := client.Publish("nn/result", 0, false, text)
        token.Wait()
    })

    topic := "nn/result"

    opts.OnConnect = func(c mqtt.Client) {
        if token := c.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
            panic(token.Error())
        }
    }

    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    } else {
        fmt.Printf("Connected to server\n")
    }

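    <-sig
    fmt.Println("signal caught - exiting")
    // Unsubscribe before disconnecting, and run both explicitly so they
    // finish before "shutdown complete" is printed.
    if token := client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
        log.Printf("unsubscribe failed: %v", token.Error())
    }
    client.Disconnect(250)
    fmt.Println("shutdown complete")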
}

Debug Log

[DEBUG] [client]   Connect()
[DEBUG] [store]    memorystore initialized
[DEBUG] [client]   about to write new connect msg
[DEBUG] [client]   socket connected to broker
[DEBUG] [client]   Using MQTT 3.1.1 protocol
[DEBUG] [net]      connect started
[DEBUG] [net]      received connack
[DEBUG] [client]   startCommsWorkers called
[DEBUG] [client]   client is connected/reconnected
[DEBUG] [net]      incoming started
[DEBUG] [net]      startIncomingComms started
[DEBUG] [net]      outgoing started
[DEBUG] [net]      startComms started
[DEBUG] [client]   startCommsWorkers done
[WARN]  [store]    memorystore wiped
Connected to server
[DEBUG] [client]   enter Subscribe
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [pinger]   keepalive starting
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: inboundFromStore complete
[DEBUG] [client]   exit startClient
[DEBUG] [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 1 topics: [nn/result]
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [client]   sending subscribe message, topic: nn/result
[DEBUG] [client]   exit Subscribe
[DEBUG] [net]      obound priority msg to write, type *packets.SubscribePacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[WARN]  [store]    memorystore del: message 1 not found
[DEBUG] [net]      startIncomingComms: received suback, id: 1
[DEBUG] [net]      startIncomingComms: granted qoss [0]
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [pinger]   ping check 4.9984945
[DEBUG] [pinger]   ping check 9.9990185
[DEBUG] [pinger]   ping check 14.9984701
[DEBUG] [pinger]   ping check 19.9986013
[DEBUG] [pinger]   ping check 24.9985122
[DEBUG] [pinger]   ping check 29.998588
[DEBUG] [pinger]   ping check 34.9987725
[DEBUG] [pinger]   keepalive sending ping
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received pingresp
...

MattBrittan commented 2 months ago

The message was received by your code as expected. Also as expected, this led to an infinite loop: the handler publishes a message to nn/result every time it receives one, and because the client is subscribed to nn/result, each publish is delivered back to it as another message.
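To make the loop concrete, here is a minimal sketch using a toy in-memory bus rather than a real broker. It is not the paho API: the `bus` type, the `deliveries` helper, the delivery cap, and the `nn/input` topic name are all assumptions made up for illustration. It only models the behaviour described above, where a handler that republishes to the topic it is subscribed to triggers itself again.

```go
package main

import "fmt"

// message is one queued publish on the toy bus.
type message struct{ topic, payload string }

// bus is a toy in-memory stand-in for a broker (hypothetical, NOT the paho
// API): every published message goes to each handler subscribed to that
// exact topic.
type bus struct {
	handlers map[string][]func(payload string)
	queue    []message
}

func newBus() *bus { return &bus{handlers: map[string][]func(string){}} }

func (b *bus) subscribe(topic string, h func(payload string)) {
	b.handlers[topic] = append(b.handlers[topic], h)
}

func (b *bus) publish(topic, payload string) {
	b.queue = append(b.queue, message{topic, payload})
}

// run delivers queued messages until the queue is empty or limit handler
// calls have been made, and returns the number of handler calls.
func (b *bus) run(limit int) int {
	delivered := 0
	for len(b.queue) > 0 && delivered < limit {
		m := b.queue[0]
		b.queue = b.queue[1:]
		for _, h := range b.handlers[m.topic] {
			h(m.payload)
			delivered++
		}
	}
	return delivered
}

// deliveries subscribes a handler on subTopic that republishes to pubTopic,
// injects one message on subTopic, and reports how often the handler ran.
// The limit stops the same-topic case, which would otherwise never end.
func deliveries(subTopic, pubTopic string, limit int) int {
	b := newBus()
	n := 0
	b.subscribe(subTopic, func(payload string) {
		b.publish(pubTopic, fmt.Sprintf("this is result msg #%d!", n))
		n++
	})
	b.publish(subTopic, "hello")
	return b.run(limit)
}

func main() {
	// Same topic for subscribe and publish: the handler feeds itself.
	fmt.Println("same topic:", deliveries("nn/result", "nn/result", 5))
	// Separate input topic (hypothetical name): the handler runs once.
	fmt.Println("separate topics:", deliveries("nn/input", "nn/result", 5))
}
```

In this model the same-topic case runs until it hits the cap, while the separate-topic case handles the single injected message and stops. With the real client, the likely equivalent is to subscribe to one topic and publish results to a different one.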

You don't say how or when you published a message to the topic, so all I can really say is that your code works as expected for me. The logs show a normal connection and subscription but no messages received, which suggests the broker did not send any messages on that topic.

Please note that this area is really intended for bugs; the readme suggests places to ask for more general help (Stack Overflow generally results in a quick answer when a detailed question is provided).