eclipse / paho.mqtt.golang


Stored messages require multiple reboots to fully forward #681

Open jismithc opened 1 month ago

jismithc commented 1 month ago

Hi! We are hoping someone has seen this issue before or can quickly point out something silly we are doing.

We are currently supporting a container that uses paho.mqtt to forward multiple messages at various cadences, at most one every 2 seconds.

We are using the store functionality to hold up to 16 minutes of messages during a network outage and forward them after reconnection. If a network outage lasts 16 minutes, a watchdog reboots our system. Once the system reconnects to the network, we would expect all stored messages to be forwarded. Instead, only some of them are forwarded; we don't see the remaining messages until after a container restart.

What we are seeing is the following:

- Viewing the directory where the messages are stored, we can see the missing data:
    - `ls /path/to/store/ | wc -l` returns 459
- We reboot the container.
- Upon reconnection, we see complete outage data from minute 0 to minute 14 backfilled into our db, and partial data from minute 14 to minute 16, followed by logs similar to those above.
- Viewing the directory where the messages are stored, we can see there is still partial data:
    - `ls /path/to/store/ | wc -l` returns 90
- We reboot the container again.
- Upon reconnection, we see complete outage data from minute 0 to minute 16 backfilled into our db.
- Viewing the directory where the messages are stored, there are no more stored messages.

Here is how we are initializing our client:

```go
opts := mqtt.NewClientOptions()
opts.AddBroker(broker)
opts.SetClientID(clientID)
opts.SetTLSConfig(tlsConfig)
opts.SetCleanSession(false)

// #1196 - Enable store and forward for QoS 1 and 2 messages.
opts.SetStore(mqtt.NewFileStore(storeForwardPath))

// Set timeout to 30 seconds (arbitrary number). Default is no timeout.
opts.SetWriteTimeout(30 * time.Second)

// Set handlers to log client messages.
opts.SetDefaultPublishHandler(DefaultPublishHandler)
opts.SetOnConnectHandler(OnConnectHandler)
opts.SetConnectionLostHandler(ConnectionLostHandler)
opts.SetReconnectingHandler(ReconnectingHandler)
opts.SetConnectionAttemptHandler(ConnectionAttemptHandler)

// Set up the logger instances for each mqtt log level.
mqtt.ERROR = newMqttLogger(*slog.Default(), slog.LevelError)
mqtt.CRITICAL = newMqttLogger(*slog.Default(), slog.LevelError) // Treat as error
mqtt.WARN = newMqttLogger(*slog.Default(), slog.LevelWarn)

client := mqtt.NewClient(opts)
```

Upon publish, we are calling `client.Publish()` like so:

```go
if token := client.Publish("/my/mqtt/topic", 1, false, payload); token.Error() != nil {
    return token.Error()
}
```

Note: we are not using `token.Wait()` upon return of the token. With a QoS of 1 during a network outage, we expect `token.Wait()` will never be fulfilled. Are we correct in assuming this is what we should be doing?
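One alternative, sketched below (this assumes the `Done()` and `Error()` methods on the returned `Token`, and reuses `slog` and the placeholder topic/payload from the snippet above), would be to watch the token in a goroutine so failures are at least logged without blocking the publish path:

```go
token := client.Publish("/my/mqtt/topic", 1, false, payload)
go func() {
    // Done() closes once the publish is acknowledged, fails, or the
    // connection drops; Error() then reports the outcome.
    <-token.Done()
    if err := token.Error(); err != nil {
        slog.Error("publish not confirmed", "error", err)
    }
}()
```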

Another clue that may or may not be relevant: if we disable our watchdog so it does not reboot after 16 minutes, then once the system reconnects to the network, any new (live) messages AND stored messages will NOT be published until a container reboot.

Please let us know if this is an issue seen before. We're hoping there is an obvious flaw in our process that the community can spot quickly. Thanks!
MattBrittan commented 1 month ago

Interesting - I have seen this before but always put it down to OS buffering (and message loss on a power cycle was not a big issue with my use case). The filestore writes these files out in two steps:

  1. Writes to tempfile (e.g. o.2656.TMP)
  2. Renames the temp file

I believe this was done to avoid the situation where the file is partially written and then power is lost, but I suspect it does not fully achieve that goal. I wonder if we need to add a Sync.

I've moved all of my apps over to the V5 client and have not seen any corrupt files since doing that. I checked the V5 code and it does call Sync (added because I noticed that the timestamps were sometimes not ordered as expected). As such I'd say that's worth a try - would you be open to testing the change (just add `f.Sync()` before the Close here)?
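Roughly, the idea is this (a simplified sketch of the "write temp file, sync, close, rename" pattern; names are illustrative, not the store's actual code):

```go
package main

import "os"

// writeMessageFile sketches writing a message file via a temp file,
// syncing before close, and then renaming into place.
func writeMessageFile(path string, payload []byte) error {
    tmp := path + ".TMP"
    f, err := os.Create(tmp)
    if err != nil {
        return err
    }
    if _, err := f.Write(payload); err != nil {
        f.Close()
        return err
    }
    // The suggested change: flush to stable storage before closing, so a
    // power loss right after the rename cannot leave a truncated file.
    if err := f.Sync(); err != nil {
        f.Close()
        return err
    }
    if err := f.Close(); err != nil {
        return err
    }
    return os.Rename(tmp, path)
}
```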

> With a QoS of 1 during a network outage, we expect `token.Wait()` will never be fulfilled.

The wait would be fulfilled when the connection drops (there is no right answer here, but leaving calls hanging was not really an option; the V5 client handles this better).

> Another clue that may or may not be relevant: if we disable our watchdog so it does not reboot after 16 minutes, then once the system reconnects to the network, any new (live) messages AND stored messages will NOT be published until a container reboot.

That's not what I would expect to see (reconnect calls resume, which should send the messages). Would you be open to enabling logging so we can see what is happening there?
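For reference, debug output can be wired up the same way you already hooked up ERROR/WARN (this assumes your `newMqttLogger` helper adapts `slog` to the client's logger interface):

```go
// Enable the client's verbose debug output alongside the existing levels.
mqtt.DEBUG = newMqttLogger(*slog.Default(), slog.LevelDebug)
```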

Note: whilst I'm happy to help, I don't actually use this library any more, so I won't put as much time into tracing issues as I would have previously (so the more logs etc. you provide, the more likely it is that I'll do something). I am still happy to review/merge PRs!

jismithc commented 1 month ago

Hi @MattBrittan, sorry for the late reply, and thank you for your quick response.

After reviewing your message we attempted to move our codebase to the V5 client, to utilize its queue functionality to store and forward messages during network outages. From initial testing, it seems the migration to the new client solves all of the issues above.

For posterity, I did try the following without success. I believe I saw the same behavior as before with this change.

> would you be open to testing the change (just add `f.Sync()` before the Close here)?

Time is in short supply for me, but if I do have time, I could explore adding logs and debugging this issue with you. For now, I will finalize our migration to paho.golang, utilizing the MQTT v5 protocol as you have done.

Thank you again!

MattBrittan commented 1 month ago

No worries - I've done a lot of testing with the V5 client (simulating network drops etc.) so am pretty confident that it will not lose messages (QoS 2 is another matter - it's supported but the implementation could use some work). I will leave this issue open for a while in case anyone else encounters the same problem and is able to perform some testing (unfortunately these issues often require a specific setup and can be hard to replicate).