mochi-mqtt / server

The fully compliant, embeddable high-performance Go MQTT v5 server for IoT, smarthome, and pubsub
MIT License
1.29k stars 222 forks source link

Persistence storage did not work with SetCleanSession(false) #397

Closed vec715 closed 6 months ago

vec715 commented 6 months ago

i got strange errors and cannot debug them:

time=2024-05-06T06:17:06.534+03:00 level=INFO msg="added hook" hook=debug
time=2024-05-06T06:17:06.535+03:00 level=INFO msg="added hook" hook=allow-all-auth
time=2024-05-06T06:17:06.535+03:00 level=INFO msg="added hook" hook=redis-db
time=2024-05-06T06:17:06.535+03:00 level=INFO msg="connecting to redis service" hook=redis-db prefix=mochi- address=localhost:6379 username="" password-len=0 db=0
time=2024-05-06T06:17:06.552+03:00 level=INFO msg="connected to redis service" hook=redis-db
time=2024-05-06T06:17:06.553+03:00 level=INFO msg="attached listener" id=t1 protocol=tcp address=[::]:1883
time=2024-05-06T06:17:06.553+03:00 level=INFO msg="mochi mqtt starting" version=2.6.3
time=2024-05-06T06:17:06.557+03:00 level=INFO msg="mochi mqtt server started"
time=2024-05-06T06:17:16.958+03:00 level=WARN msg="" listener=t1 error="unspecified error"
time=2024-05-06T06:17:16.959+03:00 level=WARN msg="" listener=t1 error="unspecified error"
2
time=2024-05-06T06:17:16.960+03:00 level=WARN msg="" listener=t1 error="unspecified error"
time=2024-05-06T06:17:56.981+03:00 level=WARN msg="" listener=t1 error="unspecified error"
time=2024-05-06T06:17:56.982+03:00 level=WARN msg="" listener=t1 error="unspecified error"
time=2024-05-06T06:18:37.012+03:00 level=WARN msg="" listener=t1 error="unspecified error"
time=2024-05-06T06:18:37.013+03:00 level=WARN msg="" listener=t1 error="unspecified error"

My code is pretty simple:

Server part:

server := mqtt.New(nil)

    err := server.AddHook(new(debug.Hook), &debug.Options{
        Enable: true, ShowPings: true,
        // ShowPacketData: true,
    })
    if err != nil {
        log.Fatal(err)
    }

    _ = server.AddHook(new(allowAll.AllowHook), nil)

    err = server.AddHook(new(redis.Hook), &redis.Options{
        Options: &rv8.Options{
            Addr:     "localhost:6379", // default redis address
            Password: "",               // your password
            DB:       0,                // your redis db
        },
    })
    if err != nil {
        return nil, fmt.Errorf("failed to add auth allow all hook: %w", err)
    }

    // Create a TCP listener on a standard port.
    tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: fmt.Sprintf(":%d", port)})
    err = server.AddListener(tcp)
    if err != nil {
        return nil, fmt.Errorf("failed to add listener: %w", err)
    }

Client (paho):

opts := mqtt.NewClientOptions().
        AddBroker("tcp://localhost:1883").
        SetProtocolVersion(5).
        SetUsername("018f4ba7-2599-7aa2-9b77-5025e261f450").
        // SetUsername(token).
        SetCleanSession(false)
    client := mqtt.NewClient(opts)

    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    subToken := client.Subscribe("test00", 0, func(client mqtt.Client, msg mqtt.Message) {
        log.Printf("Received message: %s\n", msg.Payload())

        var pushMessage PushMessage
        if err := json.Unmarshal(msg.Payload(), &pushMessage); err != nil {
            log.Printf("Failed to unmarshal message: %v", err)
            return
        }

        log.Printf("Received message of type %s", pushMessage.Type)

        // Acknowledge the message
        msg.Ack()
    })
    if subToken.Error() != nil {
        panic(subToken.Error())
    }

Once i connect using this client I immediately see these strange errors. If I will remove the line SetCleanSession(false) everything works fine. This feature is crucial for me and didnt work :( any solutions? Thanks

vec715 commented 6 months ago

actually, I got the same issues even when disabling redis hook. I assume this is not related to redis

werbenhu commented 6 months ago

@vec715 Why do you need this msg.Ack()? Try removing it and see what happens

vec715 commented 6 months ago

I think I finally fixed it! It's because I didn't specify a sessionID, only a username. If CleanSession is disabled but there is no SessionID, the broker returns these strange errors