eclipse / paho.mqtt.golang

Other
2.73k stars 533 forks source link

publish was broken by timeout #686

Closed huifer closed 2 weeks ago

huifer commented 2 weeks ago

Env:

  1. github.com/eclipse/paho.mqtt.golang v1.5.0
  2. emqx 23.2.7.2-emqx-2/11.1.8 Node 'emqx@127.0.0.1' 4.3.5 is started

Code

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "go.uber.org/zap"
    "log"
    "math/rand"
    "os"
    "os/signal"
    "strconv"
    "strings"
    "syscall"
    "time"
)

var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
    fmt.Println("Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    fmt.Printf("Connect lost: %v", err)
}

func main() {
    InitLog()
    opts := createClientOptions()
    client := connectMQTT(opts)

    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)

    go func() {
        <-signalChan
        fmt.Println("Received interrupt signal. Shutting down...")
        client.Disconnect(250)
        os.Exit(0)
    }()

    fime := readFime("1.txt")
    for {
        vc := fime[0]
        go publish(client, vc.Topic, vc.ID, vc.ID)
    }

}

func createClientOptions() *mqtt.ClientOptions {
    opts := mqtt.NewClientOptions()
    opts.AddBroker("tcp://localhost:1883")
    opts.SetClientID("go_mqtt_client")
    opts.SetUsername("admin")
    opts.SetPassword("public")
    opts.SetDefaultPublishHandler(messagePubHandler)
    opts.SetAutoReconnect(true)
    opts.SetMaxReconnectInterval(time.Second * 10)
    opts.SetKeepAlive(time.Second * 30)
    opts.SetPingTimeout(time.Second * 10)
    opts.SetConnectTimeout(time.Second * 30)
    opts.OnConnect = connectHandler
    opts.OnConnectionLost = connectLostHandler
    return opts
}

func connectMQTT(opts *mqtt.ClientOptions) mqtt.Client {
    zap.S().Infof("开始连接MQTT")
    client := mqtt.NewClient(opts)
    token := client.Connect()
    for !token.WaitTimeout(3 * time.Second) {
    }
    if err := token.Error(); err != nil {
        log.Printf("Error connecting to MQTT broker: %v", err)
        time.Sleep(time.Second * 5)
        return connectMQTT(opts)
    }
    return client
}
func readFime(path string) []Vc {
    // 读取 path 每一行用空格分割分隔的数据 第一个是 Topic 第二个是ID

    file, err := os.Open(path)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    var vcs []Vc
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        line := scanner.Text()
        fields := strings.Split(line, "\t")
        if len(fields) != 2 {
            log.Printf("Invalid line format: %s", line)
            continue
        }
        topic := fields[0]
        id, err := strconv.Atoi(fields[1])
        if err != nil {
            log.Printf("Invalid ID: %s", fields[1])
            continue
        }
        vcs = append(vcs, Vc{Topic: topic, ID: id})
    }
    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
    return vcs
}

type Vc struct {
    Topic string
    ID    int
}

func publish(client mqtt.Client, topic string, i int, i2 int) {
    // 初始化随机数生成器的种子
    rand.Seed(time.Now().UnixNano())

    // 生成随机数

    var dataRows []DataRow
    for i3 := range 200 {
        randomNum := rand.Intn(21) // Intn返回一个[0, n)范围内的随机数
        dataRows = append(dataRows, DataRow{
            Name:  "信号-" + strconv.Itoa(i3),
            Value: strconv.Itoa(randomNum),
        })
    }

    //
    DataRowList := DataRowList{
        Time:               time.Now().Unix(),
        DeviceUid:          strconv.Itoa(i2),
        IdentificationCode: strconv.Itoa(i2),
        DataRows:           dataRows,
        Nc:                 strconv.Itoa(i2),
    }

    marshal, _ := json.Marshal(DataRowList)

    //fmt.Printf("发送消息: %s  消息主题: %s\n", DataRowList.Time, topic)
    token := client.Publish(topic, 0, false, marshal)

    if token.Wait() && token.Error() != nil {
        zap.S().Error(token.Error())
    }

    time.Sleep(1 * time.Second) // 暂停1秒

}

type DataRowList struct {
    Time               int64     `json:"Time"`               // 秒级时间戳
    DeviceUid          string    `json:"DeviceUid"`          // 能够产生网络通讯的唯一编码
    IdentificationCode string    `json:"IdentificationCode"` // 设备标识码
    DataRows           []DataRow `json:"DataRows"`
    Nc                 string    `json:"Nc"`
}
type DataRow struct {
    Name  string `json:"Name"`
    Value string `json:"Value"`
}

After running for a period of time

2024-08-26 14:00:52     info    test/main.go:72 开始连接MQTT
Connected
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
MattBrittan commented 2 weeks ago

I am not surprised to see a timeout with this code. You are publishing in a tight loop which is likely to generate messages more quickly than they can be transmitted over the network (so you will end up with timeouts). Perhaps add a concurrency limit.

huifer commented 2 weeks ago

Does this mean that I just need to modify the sending frequency?

@MattBrittan

huifer commented 2 weeks ago

In Docker

[ERROR] [client]   Connect comms goroutine - error triggered EOF
2024-08-26 07:25:35 error   go-iot/mqtt.go:60   失去链接: EOF
2024-08-26 07:25:35 error   go-iot/mqtt.go:63   失去链接,id: TT_21 ,error EOF:
2024-08-26 07:25:35 info    go-iot/mqtt_redis.go:22 AddNoUseConfig 开始, config = {172.17.0.1 1883 admin public /test_topic/21 TT_21}, body = {"broker":"172.17.0.1","port":1883,"username":"admin","password":"public","sub_topic":"/test_topic/21","client_id":"TT_21"}
2024-08-26 07:25:35 info    go-iot/mqtt.go:79   StopMqttClient 开始, clientId = TT_21
[ERROR] [net]      outgoing oboundp reporting error  write tcp 172.29.0.3:34380->172.17.0.1:1883: use of closed network connection
[ERROR] [client]   Connect comms goroutine - error triggered write tcp 172.29.0.3:34380->172.17.0.1:1883: use of closed network connection
[ERROR] [client]   internalConnLost unexpected status: disconnection already in progress
MattBrittan commented 2 weeks ago

You have a loop that just starts goroutines continually. Each goroutine calls Publish so its highly likely that the result is that the network link will reach its capacity, and messages start queueing. Eventually that will mean the messages will timeout (i suspect that if you subscribe to the topic you will find that messages are still being sent).

As far as i can see the library is behaving as designed (eventually it will probably crash as you will run out of memory due to the number of goroutines running).

I cannot see any reason why you would start goroutines like this in a production system so i cant really tell you how to fix it. If you want to send messages at the maximum possible rate then, perhaps, limit the numner of active goroutines to say 20 (some experimentation would be needed to find the right number).

huifer commented 2 weeks ago

Sorry, it's my problem. I should wait outside for 1 second

huifer commented 2 weeks ago

I need your help

2024-08-26 09:19:28 error   go-iot/mqtt.go:60   失去链接: pingresp not received, disconnecting
2024-08-26 09:19:28 error   go-iot/mqtt.go:63   失去链接,id: TT_93 ,error pingresp not received, disconnecting:
MattBrittan commented 2 weeks ago

Thats normally caused by a handler that runs too long. Without debugging infoni cant really help, and if you are load testing the code (as with your initial issue) then this kind of thing is expected.

huifer commented 2 weeks ago

My MQTT client hopes to run for a long time, it must be running for a long time

huifer commented 2 weeks ago

My MQTT client hopes to run for a long time, it must be running for a long time

https://github.com/iot-ecology/go-iot-platform/blob/feat-protocol/go-iot/mqtt.go

huifer commented 2 weeks ago

[ERROR] [client] disconnect called whist connection attempt in progress

MattBrittan commented 2 weeks ago

I have clients that use this library that have been running for over a year (not on the latest release but the changes have been minor). The error you note above occurs when the client attempts to Connect whilst a Disconnect operation is in progress. I had a really quick look at your code and cannot see this happening (but have no idea if that was the code you were running when you got this warning).

Sorry but I don't have time to review all of your code, if you discover a specific issue then please log a bug report with full deails (as in your original post here, however that was not a bug but the result of your infinite loop and expected behaviour).

huifer commented 2 weeks ago

I JUST RUN APP. log :

2024-08-26 09:19:28 error go-iot/mqtt.go:60 失去链接: pingresp not received, disconnecting 2024-08-26 09:19:28 error go-iot/mqtt.go:63 失去链接,id: TT_93 ,error pingresp not received, disconnecting: [ERROR] [client] disconnect called whist connection attempt in progress

MattBrittan commented 2 weeks ago

Sorry - I'm not going to work through your code. If you provide a minimal reproducable example (and, ideally logs etc) then I'll take a look. However I will note that it's a bit unusual to call .SetAutoReconnect(true) and then call Disconnect yourself when the connection is lost. Your code also has at least one data race (on the map c).

huifer commented 2 weeks ago

log file

[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received publish, msgId: 0
[DEBUG] [client]   disconnecting
[DEBUG] [client]   calling WaitTimeout
[DEBUG] [client]   WaitTimeout done
[DEBUG] [client]   stopCommsWorkers called
[DEBUG] [net]      obound priority msg to write, type *packets.DisconnectPacket
[DEBUG] [client]   stopCommsWorkers waiting for workers
[ERROR] [net]      outgoing oboundp reporting error  write tcp 198.18.0.1:54689->localhost:1883: use of closed network connection
[DEBUG] [net]      outgoing waiting for an outbound message
[ERROR] [client]   Connect comms goroutine - error triggered write tcp 198.18.0.1:54689->localhost:1883: use of closed network connection
[DEBUG] [client]   internalConnLost called
[ERROR] [client]   internalConnLost unexpected status: disconnection already in progress
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      incoming complete
[DEBUG] [net]      startIncomingComms: ibound complete

I have stopped using .SetAutoReconnect(true)

How can I avoid the data competition issue you mentioned in this case。

huifer commented 2 weeks ago

test-2024-08-27T11-02-52.656.log

MattBrittan commented 2 weeks ago

the data competition issue Lock your mutex whenever you access the map (this is purerly in your code).

Sorry - I cannot help unless you can provide a minimal, reproducable example (I don't have time to work through all of your code). My gurss would be that the "Disconnect packet not sent due to timeout" is probably due to your messagePubHandler blocking (see the readme for recomendations on dealing with this, 99% of the time this issue is raised the problem is in the users code).

huifer commented 2 weeks ago

sample code :

package main

import (
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "github.com/google/uuid"
    "go.uber.org/zap"
    "log"
    "strconv"
    "time"
)

var ccc = make(map[int]mqtt.Client)

func main1() {
    InitLog()

    zap.S().Infof("启动")
    for i := range 100 {
        mqttlient := createMqttClient(i)
        ccc[i] = mqttlient
    }

    select {

    }
}

func createMqttClient(i int) mqtt.Client {
    mqtt.ERROR = log.New(getWriteSync(), "[ERROR] ", 0)
    //mqtt.CRITICAL = log.New(getWriteSync(), "[CRIT] ", 0)
    //mqtt.WARN = log.New(getWriteSync(), "[WARN]  ", 0)
    //mqtt.DEBUG = log.New(getWriteSync(), "[DEBUG] ", 0)
    var broker = "localhost"
    var port = 1883
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
    s := uuid.New().String()
    opts.SetClientID(s)
    opts.SetUsername("admin")
    opts.SetPassword("public")
    opts.SetDefaultPublishHandler(messagePubHandler)
    //opts.SetAutoReconnect(true)
    opts.OnConnect = connectHandler
    opts.OnConnectionLost = connectLostHandler
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    subscribe := client.Subscribe("/test_topic/"+strconv.Itoa(i), 0, nil)
    if subscribe.Wait() && subscribe.Error() != nil {
        panic(subscribe.Error())

    }
    return client
}
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
    zap.S().Infof("Connected")
}
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    reader := client.OptionsReader()
    id := reader.ClientID()
    zap.S().Debugf("id %+v",id)
    time.Sleep(1*time.Second)
    //fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    zap.S().Errorf("Connect lost: %v", err)
}

log


2024-08-27 12:56:53 info    test/b.go:18    启动
2024-08-27 12:56:53 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:54 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:55 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:56 info    test/b.go:58    Connected
2024-08-27 12:56:57 info    test/b.go:58    Connected
2024-08-27 12:56:57 info    test/b.go:58    Connected
2024-08-27 12:56:57 info    test/b.go:58    Connected
2024-08-27 12:56:57 info    test/b.go:58    Connected
2024-08-27 12:56:57 info    test/b.go:58    Connected
2024-08-27 12:56:57 info    test/b.go:58    Connected
2024-08-27 12:56:57 info    test/b.go:58    Connected
2024-08-27 12:56:57 info    test/b.go:58    Connected
2024-08-27 12:56:57 info    test/b.go:58    Connected
2024-08-27 12:56:57 info    test/b.go:58    Connected
2024-08-27 12:56:57 info    test/b.go:58    Connected
2024-08-27 12:56:57 info    test/b.go:58    Connected
2024-08-27 12:56:57 info    test/b.go:58    Connected
2024-08-27 12:56:57 info    test/b.go:58    Connected
2024-08-27 12:56:57 info    test/b.go:58    Connected
2024-08-27 12:56:58 info    test/b.go:58    Connected
2024-08-27 12:56:58 info    test/b.go:58    Connected
2024-08-27 12:56:58 info    test/b.go:58    Connected
2024-08-27 12:56:58 info    test/b.go:58    Connected
2024-08-27 12:56:58 info    test/b.go:58    Connected
2024-08-27 12:56:58 info    test/b.go:58    Connected
2024-08-27 12:56:58 info    test/b.go:58    Connected
2024-08-27 12:56:58 info    test/b.go:58    Connected
2024-08-27 12:56:58 info    test/b.go:58    Connected
2024-08-27 12:56:59 info    test/b.go:58    Connected
2024-08-27 12:56:59 info    test/b.go:58    Connected
2024-08-27 12:56:59 info    test/b.go:58    Connected
2024-08-27 12:56:59 info    test/b.go:58    Connected
2024-08-27 12:58:47 error   test/b.go:68    Connect lost: pingresp not received, disconnecting
2024-08-27 12:58:47 info    test/b.go:58    Connected
2024-08-27 12:58:48 error   test/b.go:68    Connect lost: pingresp not received, disconnecting
2024-08-27 12:58:48 info    test/b.go:58    Connected
2024-08-27 12:58:52 error   test/b.go:68    Connect lost: pingresp not received, disconnecting
2024-08-27 12:58:52 info    test/b.go:58    Connected
2024-08-27 12:59:09 error   test/b.go:68    Connect lost: pingresp not received, disconnecting
2024-08-27 12:59:09 info    test/b.go:58    Connected
2024-08-27 12:59:09 error   test/b.go:68    Connect lost: pingresp not received, disconnecting
2024-08-27 12:59:09 info    test/b.go:58    Connected
MattBrittan commented 2 weeks ago

Copied your code, made fixes so it compiles, ran against Mosquitto (2.0.18), left for 10 minutes, no errors noted and all connections remained up. Looking at the mosquitto logs I could see that ping requests and responses were being received/transmitted as expected. Stopped mosquitto and verified that the client printed errors due to the loss of connection.

Ran the same test using broker.emqx.io to confirm if this is an issue with a generic emqx installation. The connections came up pretty slowly (I assume there is a rate limiter on this server). No errors (or connection lost) messages were logged.

So, based on this the code is running OK - it's possible that the issue is with your broker config (but I can't really tell).

I have stopped using .SetAutoReconnect(true)

Note that this is the default :-) (so you need to call .SetAutoReconnect(false) to disable)

huifer commented 2 weeks ago

code

package main

import (
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "github.com/google/uuid"
    "go.uber.org/zap"
    "log"
    "strconv"
    "time"
)

var ccc = make(map[int]mqtt.Client)

func main1() {
    InitLog()

    zap.S().Infof("启动")
    for i := range 100 {
        mqttlient := createMqttClient(i)
        ccc[i] = mqttlient
    }

    select {
    case msg := <-choke:
        zap.S().Infof(msg[0], msg[1], msg[2])
    }
    select {

    }
}

var choke = make(chan [3]string)

func createMqttClient(i int) mqtt.Client {
    mqtt.ERROR = log.New(getWriteSync(), "[ERROR] ", 0)
    mqtt.CRITICAL = log.New(getWriteSync(), "[CRIT] ", 0)
    //mqtt.WARN = log.New(getWriteSync(), "[WARN]  ", 0)
    //mqtt.DEBUG = log.New(getWriteSync(), "[DEBUG] ", 0)
    var broker = "localhost"
    var port = 1883
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
    s := uuid.New().String()
    opts.SetClientID(s)
    opts.SetUsername("admin")
    opts.SetPassword("public")
    //opts.SetDefaultPublishHandler(messagePubHandler)
    opts.SetAutoReconnect(false)

    opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
        reader := client.OptionsReader()
        choke <- [3]string{msg.Topic(), string(msg.Payload()), reader.ClientID()}
    })

    opts.OnConnect = connectHandler
    opts.OnConnectionLost = connectLostHandler
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    subscribe := client.Subscribe("/test_topic/"+strconv.Itoa(i), 0, nil)
    if subscribe.Wait() && subscribe.Error() != nil {
        panic(subscribe.Error())

    }

    return client
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
    zap.S().Infof("Connected")
}
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    reader := client.OptionsReader()
    id := reader.ClientID()
    zap.S().Debugf("id %+v", id)
    time.Sleep(1 * time.Second)
    //fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    zap.S().Errorf("Connect lost: %v", err)
}

mosquitto config

# Config file for mosquitto
#
# See mosquitto.conf(5) for more information.
#
# Default values are shown, uncomment to change.
#
# Use the # character to indicate a comment, but only if it is the
# very first character on the line.

# =================================================================
# General configuration
# =================================================================

# Use per listener security settings.
#
# It is recommended this option be set before any other options.
#
# If this option is set to true, then all authentication and access control
# options are controlled on a per listener basis. The following options are
# affected:
#
# password_file acl_file psk_file auth_plugin auth_opt_* allow_anonymous
# auto_id_prefix allow_zero_length_clientid
#
# Note that if set to true, then a durable client (i.e. with clean session set
# to false) that has disconnected will use the ACL settings defined for the
# listener that it was most recently connected to.
#
# The default behaviour is for this to be set to false, which maintains the
# setting behaviour from previous versions of mosquitto.
#per_listener_settings false

# If a client is subscribed to multiple subscriptions that overlap, e.g. foo/#
# and foo/+/baz , then MQTT expects that when the broker receives a message on
# a topic that matches both subscriptions, such as foo/bar/baz, then the client
# should only receive the message once.
# Mosquitto keeps track of which clients a message has been sent to in order to
# meet this requirement. The allow_duplicate_messages option allows this
# behaviour to be disabled, which may be useful if you have a large number of
# clients subscribed to the same set of topics and are very concerned about
# minimising memory usage.
# It can be safely set to true if you know in advance that your clients will
# never have overlapping subscriptions, otherwise your clients must be able to
# correctly deal with duplicate messages even when then have QoS=2.
#allow_duplicate_messages false

# This option controls whether a client is allowed to connect with a zero
# length client id or not. This option only affects clients using MQTT v3.1.1
# and later. If set to false, clients connecting with a zero length client id
# are disconnected. If set to true, clients will be allocated a client id by
# the broker. This means it is only useful for clients with clean session set
# to true.
#allow_zero_length_clientid true

# If allow_zero_length_clientid is true, this option allows you to set a prefix
# to automatically generated client ids to aid visibility in logs.
# Defaults to 'auto-'
#auto_id_prefix auto-

# This option affects the scenario when a client subscribes to a topic that has
# retained messages. It is possible that the client that published the retained
# message to the topic had access at the time they published, but that access
# has been subsequently removed. If check_retain_source is set to true, the
# default, the source of a retained message will be checked for access rights
# before it is republished. When set to false, no check will be made and the
# retained message will always be published. This affects all listeners.
#check_retain_source true

# QoS 1 and 2 messages will be allowed inflight per client until this limit
# is exceeded.  Defaults to 0. (No maximum)
# See also max_inflight_messages
#max_inflight_bytes 0

# The maximum number of QoS 1 and 2 messages currently inflight per
# client.
# This includes messages that are partway through handshakes and
# those that are being retried. Defaults to 20. Set to 0 for no
# maximum. Setting to 1 will guarantee in-order delivery of QoS 1
# and 2 messages.
#max_inflight_messages 20

# For MQTT v5 clients, it is possible to have the server send a "server
# keepalive" value that will override the keepalive value set by the client.
# This is intended to be used as a mechanism to say that the server will
# disconnect the client earlier than it anticipated, and that the client should
# use the new keepalive value. The max_keepalive option allows you to specify
# that clients may only connect with keepalive less than or equal to this
# value, otherwise they will be sent a server keepalive telling them to use
# max_keepalive. This only applies to MQTT v5 clients. The maximum value
# allowable is 65535. Do not set below 10.
#max_keepalive 65535

# For MQTT v5 clients, it is possible to have the server send a "maximum packet
# size" value that will instruct the client it will not accept MQTT packets
# with size greater than max_packet_size bytes. This applies to the full MQTT
# packet, not just the payload. Setting this option to a positive value will
# set the maximum packet size to that number of bytes. If a client sends a
# packet which is larger than this value, it will be disconnected. This applies
# to all clients regardless of the protocol version they are using, but v3.1.1
# and earlier clients will of course not have received the maximum packet size
# information. Defaults to no limit. Setting below 20 bytes is forbidden
# because it is likely to interfere with ordinary client operation, even with
# very small payloads.
#max_packet_size 0

# QoS 1 and 2 messages above those currently in-flight will be queued per
# client until this limit is exceeded.  Defaults to 0. (No maximum)
# See also max_queued_messages.
# If both max_queued_messages and max_queued_bytes are specified, packets will
# be queued until the first limit is reached.
#max_queued_bytes 0

# The maximum number of QoS 1 and 2 messages to hold in a queue per client
# above those that are currently in-flight.  Defaults to 100. Set
# to 0 for no maximum (not recommended).
# See also queue_qos0_messages.
# See also max_queued_bytes.
#max_queued_messages 100
#
# This option sets the maximum number of heap memory bytes that the broker will
# allocate, and hence sets a hard limit on memory use by the broker.  Memory
# requests that exceed this value will be denied. The effect will vary
# depending on what has been denied. If an incoming message is being processed,
# then the message will be dropped and the publishing client will be
# disconnected. If an outgoing message is being sent, then the individual
# message will be dropped and the receiving client will be disconnected.
# Defaults to no limit.
#memory_limit 0

# This option sets the maximum publish payload size that the broker will allow.
# Received messages that exceed this size will not be accepted by the broker.
# The default value is 0, which means that all valid MQTT messages are
# accepted. MQTT imposes a maximum payload size of 268435455 bytes.
#message_size_limit 0

# This option allows persistent clients (those with clean session set to false)
# to be removed if they do not reconnect within a certain time frame.
#
# This is a non-standard option in MQTT V3.1 but allowed in MQTT v3.1.1.
#
# Badly designed clients may set clean session to false whilst using a randomly
# generated client id. This leads to persistent clients that will never
# reconnect. This option allows these clients to be removed.
#
# The expiration period should be an integer followed by one of h d w m y for
# hour, day, week, month and year respectively. For example
#
# persistent_client_expiration 2m
# persistent_client_expiration 14d
# persistent_client_expiration 1y
#
# The default if not set is to never expire persistent clients.
#persistent_client_expiration

# Write process id to a file. Default is a blank string which means
# a pid file shouldn't be written.
# This should be set to /var/run/mosquitto.pid if mosquitto is
# being run automatically on boot with an init script and
# start-stop-daemon or similar.
#pid_file

# Set to true to queue messages with QoS 0 when a persistent client is
# disconnected. These messages are included in the limit imposed by
# max_queued_messages and max_queued_bytes
# Defaults to false.
# This is a non-standard option for the MQTT v3.1 spec but is allowed in
# v3.1.1.
#queue_qos0_messages false

# Set to false to disable retained message support. If a client publishes a
# message with the retain bit set, it will be disconnected if this is set to
# false.
#retain_available true

# Disable Nagle's algorithm on client sockets. This has the effect of reducing
# latency of individual messages at the potential cost of increasing the number
# of packets being sent.
#set_tcp_nodelay false

# Time in seconds between updates of the $SYS tree.
# Set to 0 to disable the publishing of the $SYS tree.
#sys_interval 10

# The MQTT specification requires that the QoS of a message delivered to a
# subscriber is never upgraded to match the QoS of the subscription. Enabling
# this option changes this behaviour. If upgrade_outgoing_qos is set true,
# messages sent to a subscriber will always match the QoS of its subscription.
# This is a non-standard option explicitly disallowed by the spec.
#upgrade_outgoing_qos false

# When run as root, drop privileges to this user and its primary
# group.
# Set to root to stay as root, but this is not recommended.
# If run as a non-root user, this setting has no effect.
# Note that on Windows this has no effect and so mosquitto should
# be started by the user you wish it to run as.
#user mosquitto

# =================================================================
# Default listener
# =================================================================

# IP address/hostname to bind the default listener to. If not
# given, the default listener will not be bound to a specific
# address and so will be accessible to all network interfaces.
# bind_address ip-address/host name
bind_address 0.0.0.0

# Port to use for the default listener.
#port 1883

# Bind the listener to a specific interface. This is similar to
# bind_address above but is useful when an interface has multiple addresses or
# the address may change. It is valid to use this with the bind_address option,
# but take care that the interface you are binding to contains the address you
# are binding to, otherwise you will not be able to connect.
# Example: bind_interface eth0
#bind_interface

# When a listener is using the websockets protocol, it is possible to serve
# http data as well. Set http_dir to a directory which contains the files you
# wish to serve. If this option is not specified, then no normal http
# connections will be possible.
#http_dir

# The maximum number of client connections to allow. This is
# a per listener setting.
# Default is -1, which means unlimited connections.
# Note that other process limits mean that unlimited connections
# are not really possible. Typically the default maximum number of
# connections possible is around 1024.
#max_connections -1

# Choose the protocol to use when listening.
# This can be either mqtt or websockets.
# Websockets support is currently disabled by default at compile time.
# Certificate based TLS may be used with websockets, except that
# only the cafile, certfile, keyfile and ciphers options are supported.
#protocol mqtt

# Set use_username_as_clientid to true to replace the clientid that a client
# connected with with its username. This allows authentication to be tied to
# the clientid, which means that it is possible to prevent one client
# disconnecting another by using the same clientid.
# If a client connects with no username it will be disconnected as not
# authorised when this option is set to true.
# Do not use in conjunction with clientid_prefixes.
# See also use_identity_as_username.
#use_username_as_clientid

# -----------------------------------------------------------------
# Certificate based SSL/TLS support
# -----------------------------------------------------------------
# The following options can be used to enable SSL/TLS support for
# this listener. Note that the recommended port for MQTT over TLS
# is 8883, but this must be set manually.
#
# See also the mosquitto-tls man page.

# At least one of cafile or capath must be defined. They both
# define methods of accessing the PEM encoded Certificate
# Authority certificates that have signed your server certificate
# and that you wish to trust.
# cafile defines the path to a file containing the CA certificates.
# capath defines a directory that will be searched for files
# containing the CA certificates. For capath to work correctly, the
# certificate files must have ".crt" as the file ending and you must run
# "openssl rehash <path to capath>" each time you add/remove a certificate.
#cafile
#capath

# Path to the PEM encoded server certificate.
#certfile

# Path to the PEM encoded keyfile.
#keyfile

# If you have require_certificate set to true, you can create a certificate
# revocation list file to revoke access to particular client certificates. If
# you have done this, use crlfile to point to the PEM encoded revocation file.
#crlfile

# If you wish to control which encryption ciphers are used, use the ciphers
# option. The list of available ciphers can be obtained using the "openssl
# ciphers" command and should be provided in the same format as the output of
# that command.
# If unset defaults to DEFAULT:!aNULL:!eNULL:!LOW:!EXPORT:!SSLv2:@STRENGTH
#ciphers DEFAULT:!aNULL:!eNULL:!LOW:!EXPORT:!SSLv2:@STRENGTH

# To allow the use of ephemeral DH key exchange, which provides forward
# security, the listener must load DH parameters. This can be specified with
# the dhparamfile option. The dhparamfile can be generated with the command
# e.g. "openssl dhparam -out dhparam.pem 2048"
#dhparamfile

# By default a TLS enabled listener will operate in a similar fashion to a
# https enabled web server, in that the server has a certificate signed by a CA
# and the client will verify that it is a trusted certificate. The overall aim
# is encryption of the network traffic. By setting require_certificate to true,
# the client must provide a valid certificate in order for the network
# connection to proceed. This allows access to the broker to be controlled
# outside of the mechanisms provided by MQTT.
#require_certificate false

# This option defines the version of the TLS protocol to use for this listener.
# The default value allows all of v1.3, v1.2 and v1.1. The valid values are
# tlsv1.3 tlsv1.2 and tlsv1.1.
#tls_version

# If require_certificate is true, you may set use_identity_as_username to true
# to use the CN value from the client certificate as a username. If this is
# true, the password_file option will not be used for this listener.
# This takes priority over use_subject_as_username.
# See also use_subject_as_username.
#use_identity_as_username false

# If require_certificate is true, you may set use_subject_as_username to true
# to use the complete subject value from the client certificate as a username.
# If this is true, the password_file option will not be used for this listener.
# See also use_identity_as_username
#use_subject_as_username false

# -----------------------------------------------------------------
# Pre-shared-key based SSL/TLS support
# -----------------------------------------------------------------
# The following options can be used to enable PSK based SSL/TLS support for
# this listener. Note that the recommended port for MQTT over TLS is 8883, but
# this must be set manually.
#
# See also the mosquitto-tls man page and the "Certificate based SSL/TLS
# support" section. Only one of certificate or PSK encryption support can be
# enabled for any listener.

# The psk_hint option enables pre-shared-key support for this listener and also
# acts as an identifier for this listener. The hint is sent to clients and may
# be used locally to aid authentication. The hint is a free form string that
# doesn't have much meaning in itself, so feel free to be creative.
# If this option is provided, see psk_file to define the pre-shared keys to be
# used or create a security plugin to handle them.
#psk_hint

# When using PSK, the encryption ciphers used will be chosen from the list of
# available PSK ciphers. If you want to control which ciphers are available,
# use the "ciphers" option.  The list of available ciphers can be obtained
# using the "openssl ciphers" command and should be provided in the same format
# as the output of that command.
#ciphers

# Set use_identity_as_username to have the psk identity sent by the client used
# as its username. Authentication will be carried out using the PSK rather than
# the MQTT username/password and so password_file will not be used for this
# listener.
#use_identity_as_username false

# =================================================================
# Extra listeners
# =================================================================

# Listen on a port/ip address combination. By using this variable
# multiple times, mosquitto can listen on more than one port. If
# this variable is used and neither bind_address nor port given,
# then the default listener will not be started.
# The port number to listen on must be given. Optionally, an ip
# address or host name may be supplied as a second argument. In
# this case, mosquitto will attempt to bind the listener to that
# address and so restrict access to the associated network and
# interface. By default, mosquitto will listen on all interfaces.
# Note that for a websockets listener it is not possible to bind to a host
# name.
# listener port-number [ip address/host name]
#listener

# Bind the listener to a specific interface. This is similar to
# the [ip address/host name] part of the listener definition, but is useful
# when an interface has multiple addresses or the address may change. It is
# valid to use this with the [ip address/host name] part of the listener
# definition, but take care that the interface you are binding to contains the
# address you are binding to, otherwise you will not be able to connect.
# Only available on Linux and requires elevated privileges.
#
# Example: bind_interface eth0
#bind_interface

# When a listener is using the websockets protocol, it is possible to serve
# http data as well. Set http_dir to a directory which contains the files you
# wish to serve. If this option is not specified, then no normal http
# connections will be possible.
#http_dir

# The maximum number of client connections to allow. This is
# a per listener setting.
# Default is -1, which means unlimited connections.
# Note that other process limits mean that unlimited connections
# are not really possible. Typically the default maximum number of
# connections possible is around 1024.
#max_connections -1

# The listener can be restricted to operating within a topic hierarchy using
# the mount_point option. This is achieved be prefixing the mount_point string
# to all topics for any clients connected to this listener. This prefixing only
# happens internally to the broker; the client will not see the prefix.
#mount_point

# Choose the protocol to use when listening.
# This can be either mqtt or websockets.
# Certificate based TLS may be used with websockets, except that only the
# cafile, certfile, keyfile and ciphers options are supported.
#protocol mqtt

# Set use_username_as_clientid to true to replace the clientid that a client
# connected with with its username. This allows authentication to be tied to
# the clientid, which means that it is possible to prevent one client
# disconnecting another by using the same clientid.
# If a client connects with no username it will be disconnected as not
# authorised when this option is set to true.
# Do not use in conjunction with clientid_prefixes.
# See also use_identity_as_username.
#use_username_as_clientid

# Change the websockets headers size. This is a global option, it is not
# possible to set per listener. This option sets the size of the buffer used in
# the libwebsockets library when reading HTTP headers. If you are passing large
# header data such as cookies then you may need to increase this value. If left
# unset, or set to 0, then the default of 1024 bytes will be used.
#websockets_headers_size

# -----------------------------------------------------------------
# Certificate based SSL/TLS support
# -----------------------------------------------------------------
# The following options can be used to enable certificate based SSL/TLS support
# for this listener. Note that the recommended port for MQTT over TLS is 8883,
# but this must be set manually.
#
# See also the mosquitto-tls man page and the "Pre-shared-key based SSL/TLS
# support" section. Only one of certificate or PSK encryption support can be
# enabled for any listener.

# At least one of cafile or capath must be defined to enable certificate based
# TLS encryption. They both define methods of accessing the PEM encoded
# Certificate Authority certificates that have signed your server certificate
# and that you wish to trust.
# cafile defines the path to a file containing the CA certificates.
# capath defines a directory that will be searched for files
# containing the CA certificates. For capath to work correctly, the
# certificate files must have ".crt" as the file ending and you must run
# "openssl rehash <path to capath>" each time you add/remove a certificate.
#cafile
#capath

# Path to the PEM encoded server certificate.
#certfile

# Path to the PEM encoded keyfile.
#keyfile

# If you wish to control which encryption ciphers are used, use the ciphers
# option. The list of available ciphers can be optained using the "openssl
# ciphers" command and should be provided in the same format as the output of
# that command.
#ciphers

# If you have require_certificate set to true, you can create a certificate
# revocation list file to revoke access to particular client certificates. If
# you have done this, use crlfile to point to the PEM encoded revocation file.
#crlfile

# To allow the use of ephemeral DH key exchange, which provides forward
# security, the listener must load DH parameters. This can be specified with
# the dhparamfile option. The dhparamfile can be generated with the command
# e.g. "openssl dhparam -out dhparam.pem 2048"
#dhparamfile

# By default an TLS enabled listener will operate in a similar fashion to a
# https enabled web server, in that the server has a certificate signed by a CA
# and the client will verify that it is a trusted certificate. The overall aim
# is encryption of the network traffic. By setting require_certificate to true,
# the client must provide a valid certificate in order for the network
# connection to proceed. This allows access to the broker to be controlled
# outside of the mechanisms provided by MQTT.
#require_certificate false

# If require_certificate is true, you may set use_identity_as_username to true
# to use the CN value from the client certificate as a username. If this is
# true, the password_file option will not be used for this listener.
#use_identity_as_username false

# -----------------------------------------------------------------
# Pre-shared-key based SSL/TLS support
# -----------------------------------------------------------------
# The following options can be used to enable PSK based SSL/TLS support for
# this listener. Note that the recommended port for MQTT over TLS is 8883, but
# this must be set manually.
#
# See also the mosquitto-tls man page and the "Certificate based SSL/TLS
# support" section. Only one of certificate or PSK encryption support can be
# enabled for any listener.

# The psk_hint option enables pre-shared-key support for this listener and also
# acts as an identifier for this listener. The hint is sent to clients and may
# be used locally to aid authentication. The hint is a free form string that
# doesn't have much meaning in itself, so feel free to be creative.
# If this option is provided, see psk_file to define the pre-shared keys to be
# used or create a security plugin to handle them.
#psk_hint

# When using PSK, the encryption ciphers used will be chosen from the list of
# available PSK ciphers. If you want to control which ciphers are available,
# use the "ciphers" option.  The list of available ciphers can be optained
# using the "openssl ciphers" command and should be provided in the same format
# as the output of that command.
#ciphers

# Set use_identity_as_username to have the psk identity sent by the client used
# as its username. Authentication will be carried out using the PSK rather than
# the MQTT username/password and so password_file will not be used for this
# listener.
#use_identity_as_username false

# =================================================================
# Persistence
# =================================================================

# If persistence is enabled, save the in-memory database to disk
# every autosave_interval seconds. If set to 0, the persistence
# database will only be written when mosquitto exits. See also
# autosave_on_changes.
# Note that writing of the persistence database can be forced by
# sending mosquitto a SIGUSR1 signal.
#autosave_interval 1800

# If true, mosquitto will count the number of subscription changes, retained
# messages received and queued messages and if the total exceeds
# autosave_interval then the in-memory database will be saved to disk.
# If false, mosquitto will save the in-memory database to disk by treating
# autosave_interval as a time in seconds.
#autosave_on_changes false

# Save persistent message data to disk (true/false).
# This saves information about all messages, including
# subscriptions, currently in-flight messages and retained
# messages.
# retained_persistence is a synonym for this option.
#persistence false

# The filename to use for the persistent database, not including
# the path.
#persistence_file mosquitto.db

# Location for persistent database. Must include trailing /
# Default is an empty string (current directory).
# Set to e.g. /var/lib/mosquitto/ if running as a proper service on Linux or
# similar.
#persistence_location

# =================================================================
# Logging
# =================================================================

# Places to log to. Use multiple log_dest lines for multiple
# logging destinations.
# Possible destinations are: stdout stderr syslog topic file
#
# stdout and stderr log to the console on the named output.
#
# syslog uses the userspace syslog facility which usually ends up
# in /var/log/messages or similar.
#
# topic logs to the broker topic '$SYS/broker/log/<severity>',
# where severity is one of D, E, W, N, I, M which are debug, error,
# warning, notice, information and message. Message type severity is used by
# the subscribe/unsubscribe log_types and publishes log messages to
# $SYS/broker/log/M/susbcribe or $SYS/broker/log/M/unsubscribe.
#
# The file destination requires an additional parameter which is the file to be
# logged to, e.g. "log_dest file /var/log/mosquitto.log". The file will be
# closed and reopened when the broker receives a HUP signal. Only a single file
# destination may be configured.
#
# Note that if the broker is running as a Windows service it will default to
# "log_dest none" and neither stdout nor stderr logging is available.
# Use "log_dest none" if you wish to disable logging.
#log_dest stderr

# Types of messages to log. Use multiple log_type lines for logging
# multiple types of messages.
# Possible types are: debug, error, warning, notice, information,
# none, subscribe, unsubscribe, websockets, all.
# Note that debug type messages are for decoding the incoming/outgoing
# network packets. They are not logged in "topics".
#log_type error
#log_type warning
#log_type notice
#log_type information

# If set to true, client connection and disconnection messages will be included
# in the log.
#connection_messages true

# If using syslog logging (not on Windows), messages will be logged to the
# "daemon" facility by default. Use the log_facility option to choose which of
# local0 to local7 to log to instead. The option value should be an integer
# value, e.g. "log_facility 5" to use local5.
#log_facility

# If set to true, add a timestamp value to each log message.
#log_timestamp true

# Set the format of the log timestamp. If left unset, this is the number of
# seconds since the Unix epoch.
# This is a free text string which will be passed to the strftime function. To
# get an ISO 8601 datetime, for example:
# log_timestamp_format %Y-%m-%dT%H:%M:%S
#log_timestamp_format

# Change the websockets logging level. This is a global option, it is not
# possible to set per listener. This is an integer that is interpreted by
# libwebsockets as a bit mask for its lws_log_levels enum. See the
# libwebsockets documentation for more details. "log_type websockets" must also
# be enabled.
#websockets_log_level 0

# =================================================================
# Security
# =================================================================

# If set, only clients that have a matching prefix on their
# clientid will be allowed to connect to the broker. By default,
# all clients may connect.
# For example, setting "secure-" here would mean a client "secure-
# client" could connect but another with clientid "mqtt" couldn't.
#clientid_prefixes

# Boolean value that determines whether clients that connect
# without providing a username are allowed to connect. If set to
# false then a password file should be created (see the
# password_file option) to control authenticated client access.
#
# Defaults to true if no other security options are set. If `password_file` or
# `psk_file` is set, or if an authentication plugin is loaded which implements
# username/password or TLS-PSK checks, then `allow_anonymous` defaults to
# false.
#
#allow_anonymous true

# -----------------------------------------------------------------
# Default authentication and topic access control
# -----------------------------------------------------------------

# Control access to the broker using a password file. This file can be
# generated using the mosquitto_passwd utility. If TLS support is not compiled
# into mosquitto (it is recommended that TLS support should be included) then
# plain text passwords are used, in which case the file should be a text file
# with lines in the format:
# username:password
# The password (and colon) may be omitted if desired, although this
# offers very little in the way of security.
#
# See the TLS client require_certificate and use_identity_as_username options
# for alternative authentication options. If an auth_plugin is used as well as
# password_file, the auth_plugin check will be made first.
#password_file

# Access may also be controlled using a pre-shared-key file. This requires
# TLS-PSK support and a listener configured to use it. The file should be text
# lines in the format:
# identity:key
# The key should be in hexadecimal format without a leading "0x".
# If an auth_plugin is used as well, the auth_plugin check will be made first.
#psk_file

# Control access to topics on the broker using an access control list
# file. If this parameter is defined then only the topics listed will
# have access.
# If the first character of a line of the ACL file is a # it is treated as a
# comment.
# Topic access is added with lines of the format:
#
# topic [read|write|readwrite] <topic>
#
# The access type is controlled using "read", "write" or "readwrite". This
# parameter is optional (unless <topic> contains a space character) - if not
# given then the access is read/write.  <topic> can contain the + or #
# wildcards as in subscriptions.
#
# The first set of topics are applied to anonymous clients, assuming
# allow_anonymous is true. User specific topic ACLs are added after a
# user line as follows:
#
# user <username>
#
# The username referred to here is the same as in password_file. It is
# not the clientid.
#
#
# If is also possible to define ACLs based on pattern substitution within the
# topic. The patterns available for substition are:
#
# %c to match the client id of the client
# %u to match the username of the client
#
# The substitution pattern must be the only text for that level of hierarchy.
#
# The form is the same as for the topic keyword, but using pattern as the
# keyword.
# Pattern ACLs apply to all users even if the "user" keyword has previously
# been given.
#
# If using bridges with usernames and ACLs, connection messages can be allowed
# with the following pattern:
# pattern write $SYS/broker/connection/%c/state
#
# pattern [read|write|readwrite] <topic>
#
# Example:
#
# pattern write sensor/%u/data
#
# If an auth_plugin is used as well as acl_file, the auth_plugin check will be
# made first.
#acl_file

# -----------------------------------------------------------------
# External authentication and topic access plugin options
# -----------------------------------------------------------------

# External authentication and access control can be supported with the
# auth_plugin option. This is a path to a loadable plugin. See also the
# auth_opt_* options described below.
#
# The auth_plugin option can be specified multiple times to load multiple
# plugins. The plugins will be processed in the order that they are specified
# here. If the auth_plugin option is specified alongside either of
# password_file or acl_file then the plugin checks will be made first.
#
#auth_plugin

# If the auth_plugin option above is used, define options to pass to the
# plugin here as described by the plugin instructions. All options named
# using the format auth_opt_* will be passed to the plugin, for example:
#
# auth_opt_db_host
# auth_opt_db_port
# auth_opt_db_username
# auth_opt_db_password

# =================================================================
# Bridges
# =================================================================

# A bridge is a way of connecting multiple MQTT brokers together.
# Create a new bridge using the "connection" option as described below. Set
# options for the bridges using the remaining parameters. You must specify the
# address and at least one topic to subscribe to.
#
# Each connection must have a unique name.
#
# The address line may have multiple host address and ports specified. See
# below in the round_robin description for more details on bridge behaviour if
# multiple addresses are used. Note that if you use an IPv6 address, then you
# are required to specify a port.
#
# The direction that the topic will be shared can be chosen by
# specifying out, in or both, where the default value is out.
# The QoS level of the bridged communication can be specified with the next
# topic option. The default QoS level is 0, to change the QoS the topic
# direction must also be given.
#
# The local and remote prefix options allow a topic to be remapped when it is
# bridged to/from the remote broker. This provides the ability to place a topic
# tree in an appropriate location.
#
# For more details see the mosquitto.conf man page.
#
# Multiple topics can be specified per connection, but be careful
# not to create any loops.
#
# If you are using bridges with cleansession set to false (the default), then
# you may get unexpected behaviour from incoming topics if you change what
# topics you are subscribing to. This is because the remote broker keeps the
# subscription for the old topic. If you have this problem, connect your bridge
# with cleansession set to true, then reconnect with cleansession set to false
# as normal.
#connection <name>
#address <host>[:<port>] [<host>[:<port>]]
#topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]

# If a bridge has topics that have "out" direction, the default behaviour is to
# send an unsubscribe request to the remote broker on that topic. This means
# that changing a topic direction from "in" to "out" will not keep receiving
# incoming messages. Sending these unsubscribe requests is not always
# desirable, setting bridge_attempt_unsubscribe to false will disable sending
# the unsubscribe request.
#bridge_attempt_unsubscribe true

# Set the version of the MQTT protocol to use with for this bridge. Can be one
# of mqttv311 or mqttv11. Defaults to mqttv311.
#bridge_protocol_version mqttv311

# Set the clean session variable for this bridge.
# When set to true, when the bridge disconnects for any reason, all
# messages and subscriptions will be cleaned up on the remote
# broker. Note that with cleansession set to true, there may be a
# significant amount of retained messages sent when the bridge
# reconnects after losing its connection.
# When set to false, the subscriptions and messages are kept on the
# remote broker, and delivered when the bridge reconnects.
#cleansession false

# Set the amount of time a bridge using the lazy start type must be idle before
# it will be stopped. Defaults to 60 seconds.
#idle_timeout 60

# Set the keepalive interval for this bridge connection, in
# seconds.
#keepalive_interval 60

# Set the clientid to use on the local broker. If not defined, this defaults to
# 'local.<clientid>'. If you are bridging a broker to itself, it is important
# that local_clientid and clientid do not match.
#local_clientid

# If set to true, publish notification messages to the local and remote brokers
# giving information about the state of the bridge connection. Retained
# messages are published to the topic $SYS/broker/connection/<clientid>/state
# unless the notification_topic option is used.
# If the message is 1 then the connection is active, or 0 if the connection has
# failed.
# This uses the last will and testament feature.
#notifications true

# Choose the topic on which notification messages for this bridge are
# published. If not set, messages are published on the topic
# $SYS/broker/connection/<clientid>/state
#notification_topic

# Set the client id to use on the remote end of this bridge connection. If not
# defined, this defaults to 'name.hostname' where name is the connection name
# and hostname is the hostname of this computer.
# This replaces the old "clientid" option to avoid confusion. "clientid"
# remains valid for the time being.
#remote_clientid

# Set the password to use when connecting to a broker that requires
# authentication. This option is only used if remote_username is also set.
# This replaces the old "password" option to avoid confusion. "password"
# remains valid for the time being.
#remote_password

# Set the username to use when connecting to a broker that requires
# authentication.
# This replaces the old "username" option to avoid confusion. "username"
# remains valid for the time being.
#remote_username

# Set the amount of time a bridge using the automatic start type will wait
# until attempting to reconnect.
# This option can be configured to use a constant delay time in seconds, or to
# use a backoff mechanism based on "Decorrelated Jitter", which adds a degree
# of randomness to when the restart occurs.
#
# Set a constant timeout of 20 seconds:
# restart_timeout 20
#
# Set backoff with a base (start value) of 10 seconds and a cap (upper limit) of
# 60 seconds:
# restart_timeout 10 30
#
# Defaults to jitter with a base of 5 and cap of 30
#restart_timeout 5 30

# If the bridge has more than one address given in the address/addresses
# configuration, the round_robin option defines the behaviour of the bridge on
# a failure of the bridge connection. If round_robin is false, the default
# value, then the first address is treated as the main bridge connection. If
# the connection fails, the other secondary addresses will be attempted in
# turn. Whilst connected to a secondary bridge, the bridge will periodically
# attempt to reconnect to the main bridge until successful.
# If round_robin is true, then all addresses are treated as equals. If a
# connection fails, the next address will be tried and if successful will
# remain connected until it fails
#round_robin false

# Set the start type of the bridge. This controls how the bridge starts and
# can be one of three types: automatic, lazy and once. Note that RSMB provides
# a fourth start type "manual" which isn't currently supported by mosquitto.
#
# "automatic" is the default start type and means that the bridge connection
# will be started automatically when the broker starts and also restarted
# after a short delay (30 seconds) if the connection fails.
#
# Bridges using the "lazy" start type will be started automatically when the
# number of queued messages exceeds the number set with the "threshold"
# parameter. It will be stopped automatically after the time set by the
# "idle_timeout" parameter. Use this start type if you wish the connection to
# only be active when it is needed.
#
# A bridge using the "once" start type will be started automatically when the
# broker starts but will not be restarted if the connection fails.
#start_type automatic

# Set the number of messages that need to be queued for a bridge with lazy
# start type to be restarted. Defaults to 10 messages.
# Must be less than max_queued_messages.
#threshold 10

# If try_private is set to true, the bridge will attempt to indicate to the
# remote broker that it is a bridge not an ordinary client. If successful, this
# means that loop detection will be more effective and that retained messages
# will be propagated correctly. Not all brokers support this feature so it may
# be necessary to set try_private to false if your bridge does not connect
# properly.
#try_private true

# -----------------------------------------------------------------
# Certificate based SSL/TLS support
# -----------------------------------------------------------------
# Either bridge_cafile or bridge_capath must be defined to enable TLS support
# for this bridge.
# bridge_cafile defines the path to a file containing the
# Certificate Authority certificates that have signed the remote broker
# certificate.
# bridge_capath defines a directory that will be searched for files containing
# the CA certificates. For bridge_capath to work correctly, the certificate
# files must have ".crt" as the file ending and you must run "openssl rehash
# <path to capath>" each time you add/remove a certificate.
#bridge_cafile
#bridge_capath

# If the remote broker has more than one protocol available on its port, e.g.
# MQTT and WebSockets, then use bridge_alpn to configure which protocol is
# requested. Note that WebSockets support for bridges is not yet available.
#bridge_alpn

# When using certificate based encryption, bridge_insecure disables
# verification of the server hostname in the server certificate. This can be
# useful when testing initial server configurations, but makes it possible for
# a malicious third party to impersonate your server through DNS spoofing, for
# example. Use this option in testing only. If you need to resort to using this
# option in a production environment, your setup is at fault and there is no
# point using encryption.
#bridge_insecure false

# Path to the PEM encoded client certificate, if required by the remote broker.
#bridge_certfile

# Path to the PEM encoded client private key, if required by the remote broker.
#bridge_keyfile

# -----------------------------------------------------------------
# PSK based SSL/TLS support
# -----------------------------------------------------------------
# Pre-shared-key encryption provides an alternative to certificate based
# encryption. A bridge can be configured to use PSK with the bridge_identity
# and bridge_psk options. These are the client PSK identity, and pre-shared-key
# in hexadecimal format with no "0x". Only one of certificate and PSK based
# encryption can be used on one
# bridge at once.
#bridge_identity
#bridge_psk

# =================================================================
# External config files
# =================================================================

# External configuration files may be included by using the
# include_dir option. This defines a directory that will be searched
# for config files. All files that end in '.conf' will be loaded as
# a configuration file. It is best to have this as the last option
# in the main file. This option will only be processed from the main
# configuration file. The directory specified must not contain the
# main configuration file.
# Files within include_dir will be loaded sorted in case-sensitive
# alphabetical order, with capital letters ordered first. If this option is
# given multiple times, all of the files from the first instance will be
# processed before the next instance. See the man page for examples.
#include_dir

log

[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
[CRIT] [pinger]   pingresp not received, disconnecting
^C
huifer commented 2 weeks ago

Attention: I have a program that keeps sending data


import (
    "bufio"
    "encoding/json"
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "go.uber.org/zap"
    "log"
    "math/rand"
    "os"
    "strconv"
    "strings"
    "time"
)

func main2() {
    InitLog()
    mqtt.ERROR = log.New(getWriteSync(), "[ERROR] ", 0)
    mqtt.CRITICAL = log.New(getWriteSync(), "[CRIT] ", 0)
    mqtt.WARN = log.New(getWriteSync(), "[WARN]  ", 0)
    mqtt.DEBUG = log.New(getWriteSync(), "[DEBUG] ", 0)
    var broker = "localhost"
    var port = 1883
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
    opts.SetClientID("go_mqtt_client2")
    opts.SetUsername("admin")
    opts.SetPassword("public")
    opts.SetDefaultPublishHandler(messagePubHandler)
    opts.SetAutoReconnect(true)
    opts.OnConnect = connectHandler
    opts.OnConnectionLost = connectLostHandler
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    fime := readFime("1.txt")

    for {

        for _, vc := range fime {
            publish(client, vc.Topic, vc.ID, vc.ID)

        }
        time.Sleep(1*time.Second)
    }

}
func c(client mqtt.Client, vc Vc) {

    ticker := time.NewTicker(1 * time.Second)
    for range ticker.C {
        publish(client, vc.Topic, vc.ID, vc.ID)
    }

}

func readFime(path string) []Vc {
    // 读取 path 每一行用空格分割分隔的数据 第一个是 Topic 第二个是ID

    file, err := os.Open(path)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    var vcs []Vc
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        line := scanner.Text()
        fields := strings.Split(line, "\t")
        if len(fields) != 2 {
            log.Printf("Invalid line format: %s", line)
            continue
        }
        topic := fields[0]
        id, err := strconv.Atoi(fields[1])
        if err != nil {
            log.Printf("Invalid ID: %s", fields[1])
            continue
        }
        vcs = append(vcs, Vc{Topic: topic, ID: id})
    }
    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
    return vcs
}

type Vc struct {
    Topic string
    ID    int
}

func publish(client mqtt.Client, topic string, i int, i2 int) {
    // 初始化随机数生成器的种子
    rand.Seed(time.Now().UnixNano())

    // 生成随机数

    var dataRows []DataRow
    for i3 := range 200 {
        randomNum := rand.Intn(21) // Intn返回一个[0, n)范围内的随机数
        dataRows = append(dataRows, DataRow{
            Name:  "信号-" + strconv.Itoa(i3),
            Value: strconv.Itoa(randomNum),
        })
    }

    //
    DataRowList := DataRowList{
        Time:               time.Now().Unix(),
        DeviceUid:          strconv.Itoa(i2),
        IdentificationCode: strconv.Itoa(i2),
        DataRows:           dataRows,
        Nc:                 strconv.Itoa(i2),
    }

    marshal, _ := json.Marshal(DataRowList)

    zap.S().Infof("发送消息: %s  消息主题: %s\n", DataRowList.Time, topic)
    token := client.Publish(topic, 0, false, marshal)

    if token.Wait() && token.Error() != nil {
        zap.S().Error(token.Error())
    }

}

type DataRowList struct {
    Time               int64     `json:"Time"`               // 秒级时间戳
    DeviceUid          string    `json:"DeviceUid"`          // 能够产生网络通讯的唯一编码
    IdentificationCode string    `json:"IdentificationCode"` // 设备标识码
    DataRows           []DataRow `json:"DataRows"`
    Nc                 string    `json:"Nc"`
}
type DataRow struct {
    Name  string `json:"Name"`
    Value string `json:"Value"`
}
MattBrittan commented 2 weeks ago

I'm happy to try and resolve bugs, but do not have time to deal with things like this that are already covered in the documentation, which states:

A MessageHandler (called when a new message is received) must not block (unless ClientOptions.SetOrderMatters(false) set). If you wish to perform a long-running task, or publish a message, then please use a go routine (blocking in the handler is a common cause of unexpected pingresp not received, disconnecting errors).

You are not calling ClientOptions.SetOrderMatters(false) and your DefaultPublishHandler will block after the first message is received; so what you are seeing is what the docs say you should expect (and what I have warned you about above).

If you have questions like this please ask them on stack overflow (but expect issues without sufficient detail to be closed quickly!). Please accept my apologies but, whilst I try to deal with bugs in the library, I'm unable to provide general support. If you find what you believe is a bug then please test it carefully (and review the documentation) before raising it as an issue. Remember that the library has quite a few users, and is pretty well tested, so it's best to assume the issue is in your code and test that throughly before raising an issue here.

huifer commented 2 weeks ago

I modified the program but still have problems

package main

import (
    "context"
    "encoding/json"
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "go.uber.org/zap"
    "time"
)

// MqttInterface 定义了MQTT客户端的基本接口
type MqttInterface struct {
    client  mqtt.Client
    Id      string
    SubChan chan MQTTMessage
    Config  MqttConfig
}

// NewMqttClient 初始化并返回一个新的MqttInterface实例
func NewMqttClient(id string, config MqttConfig) *MqttInterface {
    return &MqttInterface{
        Id:      id,
        SubChan: make(chan MQTTMessage),
        Config:  config,
    }
}

// Connect 连接到MQTT服务器
func (m *MqttInterface) Connect(host, username, password string, port int) error {
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("tcp://%s:%d", host, port))
    opts.SetUsername(username)
    opts.SetAutoReconnect(false)
    opts.SetPassword(password)
    opts.SetClientID(m.Id)
    opts.SetOrderMatters(false)
    opts.SetDefaultPublishHandler(m.messageHandler)
    opts.OnConnectionLost = func(client mqtt.Client, err error) {
        zap.S().Errorf("mqtt connection lost id = %s , error = %+v", m.Id, err)
        client.Disconnect(250)
        globalRedisClient.HDel(context.Background(), "mqtt_config:use", m.Id)
        globalRedisClient.SRem(context.Background(), "node_bind:"+globalConfig.NodeInfo.Name, 0, m.Id)
        marshal, _ := json.Marshal(m.Config)
        AddNoUseConfig(m.Config, marshal)

    }
    opts.SetAutoReconnect(false)
    // 创建并启动客户端
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        return token.Error()
    }

    m.client = client
    return nil
}

// messageHandler 处理接收到的消息
func (m *MqttInterface) messageHandler(client mqtt.Client, msg mqtt.Message) {

    mqttMsg := MQTTMessage{
        MQTTClientID: m.Id,
        Message:      string(msg.Payload()),
    }
    m.SubChan <- mqttMsg

}

// Subscribe 订阅一个或多个主题
func (m *MqttInterface) Subscribe(topics string) {
    if token := m.client.Subscribe(topics, 0, nil); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
    }
}

// Publish 向一个主题发布消息
func (m *MqttInterface) Publish(topic string, payload interface{}) {
    token := m.client.Publish(topic, 0, false, payload)
    token.Wait()
}

// Disconnect 断开与MQTT服务器的连接
func (m *MqttInterface) Disconnect() {
    m.client.Disconnect(250)
}

// ConsumeMessages 消费SubChan中的消息
func (m *MqttInterface) ConsumeMessages() {
    for {
        select {
        case msg := <-m.SubChan:
            jsonData, err := json.Marshal(msg)
            if err != nil {
                zap.S().Errorf("Error marshalling MQTT message to JSON: %v", err)
            }
            PushToQueue("pre_handler", jsonData)
        case <-time.After(10 * time.Second):
            // 如果10秒内没有消息,打印一条超时消息
            fmt.Println("No messages received within the last 10 seconds.")
        }
    }
}
MattBrittan commented 2 weeks ago

"but still have problems" - sorry, without full details of the issues (and the logs) there is no way to help. My guess would bve that m.SubChan <- mqttMsg is blocking but I don't even know what ewrror you are getting.

You keep changing your code and, most likely, introducing different issues (I've pointed out a number of seperate issues). Unfortunately I cannot spend more time on this, the issues section is intended for bugs and not general support. As such, I'm going to close this issue (please see the readme for some support options, however you will need to take the time to provide full details (i.e. a minimial, reproducable example) each time you ask.

huifer commented 2 weeks ago

OH NO

package main

import (
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "go.uber.org/zap"
    "sync"
    "time"
)

// MqttInterface 定义了MQTT客户端的基本接口
type MqttInterface struct {
    client mqtt.Client
    Id     string
    Chan   chan []byte
    Config MqttConfig
    wg     sync.WaitGroup
}

// NewMqttClient 初始化并返回一个新的MqttInterface实例
func NewMqttClient(id string, config MqttConfig) *MqttInterface {
    return &MqttInterface{
        Id:     id,
        Chan:   make(chan []byte, 1000),
        Config: config,
    }
}

// Connect 连接到MQTT服务器
func (m *MqttInterface) Connect(host, username, password string, port int) error {
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("tcp://%s:%d", host, port))
    opts.SetUsername(username)
    opts.SetAutoReconnect(false)
    opts.SetPassword(password)
    opts.SetClientID(m.Id)
    //opts.SetDefaultPublishHandler(m.messageHandler)
    opts.OnConnectionLost = func(client mqtt.Client, err error) {
        zap.S().Errorf("mqtt connection lost id = %s , error = %+v", m.Id, err)
        StopMqttClient(m.Id, m.Config)
    }

    opts.SetOrderMatters(false)
    opts.SetKeepAlive(60 * time.Second)
    // 创建并启动客户端
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        return token.Error()
    }

    m.client = client
    return nil
}

// messageHandler 处理接收到的消息
func (m *MqttInterface) messageHandler(client mqtt.Client, msg mqtt.Message) {

}

// Subscribe 订阅一个或多个主题
func (m *MqttInterface) Subscribe(topics string) error {
    var token = m.client.Subscribe(topics, 0, nil)

    if token.Wait() && token.Error() != nil {
        zap.S().Errorf(token.Error().Error())
        return token.Error()
    }
    return nil
}

// Publish 向一个主题发布消息
func (m *MqttInterface) Publish(topic string, payload interface{}) {
    token := m.client.Publish(topic, 0, false, payload)
    token.Wait()
}

// Disconnect 断开与MQTT服务器的连接
func (m *MqttInterface) Disconnect() {
    m.client.Disconnect(250)
}

func (m *MqttInterface) HandlerMsg() {
    for {
        c := <-m.Chan
        PushToQueue("pre_handler", c)

    }
}

This problem occurred even though there was no message processing done inside

2024-08-29 08:09:05 error   go-iot/mqtt_service.go:39   mqtt connection lost id = TT_26 , error = pingresp not received, disconnecting
2024-08-29 08:09:14 error   go-iot/mqtt_service.go:39   mqtt connection lost id = TT_6 , error = pingresp not received, disconnecting
2024-08-29 08:09:15 error   go-iot/mqtt_service.go:39   mqtt connection lost id = TT_38 , error = pingresp not received, disconnecting
2024-08-29 08:10:11 error   go-iot/mqtt_service.go:39   mqtt connection lost id = TT_97 , error = pingresp not received, disconnecting
2024-08-29 08:10:14 error   go-iot/mqtt_service.go:39   mqtt connection lost id = TT_61 , error = pingresp not received, disconnecting
2024-08-29 08:10:15 error   go-iot/mqtt_service.go:39   mqtt connection lost id = TT_49 , error = pingresp not received, disconnecting