mochi-mqtt / server

The fully compliant, embeddable high-performance Go MQTT v5 server for IoT, smarthome, and pubsub
MIT License
1.29k stars 222 forks source link

Rate limiting with OnPublish #435

Closed viktorvoltaire closed 1 month ago

viktorvoltaire commented 1 month ago

Hello,

I wanted to implement a rate limiting function with the OnPublish hook but i cant really get it working. When running the following hook my subscribers get 0 messages delivered.

If I just change my if statement to return pk, nil all messages gets delivered to the subscribers.

type ExampleHook struct {
    mqtt.HookBase
    config *ExampleHookOptions
    cache  map[string]int64
    mu     *sync.Mutex
}

func (h *ExampleHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
    now := time.Now().UTC().UnixMilli()
    h.mu.Lock()
    defer h.mu.Unlock()

    // Check if the client's last publish was within 900 milliseconds
    if v, ok := h.cache[cl.ID]; ok {
        if now-v < 900 {
            return pk, packets.ErrRejectPacket
        }
    }

    h.cache[cl.ID] = now
    return pk, nil
}

It feels like theres something within the broker that gets triggered in my use case, should I be using another hook?