mochi-mqtt / server

The fully compliant, embeddable high-performance Go MQTT v5 server for IoT, smarthome, and pubsub
MIT License

Replace badgerhold with directly using BadgerDB v4 #376

Closed (werbenhu closed 7 months ago)

werbenhu commented 8 months ago

Refer to #375.

coveralls commented 8 months ago

Pull Request Test Coverage Report for Build 8466241881

Details


Changes Missing Coverage          Covered Lines    Changed/Added Lines    %
hooks/storage/badger/badger.go    108              112                    96.43%
Total                             108              112                    96.43%

Totals (Coverage Status)
Change from base Build 8465554775: 0.1%
Covered Lines: 6104
Relevant Lines: 6178

werbenhu commented 8 months ago

@mochi-co Not ready to merge yet.

werbenhu commented 8 months ago

I believe there are no changes needed on my end. Please review and merge. I came across some information about Badger GC here: https://github.com/dgraph-io/badger/issues/767. Perhaps there's no issue with the GC.

werbenhu commented 7 months ago

@thedevop If we are considering serialization formats, I don't think gob would be the best choice. Please refer to: https://github.com/alecthomas/go_serialization_benchmarks. Personally, I believe sticking to the standard library would suffice. When it comes to choosing a serialization library, different users may have different preferences, much like when we chose a logging library before.

thedevop commented 7 months ago

Understood. I only mentioned it because we have binary data, and JSON is not efficient for that, as it uses base64 encoding. I normally would not recommend gob (as it is Go-only) for any network-based service, but the use case here may be appropriate, as it is just to store and load what was in memory for the app. BTW, gob is part of the Go standard library.
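To make the base64 point concrete, here is a minimal sketch using only the standard library. The `payload` type and the `jsonSize` helper are hypothetical names for this illustration and are not part of the project. It shows that `encoding/json` writes a `[]byte` field as a base64 string, so every 3 raw bytes become 4 characters in the output.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payload is a hypothetical record with a single binary field.
type payload struct {
	Data []byte `json:"data"`
}

// jsonSize returns the length of the JSON encoding of n zero bytes.
func jsonSize(n int) int {
	out, err := json.Marshal(payload{Data: make([]byte, n)})
	if err != nil {
		panic(err)
	}
	return len(out)
}

func main() {
	out, err := json.Marshal(payload{Data: []byte("hello")})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"data":"aGVsbG8="}

	// 300 raw bytes become 400 base64 characters plus 11 bytes of JSON framing.
	fmt.Println(jsonSize(300)) // 411
}
```

The overhead is roughly one third on top of the raw bytes, which matters mostly for records dominated by payload data.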

werbenhu commented 7 months ago

I've noticed that badgerhold is using gob. If we decide to use gob, then simply updating all UnmarshalBinary and MarshalBinary under the storage package to use gob would suffice. @thedevop I think it might be better for you to submit another PR for this change.
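As a rough sketch of what such a change could look like, the example below implements `MarshalBinary` and `UnmarshalBinary` with gob for a hypothetical `Record` type (a stand-in, not one of the project's storage structs). One detail worth noting: gob itself calls `MarshalBinary` on any type that implements `encoding.BinaryMarshaler`, so encoding the type directly from inside its own `MarshalBinary` would recurse. The sketch sidesteps that by converting to a method-less type, `plainRecord`, which is also an assumption of this example.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Record is a hypothetical stand-in for a storage type.
type Record struct {
	ID      string
	Payload []byte
}

// plainRecord has Record's fields but none of its methods, so gob encodes
// it field by field instead of calling MarshalBinary again.
type plainRecord Record

// MarshalBinary encodes the record as a self-contained gob stream.
func (r Record) MarshalBinary() ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(plainRecord(r)); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// UnmarshalBinary decodes a gob stream produced by MarshalBinary.
func (r *Record) UnmarshalBinary(data []byte) error {
	var p plainRecord
	if err := gob.NewDecoder(bytes.NewReader(data)).Decode(&p); err != nil {
		return err
	}
	*r = Record(p)
	return nil
}

// roundTrip marshals a record and unmarshals it into a fresh value.
func roundTrip(in Record) (Record, error) {
	data, err := in.MarshalBinary()
	if err != nil {
		return Record{}, err
	}
	var out Record
	err = out.UnmarshalBinary(data)
	return out, err
}

func main() {
	out, err := roundTrip(Record{ID: "c1", Payload: []byte{0, 1, 255}})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %v\n", out.ID, out.Payload)
}
```

Callers that already go through `MarshalBinary` and `UnmarshalBinary` would not need to change, although data previously written as JSON would no longer decode without a migration step.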

thedevop commented 7 months ago

I can do that, but let's hear from @mochi-co first.

werbenhu commented 7 months ago

@thedevop @mochi-co I've written a benchmark test case. You can refer to it and check whether there's something wrong. Just modify it and run it to see the results. My results are as follows:

go version go1.22.1 windows/amd64
cpu: AMD Ryzen 5 5600H with Radeon Graphics
Benchmark_Gob_Marshal-12          149014              7534 ns/op               902.0 B/serial       4640 B/op         85 allocs/op
Benchmark_Gob_Unmarshal-12         43809             27506 ns/op               902.0 B/serial      14558 B/op        367 allocs/op
Benchmark_Json_Marshal-12        1000000              1029 ns/op               844.0 B/serial        896 B/op          1 allocs/op
Benchmark_Json_Unmarshal-12       169522              6622 ns/op               844.0 B/serial        824 B/op          9 allocs/op

package serialization

import (
    "bytes"
    "encoding/gob"
    "encoding/json"
    "fmt"
    "math"
    "math/rand"
    "testing"
)

// UserProperty is an arbitrary key-value pair for a packet user properties array.
type UserProperty struct { // [MQTT-1.5.7-1]
    Key string `json:"k"`
    Val string `json:"v"`
}

// Client is a storable representation of an MQTT client.
type Client struct {
    Will            ClientWill       `json:"will"`            // will topic and payload data if applicable
    Properties      ClientProperties `json:"properties"`      // the connect properties for the client
    Username        []byte           `json:"username"`        // the username of the client
    ID              string           `json:"id" storm:"id"`   // the client id / storage key
    T               string           `json:"t"`               // the data type (client)
    Remote          string           `json:"remote"`          // the remote address of the client
    Listener        string           `json:"listener"`        // the listener the client connected on
    ProtocolVersion byte             `json:"protocolVersion"` // mqtt protocol version of the client
    Clean           bool             `json:"clean"`           // if the client requested a clean start/session
}

// ClientProperties contains a limited set of the mqtt v5 properties specific to a client connection.
type ClientProperties struct {
    AuthenticationData        []byte         `json:"authenticationData"`
    User                      []UserProperty `json:"user"`
    AuthenticationMethod      string         `json:"authenticationMethod"`
    SessionExpiryInterval     uint32         `json:"sessionExpiryInterval"`
    MaximumPacketSize         uint32         `json:"maximumPacketSize"`
    ReceiveMaximum            uint16         `json:"receiveMaximum"`
    TopicAliasMaximum         uint16         `json:"topicAliasMaximum"`
    SessionExpiryIntervalFlag bool           `json:"sessionExpiryIntervalFlag"`
    RequestProblemInfo        byte           `json:"requestProblemInfo"`
    RequestProblemInfoFlag    bool           `json:"requestProblemInfoFlag"`
    RequestResponseInfo       byte           `json:"requestResponseInfo"`
}

// ClientWill contains a will message for a client, and limited mqtt v5 properties.
type ClientWill struct {
    Payload           []byte         `json:"payload"`
    User              []UserProperty `json:"user"`
    TopicName         string         `json:"topicName"`
    Flag              uint32         `json:"flag"`
    WillDelayInterval uint32         `json:"willDelayInterval"`
    Qos               byte           `json:"qos"`
    Retain            bool           `json:"retain"`
}

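// Serializer abstracts a marshal/unmarshal pair so that both encodings
// run through the same benchmark harness.
type Serializer interface {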
    Marshal(o interface{}) ([]byte, error)
    Unmarshal(d []byte, o interface{}) error
}

func randString(l int) string {
    buf := make([]byte, l)
    for i := 0; i < (l+1)/2; i++ {
        buf[i] = byte(rand.Intn(256))
    }
    return fmt.Sprintf("%x", buf)[:l]
}

func generate() []*Client {
    a := make([]*Client, 0, 1000)
    for i := 0; i < 1000; i++ {
        a = append(a, &Client{
            ID: randString(16),
            Properties: ClientProperties{
                MaximumPacketSize: math.MaxUint32,
            },
            Will: ClientWill{
                Payload: []byte(randString(256)),
            },
        })
    }
    return a
}

func benchMarshal(b *testing.B, s Serializer) {
    b.Helper()
    data := generate()
    b.ReportAllocs()
    b.ResetTimer()
    var serialSize int
    for i := 0; i < b.N; i++ {
        o := data[rand.Intn(len(data))]
        // Named out rather than bytes to avoid shadowing the bytes package.
        out, err := s.Marshal(o)
        if err != nil {
            b.Fatalf("marshal error %s for %#v", err, o)
        }
        serialSize += len(out)
    }
    b.ReportMetric(float64(serialSize)/float64(b.N), "B/serial")
}

func benchUnmarshal(b *testing.B, s Serializer) {
    b.Helper()
    b.StopTimer()
    data := generate()
    ser := make([][]byte, len(data))
    var serialSize int
    for i, d := range data {
        o, err := s.Marshal(d)
        if err != nil {
            b.Fatal(err)
        }
        t := make([]byte, len(o))
        serialSize += copy(t, o)
        ser[i] = t
    }
    b.ReportMetric(float64(serialSize)/float64(len(data)), "B/serial")
    b.ReportAllocs()
    b.StartTimer()

    for i := 0; i < b.N; i++ {
        n := rand.Intn(len(ser))
        o := &Client{}
        err := s.Unmarshal(ser[n], o)
        if err != nil {
            b.Fatalf("unmarshal error %s for %#x / %q", err, ser[n], ser[n])
        }
        // Validate unmarshalled data against the original record.
        // Named want rather than i to avoid shadowing the loop counter.
        want := data[n]
        if o.ID != want.ID {
            b.Fatalf("unmarshaled object differed:\n%v\n%v", want, o)
        }
    }
}

type GobSerializer struct{}

func (g *GobSerializer) Marshal(o interface{}) ([]byte, error) {
    var buf bytes.Buffer
    err := gob.NewEncoder(&buf).Encode(o)
    return buf.Bytes(), err
}

func (g *GobSerializer) Unmarshal(d []byte, o interface{}) error {
    return gob.NewDecoder(bytes.NewReader(d)).Decode(o)
}

func NewGobSerializer() *GobSerializer {
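    // gob.Register is only needed for concrete types carried inside
    // interface values; it is harmless but not required here, where
    // *Client is passed to Encode directly.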
    gob.Register(Client{})
    return &GobSerializer{}
}

func Benchmark_Gob_Marshal(b *testing.B) {
    s := NewGobSerializer()
    benchMarshal(b, s)
}

func Benchmark_Gob_Unmarshal(b *testing.B) {
    s := NewGobSerializer()
    benchUnmarshal(b, s)
}

type JsonSerializer struct{}

func (j JsonSerializer) Marshal(o interface{}) ([]byte, error) {
    return json.Marshal(o)
}

func (j JsonSerializer) Unmarshal(d []byte, o interface{}) error {
    return json.Unmarshal(d, o)
}

func Benchmark_Json_Marshal(b *testing.B) {
    benchMarshal(b, JsonSerializer{})
}

func Benchmark_Json_Unmarshal(b *testing.B) {
    benchUnmarshal(b, JsonSerializer{})
}

mochi-co commented 7 months ago

@thedevop @werbenhu gob changes sound like a nice improvement, agree it's best as a new PR.

@werbenhu This one looks good to me, so I'm merging it now 👍🏻

werbenhu commented 7 months ago

@mochi-co @thedevop What I mean is, from my test results, it seems that Gob doesn't offer better performance than JSON. I'm not sure if I'm using Gob correctly in my test cases. I hope you can also take a look at my code and run the tests to confirm the results.
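One factor that may contribute to the gob numbers above: a gob stream describes each type once per `Encoder`, and the benchmark creates a new encoder (and decoder) for every record, so that description is written and parsed again each time. The sketch below, which uses a hypothetical `Session` type and helper names that are not part of the benchmark, compares the bytes written by a fresh encoder with the bytes written by a second `Encode` call on the same encoder. It is an illustration of the stream behaviour only, not a claim about which format suits the storage hooks.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Session is a hypothetical record type used only for this illustration.
type Session struct {
	ID      string
	Payload []byte
}

// freshSize returns the bytes produced when v is encoded with a new Encoder.
func freshSize(v Session) int {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		panic(err)
	}
	return buf.Len()
}

// reusedSizes encodes v twice on one Encoder and returns the number of
// bytes written by each call.
func reusedSizes(v Session) (first, second int) {
	var buf bytes.Buffer
	enc := gob.NewEncoder(&buf)
	if err := enc.Encode(v); err != nil {
		panic(err)
	}
	first = buf.Len()
	if err := enc.Encode(v); err != nil {
		panic(err)
	}
	second = buf.Len() - first
	return first, second
}

func main() {
	v := Session{ID: "c1", Payload: []byte("will payload")}
	first, second := reusedSizes(v)
	fmt.Println("fresh encoder:", freshSize(v))
	fmt.Println("same encoder, first call:", first)
	fmt.Println("same encoder, second call:", second)
}
```

For a key-value store each stored value has to decode on its own, so one encoder per record, as in the benchmark, is arguably the realistic setup; the comparison mainly helps explain where part of the per-record cost comes from.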