eclipse-paho / paho.mqtt.golang

Other
2.79k stars 534 forks source link

Memory keep growing when send message on qos=1 #617

Closed lowmanisbusy closed 2 years ago

lowmanisbusy commented 2 years ago

1、make a new payload everytime 2、use qos=1 The memory would be keep growing before I stop the program,but if I send message on qos=0,it would be always right.

MattBrittan commented 2 years ago

Please see the reporting bugs section of the readme for the info you need to include when raising an issue.

Your issue does not contain enough information to allow any investigation. Please provide sufficient detail to enable us to understand/replicate what you are seeing:

lowmanisbusy commented 2 years ago
1、golang version: 1.16.5
2、paho.mqtt.golang version: v1.4.1
3、OS info: Ubuntu 18.04.6 LTS,cpu 4 cores,memory:8g
4、my code:

package main

import (
    "encoding/json"
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "log"
    "os"
    "strings"
    "time"
)

var messagePublishHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("TOPIC: %s\n", msg.Topic())
    fmt.Printf("MSG: %s\n", msg.Payload())
}

var messageSubscribeHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("Sub Client Topic : %s \n", msg.Topic())
    fmt.Printf("Sub Client msg : %s \n", msg.Payload())
}

var onConnectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
    fmt.Println("Connected")
}

var ConnectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    fmt.Printf("Connect lost: %v\n", err)
}

func init() {
    mqtt.DEBUG = log.New(os.Stdout, "       [mqttDEBUG]", 0)
    mqtt.ERROR = log.New(os.Stdout, "   [mqttERROR]", 0)
}

func Push() {
    EMQServerAddress := "127.0.0.1"
    EMQServerPort := "1883"
    opts := mqtt.NewClientOptions().AddBroker("tcp://" + EMQServerAddress + ":" + EMQServerPort)
    opts.SetKeepAlive(60 * time.Second)
    opts.SetDefaultPublishHandler(messagePublishHandler)
    opts.SetPingTimeout(1 * time.Second)
    opts.OnConnect = onConnectHandler
    opts.OnConnectionLost = ConnectLostHandler
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    for i:=1; i< 1000000; i++ {
        data := map[string]interface{}{
            "ts": 0,
            "values": map[string]interface{}{
                "temperature": 42,
                "humidity": 80,
                "data": strings.Repeat("1", 1024*250),
            },
        }
        telemetry := make(map[string][1]interface{}, 1)

        data["ts"] = time.Now().UnixNano() / 1e6
        telemetry["device_id_0070_0000000000048"] = [1]interface{}{data}
        payloads, _ := json.Marshal(telemetry)
        topic := fmt.Sprintf("test")
        token := client.Publish(topic, 1, false, payloads)
        token.wait()
        fmt.Printf("done, %d \r\n", time.Now().UnixNano())
    }
    client.Disconnect(250)
}

func main() {
    Push()
}
lowmanisbusy commented 2 years ago

I am confused,if my code is runing in the wrong way,why every thing is ok when send message on qos=0

MattBrittan commented 2 years ago

You have not really explained what the problem is ("memory would be keep growing" does not really tell me much). When using QOS=1 the library needs to store the message (in memory by default) so that if the connection is lost before the broker acknowledges it the message can be resent. The memory used should be freed by the garbage collector at some point (how this gets released to the OS gets complicated!). I've run your app against Mosquitto (10,000 iterations and fixed token.wait()) and could not see anything wrong. The highest allocator was time.NewTimer (each time Publish is called a timer is started; we probably should stop these but they will not use a huge amount of memory - after 10,000 iterations this was sitting at around 500kb and would not really grow from there as timers would be expiring at the same rate as they are created). Without more info on the problem you face I'm not really going to be able to help.

lowmanisbusy commented 2 years ago

" When using QOS=1 the library needs to store the message (in memory by default)"

haha,forgive me,I get it,because When using QOS=1 the library needs to store the message,so it make me in mistake for my app leaking memory.

lowmanisbusy commented 2 years ago

thank you

MattBrittan commented 2 years ago

Note that while the message does need to be stored, the memory should be released after the ack is received (in your code you wait for the ack before sending another message). This means that memory usage should not grow significantly over time (but that does not mean memory gets released to the OS immediately).

lowmanisbusy commented 2 years ago

your are right. my mqtt broker (thingboards) do something in the wrong way.

lowmanisbusy commented 2 years ago

So interesting,if i use emqx as my broker,it would be ok,but if I connect to thingboards and send message to it (qos=1),the app would does not release the memory,I am sure the app was received the puback.

extra paho.mqtt.python 1.6.1 is ok,whether connect to thindboards or emqx. I ask so many people from my workmate,they also had found this problem in paho.mqtt.golang,maybe here is something wrong.

MattBrittan commented 2 years ago

Can you post logs/detail? As you are waiting on the token you are definitely getting the ack. I cannot think of any reason using a different broker would make a difference (unless the connection drops frequently).

lowmanisbusy commented 2 years ago

Hi, The codes as what I send to you,it is runing ok, no error, no warning, I use token.Wait() to make sure get the puback, the subscriber alse receive the message, every things just look like ok(connect to emqx and send message to it),but if I send message to Thingboards, the memory would be keep growing all the time util I stop the app(whether windows or linux),maybe because the mqtt protocol version unmatch between ThingBoards and paho.mqtt.golang? I and not sure, it's just a speculation.

My teams plan to use golang to replace python to rebuild our program, but we get in trouble for this.

My english just not so good,forgive me.

Thanks Matt

MattBrittan commented 2 years ago

If you can provide me with a things-board account privately (paho@brittan.nz - ideally supply the exact code you ran) then I'm happy to test this (I cannot currently see any issues that would cause memory growth - and am still not completely sure what that really means because there are lots of ways to measure memory loss).

lowmanisbusy commented 2 years ago

Sorry, Matt, the things-board what I use is belong to my boss,so I can’t provide it to you, I am sorry to say that, but I use another golang mqtt client package to write my code(send message to things-board, qos=1),the app would be run perfectly, I don't know why, maybe you can find the different between libmqtt and paho.mqtt.golang, Is there any possibility that paho.mqtt.golang handle the puback not yet perfected? because if I use the same payload to publish to things-board, paho.mqtt.golang would be ok.

The path of the package: https://github.com/goiiot/libmqtt by the way, this package is no longer maintained.

Thanks Matt

lowmanisbusy commented 1 year ago

If you can provide me with a things-board account privately (paho@brittan.nz - ideally supply the exact code you ran) then I'm happy to test this (I cannot currently see any issues that would cause memory growth - and am still not completely sure what that really means because there are lots of ways to measure memory loss).

Hey,MattBrittan

we found it,my workmate found the point,the ThingsBoard always return a ack inconformity with mqtt protocol(some package will compatibility),it will be make the client never delete the message in the queue which wait the puback.

Thank you