soypat / natiu-mqtt

A dead-simple, extensible MQTT implementation well suited for embedded systems.
MIT License
84 stars 4 forks source link

Client HandleNext Behavior #11

Open charles-d-burton opened 6 days ago

charles-d-burton commented 6 days ago

I ran into a bug with your client.HandleNext() function. When I setup a connection like you have in your examples after about a minute client.HandleNext() starts blasting my terminal with EOF errors. According to the function docs if it has an error it should disconnect and go into a reconnect, but that's not the case. The code in question is:

    go func() {
        for {
            if !client.IsConnected() {
                time.Sleep(time.Second)
                tryconnect()
                continue
            }

            err := client.HandleNext()
            if err != nil {
                slog.Error(err.Error())
            }
        }
    }()

After about a minute it hits that if err != nil block and just spins out with EOF

2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF

Comments here indicate that it should fail and try to reconnect

https://github.com/soypat/natiu-mqtt/blob/dc6618a0526531c36907ea4a763f2d85ad4face9/client.go#L56

Here's the full code for reference:

func startMQTT(send chan bool, received chan []byte, topics ...string) {

    client := mqtt.NewClient(mqtt.ClientConfig{
        Decoder: mqtt.DecoderNoAlloc{UserBuffer: make([]byte, 1500)},
        OnPub: func(_ mqtt.Header, _ mqtt.VariablesPublish, r io.Reader) error {
            message, _ := io.ReadAll(r)
            if len(message) > 0 {
                select {
                case received <- message:
                default:
                    //Ignores message if buffer full
                }
            }
            println("received message")
            return nil
        },
    })

    var varConn mqtt.VariablesConnect
    varConn.SetDefaultMQTT([]byte("meetmon"))
    varConn.Username = []byte("<redacted>")
    varConn.Password = []byte("<redacted>")

    tryconnect := func() error {
        conn, err := net.Dial("tcp", "192.168.1.121:1883")
        if err != nil {
            return err
        }
        ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
        defer cancel()
        return client.Connect(ctx, conn, &varConn)
    }

    err := tryconnect()
    if err != nil {
        slog.Error("connect attempt failed")
        panic(err)
    }
    slog.Info("connected to mqtt broker")

    go func() {
        for {
            if !client.IsConnected() {
                time.Sleep(time.Second)
                tryconnect()
                continue
            }

            err := client.HandleNext()
            if err != nil {
                slog.Error(err.Error())
            }
        }
    }()

    slog.Info("setting config state in broker")
    config, err := (&MeetingConfig{}).GetConfig()
    if err != nil {
        panic(err)
    }
    fmt.Println(string(config))

    configPflags, err := mqtt.NewPublishFlags(mqtt.QoS0, false, true)
    if err != nil {
        panic(err)
    }

    vpub := mqtt.VariablesPublish{
        TopicName: []byte("homeassistant/binary_sensor/meeting/config"),
    }

    vpub.PacketIdentifier = randInt16()
    slog.Info("publishing config")
    err = client.PublishPayload(configPflags, vpub, config)
    if err != nil {
        panic(err)
    }

    go func() {
        pflags, err := mqtt.NewPublishFlags(mqtt.QoS0, false, false)
        if err != nil {
            panic(err)
        }
        vpub.TopicName = []byte("homeassistant/binary_sensor/meeting/state")

        for {
            if !client.IsConnected() {
                slog.Info("client is disconnected, sleeping for 1 second and retrying")
                time.Sleep(time.Second)
                continue
            }

            //TODO: Maybe this should be a select
            msg := <-send
            for {
                if msg {
                    slog.Info("setting meeting state to ON")
                    vpub.PacketIdentifier = randInt16()
                    err := client.PublishPayload(pflags, vpub, []byte("ON"))
                    if err == nil {
                        break //retry until message is sent
                    }
                } else {
          //Default to the not in a meeting state
          slog.Info("setting meeting state to OFF")
          err := client.PublishPayload(pflags, vpub, []byte("OFF"))
          if err == nil {
            break
          }
        }
        time.Sleep(time.Second)
            }
        }
    }()
    ctx := context.Background()

    var vsub mqtt.VariablesSubscribe
    vsub.TopicFilters = []mqtt.SubscribeRequest{mqtt.SubscribeRequest{TopicFilter: []byte("homeassistant/binary_sensor/meeting/state"), QoS: mqtt.QoS0}}
    vsub.PacketIdentifier = randInt16()
    err = client.Subscribe(ctx, vsub)
    if err != nil {
        println(err.Error())
        panic(err)
    }
}
charles-d-burton commented 6 days ago

Maybe this behavior is expected? If I add another goroutine that has a ticker to ping the server i stop receiving error messages. I also added a client.Disconnect() if the handler throws an error. That seems to handle the problem of the disconnect not showing up in the system. All of this is an easy enough fix.

soypat commented 6 days ago

io.EOF and net.ErrClosed are special errors. We should always disconnect on receiving them since there is no more data to be expected after receiving them. Changes in #12. Feel free to test :)