dotnet / MQTTnet

MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
MIT License

Store pending messages on broker's side #1208

Open elektromntr opened 3 years ago

elektromntr commented 3 years ago

Describe the feature request

I need to store the messages pending in the server's queues that have not yet been delivered to subscribers. When the server crashes, I would like to restore these queues so that every message reaches each subscriber.

Which project is your feature request related to?

Describe the solution you'd like

I need to store the messages pending in the server's queues that have not yet been delivered to subscribers. When the server crashes, I would like to restore these queues so that every message reaches each subscriber. For example, when a client (subscriber) is disconnected for some time, its queue on the server side fills up with new pending messages. If the server crashes at this point, the subscriber loses those messages. Even after the server restarts and the subscriber reconnects, the messages are lost.

Additional context

I should be able to decide where the messages are stored. It may be a file, a database or whatever.
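The request for user-chosen storage can be pictured as a small storage contract with interchangeable backends. The following is a minimal sketch in Python, not MQTTnet code: `PendingMessageStore`, `InMemoryStore` and `JsonFileStore` are hypothetical names invented for illustration, and messages are assumed to be plain JSON-serializable dictionaries.

```python
from __future__ import annotations

import json
import os
from abc import ABC, abstractmethod


class PendingMessageStore(ABC):
    """Hypothetical extension point; this is not an MQTTnet interface."""

    @abstractmethod
    def save(self, client_id: str, messages: list[dict]) -> None:
        """Persist the complete pending queue of one client."""

    @abstractmethod
    def load(self, client_id: str) -> list[dict]:
        """Return the stored pending queue, or an empty list."""


class InMemoryStore(PendingMessageStore):
    """Keeps queues in a dictionary; everything is lost on a crash."""

    def __init__(self) -> None:
        self._queues: dict[str, list[dict]] = {}

    def save(self, client_id: str, messages: list[dict]) -> None:
        self._queues[client_id] = list(messages)

    def load(self, client_id: str) -> list[dict]:
        return list(self._queues.get(client_id, []))


class JsonFileStore(PendingMessageStore):
    """Writes one JSON file per client into a directory.

    Assumption: client ids are safe to use as file names. A real
    implementation would have to encode or hash them first.
    """

    def __init__(self, directory: str) -> None:
        self._directory = directory
        os.makedirs(directory, exist_ok=True)

    def _path(self, client_id: str) -> str:
        return os.path.join(self._directory, f"{client_id}.json")

    def save(self, client_id: str, messages: list[dict]) -> None:
        # Write to a temporary file and rename it, so a crash in the
        # middle of a write does not leave a half-written queue behind.
        tmp_path = self._path(client_id) + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as handle:
            json.dump(messages, handle)
        os.replace(tmp_path, self._path(client_id))

    def load(self, client_id: str) -> list[dict]:
        try:
            with open(self._path(client_id), encoding="utf-8") as handle:
                return json.load(handle)
        except FileNotFoundError:
            return []
```

With a contract like this, a broker would only ever talk to `PendingMessageStore`, and swapping the file backend for a database would not touch broker code.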

@saba-sabrin I think we just store the messages in memory, so if the broker is restarted you will still lose messages.

@chkr1011 should we extend IMqttServerStorage or provide a similar interface like IMqttSessionStorage to store pending messages of a session?

Originally posted by @JanEggers in https://github.com/chkr1011/MQTTnet/issues/498#issuecomment-448552957

chkr1011 commented 3 years ago

This feature is still in progress, but unfortunately it is hard to implement, especially when performance should be high and the feature set rich. The problem is: when should the messages be stored? Every time a message is produced? Then the broker will get slow when a lot of traffic is happening. But if the messages are stored only on a clean exit of the server, you will lose all messages on an app crash etc.

tcbarton commented 2 years ago

I wanted to comment that this is an important feature. I understand the performance implications. Other brokers offer session persistence in stop/start scenarios; I'm not sure how they handle crash protection. Since crash protection will have a performance impact, it could be designed as an optional feature that users turn on or off depending on their use case. Right now I'm using MQTT for a platform that won't have extremely high throughput, but session persistence is very important.

Thanks for your work on this project.

logicaloud commented 2 years ago

I'd like to look into implementing the interfaces required to have an extensible solution similar to what is done with the retained message manager. There are already interfaces IMqttServerPersistedSessionsStorage and IMqttServerPersistedSession that seem to aim at handling persisted sessions, but so far they don't seem to be used. @chkr1011, any pointers regarding intent or direction would be appreciated. I'm also happy to add any additional interfaces and hook them up to server options and server code, unless there is already work in progress by anyone?

chkr1011 commented 2 years ago

The classes you mentioned are indeed intended for storing this data. For now the biggest design question is when the storage should be invoked. For every message? With all messages in a batch etc.? Performance is a big issue here. If we store the messages as soon as a new message is published, the broker will get very slow. So the interfaces should give some flexibility to the user, like access to all data in an atomic operation and also handlers for a single pending message. Then users can also decide to save things ONLY when the broker is shutting down properly (losing data at a crash might be acceptable). But I am also in the middle of a big rewrite now, so you might have a lot of merge issues when starting with this right now. You can have a look at the branch _Reason_Code_In_SubscriptionInterceptor.

But for starters you can focus on "filling" the session upon creation with the existing data. When this is working we can focus on saving the data at a proper time.
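The suggested first step, filling a session with existing data when it is created, could look roughly like the sketch below. It is written in Python for brevity and is not MQTTnet code: `Session`, `SessionManager` and the `load_pending` callback are hypothetical names, and the clean-session handling is a simplifying assumption.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Session:
    """Hypothetical per-client session holding undelivered messages."""

    client_id: str
    pending: deque = field(default_factory=deque)


class SessionManager:
    """Creates sessions and fills them from storage on creation."""

    def __init__(self, load_pending):
        # load_pending is an assumed hook: it takes a client id and
        # returns the messages stored for that client (possibly none).
        self._load_pending = load_pending
        self._sessions = {}

    def get_or_create(self, client_id, clean_session=False):
        existing = self._sessions.get(client_id)
        if existing is not None and not clean_session:
            return existing

        session = Session(client_id)
        if not clean_session:
            # Restore whatever was persisted before the last shutdown
            # or crash, keeping the stored order.
            session.pending.extend(self._load_pending(client_id))
        self._sessions[client_id] = session
        return session
```

Saving is deliberately left out here, which matches the order of work proposed above: restore first, decide on the saving strategy afterwards.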

Recommendation: Try Test Driven Development so that we have enough Unit Tests in the first place.

logicaloud commented 2 years ago

@chkr1011, thank you. Yes, the interfaces/extension points must give the implementer the opportunity to follow the MQTT specification to the letter. That means interface methods for storage must be called for every message. The difficult design decisions are then for the implementer to consider, for example, whether messages are stored to the underlying storage every time, or whether they are buffered for a while. A start/stop method for storage on shutdown would also be helpful.
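To illustrate the split described here, the sketch below shows a storage object that is called for every message but decides on its own when to write. It is a Python illustration under stated assumptions, not part of MQTTnet: `BufferedStorage`, `on_message_enqueued` and the `write_batch` callback are hypothetical names.

```python
class BufferedStorage:
    """Called per message, but writes to the backend in batches."""

    def __init__(self, write_batch, max_buffered=100):
        # write_batch is an assumed callback that persists a list of
        # (client_id, message) pairs in one operation.
        self._write_batch = write_batch
        self._max_buffered = max_buffered
        self._buffer = []

    def on_message_enqueued(self, client_id, message):
        # The broker would call this for every pending message.
        self._buffer.append((client_id, message))
        if len(self._buffer) >= self._max_buffered:
            self.flush()

    def flush(self):
        # Hand the buffered messages to the backend and start over.
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._write_batch(batch)

    def stop(self):
        # Called on a clean shutdown so nothing buffered is lost.
        self.flush()
```

Setting `max_buffered=1` gives write-through behaviour that survives a crash at the cost of one write per message. Larger values trade that safety for throughput: anything still in the buffer is lost if the process dies before `flush` or `stop` runs.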

I'd like to press ahead with this even if it means more merge work later. It sounds like the work on _Reason_Code_In_SubscriptionInterceptor will take a while? I'll check how a set of interfaces and call points would hang together in master and (hopefully) have some basis for discussion in a week or so.

logicaloud commented 2 years ago

There are quite a few touch points in code where the "Persistent Session Manager" interface needs to be called but it all looks good so far. Because I have been working on the interface only (not the storage behind the interface), I think I'll open another issue or pull request "at the right time". I suspect that a good time for a pull request will be when the _Reason_Code_In_SubscriptionInterceptor branch is merged into master.

mark-trumed commented 1 year ago

@logicaloud This is a feature that I would very much like to utilize. What is the current status?

logicaloud commented 1 year ago

@mark-trumed The MqttPersistedSessionManager is currently implemented in fork https://github.com/logicaloud/MQTTnet/tree/persisted-sessions-v4 but somewhat tangled up with some other added MQTT5 features like session and message expiry and improvements for retained message handling. My thinking was to untangle the features then create pull requests separately, closer to the end of the year when I have more time. I'm not sure how this would fit in with current ongoing developments. @chkr1011, any comments are welcome.

Yekt commented 1 year ago

@logicaloud @chkr1011 Persistence is mandatory for any application where data integrity is nonnegotiable (no matter the performance impact). Is there any update on the progress?

Since I need this feature I have to try to implement a crude solution myself. My plan was to persist both the subscriptions and the unfinished messages, resubscribe all clients on startup, and resend all messages after that. Do I also need to persist the sessions? And if so, how can I get the sessions and re-inject them on startup? Furthermore, how do I know if a message was delivered to all clients and has been dequeued by the server? My understanding is that the server stores all messages (with QoS > 0) in some collection and removes them from the collection once all subscribed clients have received the message. Is there an event for that? I don't think ClientAcknowledgedPublishPacketAsync is what I'm looking for; however, I could be wrong since there is no documentation.
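One way to think about the "delivered to all clients" question is to track, per message, which subscribers still owe an acknowledgement. The sketch below is in Python and is not MQTTnet code: `DeliveryTracker` and its methods are hypothetical, and it assumes that some per-client acknowledgement notification exists to drive `acknowledge`. Whether MQTTnet exposes a suitable one is exactly the open question above.

```python
class DeliveryTracker:
    """Tracks which subscribers still have to acknowledge a message."""

    def __init__(self, on_fully_delivered):
        # on_fully_delivered is an assumed callback, invoked with the
        # message id once every subscriber has acknowledged it.
        self._on_fully_delivered = on_fully_delivered
        self._waiting = {}  # message id -> set of client ids

    def track(self, message_id, subscriber_ids):
        waiting = set(subscriber_ids)
        if not waiting:
            # No subscribers: nothing to wait for.
            self._on_fully_delivered(message_id)
            return
        self._waiting[message_id] = waiting

    def acknowledge(self, message_id, client_id):
        waiting = self._waiting.get(message_id)
        if waiting is None:
            return  # unknown or already completed message
        waiting.discard(client_id)
        if not waiting:
            del self._waiting[message_id]
            self._on_fully_delivered(message_id)

    def is_pending(self, message_id):
        return message_id in self._waiting
```

In a crude persistence scheme, the `on_fully_delivered` callback would be the point at which a message can be deleted from storage.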

logicaloud commented 1 year ago

I'm a bit in limbo with this at the moment and I think the ball is in @chkr1011's court. There is a discussion going on over here: https://github.com/dotnet/MQTTnet/discussions/1764.

As mentioned there, the persistent messages and persistent sessions integration should probably use interfaces instead of events. Once that is agreed on, then I would like to update the branch referred to in the discussion to change from an event based approach to using interfaces, then tidy up, review, and create some pull requests. The end result would provide suitable extension points within the MQTTnet library to hook up storage providers like Redis or SQL databases (to be implemented in a separate library). The integrated default implementation would use in-memory retained messages and persistent sessions, so the built-in features would be limited but directly usable.

I'm not sure whether there is a notification about all messages being delivered; if not, then maybe that is something that could be considered going forward.