VolantMQ / volantmq

High-Performance MQTT Server
Apache License 2.0
982 stars 169 forks source link

Memory usage is too high #111

Closed wurei closed 6 years ago

wurei commented 6 years ago

Hi troian: I found that memory usage is too high. Memory grows dramatically as the loop count i increases. The test case is as follows:

i = 1: about 150 MB of memory is used.
i = 3: more than 3 GB of memory is used.
package main

import (
    "fmt"
    "os"
    "time"
    "github.com/eclipse/paho.mqtt.golang"
    "strconv"
    "log"
)

// onMessage is installed as the default publish handler. For every message
// handed to it, it prints the topic, the message ID and the QoS level to stdout.
var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    // The "ID" field is filled from MessageID(); formatting the []byte payload
    // with %d would print a list of byte values rather than an identifier.
    fmt.Printf("---- RECV=>TOPIC: %s, ID: %d, Qos: %d ----\n", msg.Topic(), msg.MessageID(), msg.Qos())
}

// onConnect is registered as the on-connect handler; it prints the client's
// ID followed by " connect" to stdout.
var onConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
    // Keep the reader in a local variable so ClientID is called on an
    // addressable value.
    reader := client.OptionsReader()
    clientID := reader.ClientID()
    fmt.Printf("%s connect\n", clientID)
}

// onDisconnect is registered as the connection-lost handler; it prints the
// client's ID followed by " disconnect" to stdout. The err argument is not
// reported.
var onDisconnect mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    // Keep the reader in a local variable so ClientID is called on an
    // addressable value.
    reader := client.OptionsReader()
    clientID := reader.ClientID()
    fmt.Printf("%s disconnect\n", clientID)
}

// mqttClient creates a client for the broker at tcp://<host> using id as the
// client ID, connects it and returns the connected client.
//
// The keep-alive is set to 60 seconds and the package-level onMessage,
// onConnect and onDisconnect handlers are installed. If the connect token
// completes with an error, mqttClient panics with that error.
func mqttClient(host, id string) mqtt.Client {
    brokerURL := "tcp://" + host

    opts := mqtt.NewClientOptions()
    opts.AddBroker(brokerURL)
    opts.SetClientID(id)
    opts.SetKeepAlive(60 * time.Second)
    opts.SetDefaultPublishHandler(onMessage)
    opts.SetOnConnectHandler(onConnect)
    opts.SetConnectionLostHandler(onDisconnect)

    client := mqtt.NewClient(opts)

    // Wait for the connect attempt to finish before inspecting its result.
    token := client.Connect()
    if token.Wait() {
        if err := token.Error(); err != nil {
            panic(err)
        }
    }

    return client
}

// main repeatedly connects a batch of 1000 MQTT clients to a broker and then
// disconnects them again, to reproduce broker memory growth.
//
// Usage: <program> host:port [count]
//
// host:port is the broker address. count is the number of connect/disconnect
// rounds; when it is omitted or is not a valid integer, one round is run.
// After the last round the program waits for a line on stdin before exiting.
func main() {
    //mqtt.DEBUG = log.New(os.Stdout, "", 0)
    mqtt.ERROR = log.New(os.Stdout, "", 0)

    if len(os.Args) < 2 {
        fmt.Println("Usage:" + os.Args[0] + " host:port count")
        fmt.Println("e.g:" + os.Args[0] + " 127.0.0.1:1883 2")
        return
    }

    host := os.Args[1]

    // The count argument is optional: only index os.Args[2] when it exists,
    // so running with just host:port falls back to the default instead of
    // panicking with an index-out-of-range error.
    count := 1
    if len(os.Args) < 3 {
        fmt.Println("default num:1")
    } else if n, err := strconv.Atoi(os.Args[2]); err != nil {
        fmt.Println("default num:1")
    } else {
        count = n
    }

    const clientsPerRound = 1000

    for i := 0; i < count; i++ {
        // Client IDs are reused in every round ("0000".."0999").
        clients := make([]mqtt.Client, 0, clientsPerRound)
        for x := 0; x < clientsPerRound; x++ {
            c := mqttClient(host, fmt.Sprintf("%04d", x))
            clients = append(clients, c)
        }
        for _, c := range clients {
            c.Disconnect(1)
        }
    }

    fmt.Println("Press any key to exit.")
    var input string
    // Best-effort pause: any input line (or EOF) lets the program end, so the
    // Scanln result is deliberately ignored.
    _, _ = fmt.Scanln(&input)
}
wurei commented 6 years ago

I found that the channels created in newWriter are very large. Can this be optimized?

w := &writer{
        gMessages:     make(chan mqttp.IFace, 0xFFFF),
        qos0Messages:  make(chan interface{}, 0xFFFF),
        qos12Messages: make(chan interface{}, 0xFFFF),
    }
troian commented 6 years ago

Yeah, that needs rework. Thinking

troian commented 6 years ago

I have pushed a test variant for you to check.

wurei commented 6 years ago

I tested 500k connections and memory usage was 10 GB. It's much better than before, good work!👍

troian commented 6 years ago

Thanks for testing. v0.2.2 released