mochi-mqtt / server

The fully compliant, embeddable high-performance Go MQTT v5 server for IoT, smarthome, and pubsub
MIT License

Fix for unlimited maximum message expiry interval #315

Closed dadebue closed 1 year ago

dadebue commented 1 year ago

fixes #306

mochi-co commented 1 year ago

I will try to look at this today, apologies for the delay

coveralls commented 1 year ago

Pull Request Test Coverage Report for Build 6541851124

Totals
Change from base Build 6536886378: 0.004%
Covered Lines: 5455
Relevant Lines: 5515

💛 - Coveralls
dadebue commented 1 year ago

I still think that we shouldn't disable the clearing of expired retained messages if s.Options.Capabilities.MaximumMessageExpiryInterval == math.MaxInt64.

What would happen if a sender publishes a retained message with an expiry of 1 hour while s.Options.Capabilities.MaximumMessageExpiryInterval == math.MaxInt64? The message will not be deleted from the broker, because clearing of retained messages is disabled, and therefore it will not expire after 1 hour as the sender intended...

What do you think?

werbenhu commented 1 year ago

@dadebue Actually, according to the MQTT spec, retained messages do not give the client a way to set an expiration time. An MQTT retained message remains on the broker until it is replaced by a new retained message or deleted when a new non-retained message is published.

dadebue commented 1 year ago

@dadebue Actually, according to the MQTT spec, retained messages do not give the client a way to set an expiration time. An MQTT retained message remains on the broker until it is replaced by a new retained message or deleted when a new non-retained message is published.

OK, I didn't know that. If that's the case, why do we even check for an expiry:

if (pk.Expiry > 0 && pk.Expiry < now) || ... {}
werbenhu commented 1 year ago

I'm not entirely sure here; perhaps this is also a point worth discussing. @mochi-co @dgduncan @thedevop, does pk.Expiry have the same effect on retained messages? Should it be changed to something like this?

// clearExpiredRetainedMessages deletes retained messages from topics if they have expired.
func (s *Server) clearExpiredRetainedMessages(now int64) {
    // If the maximum message expiry interval is set to math.MaxInt64, do not process expired messages.
    if s.Options.Capabilities.MaximumMessageExpiryInterval != math.MaxInt64 {
        for filter, pk := range s.Topics.Retained.GetAll() {
            if pk.Created < now-s.Options.Capabilities.MaximumMessageExpiryInterval {
                s.Topics.Retained.Delete(filter)
                s.hooks.OnRetainedExpired(filter)
            }
        }
    }
}
dgduncan commented 1 year ago

@werbenhu @dadebue I can not find where it is precisely stated; however, according to 3.3.1.3 RETAIN

If the current retained message for a Topic expires, it is discarded and there will be no retained message for that topic.

And according to the HiveMQ MQTT 5 Essentials

Furthermore, if the ‘retained=true’ option is selected during the PUBLISH message, the interval also dictates the length of time a message is retained on a particular topic.

dadebue commented 1 year ago

And even more in the mentioned HiveMQ essentials article:

For retained messages, the message expiry interval operates similarly, guaranteeing that these messages are only dispatched to new subscribers for a specified period.

It sounds to me as though retained messages can have an expiry interval just like regular messages...

With the current code, this guarantee does not hold!

mochi-co commented 1 year ago

And even more in the mentioned HiveMQ essentials article:

For retained messages, the message expiry interval operates similarly, guaranteeing that these messages are only dispatched to new subscribers for a specified period.

It sounds to me as though retained messages can have an expiry interval just like regular messages...

I'm planning to properly re-read through and address all of these comments (and others) as soon as I can, but the above is exactly correct, as far as I remember. At least for MQTTv5, the client may specify how long the message may persist. As MQTTv3 doesn't have client-set message expiries, we fall back to the server capabilities instead (to avoid retaining all messages forever by default).

I initially noted that the spec doesn't explicitly say this applies to retained messages, although logic would suggest it does, and it's how every other MQTTv5 broker has been configured as far as I was able to determine. On a subsequent re-read, the spec does apply the expiry to all messages (including retained and will messages).
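
As a minimal sketch of the behaviour described above (a client-set expiry for MQTTv5 messages, with the server capability as the bound for everything else), something like the following could work. The message type and isExpired helper are hypothetical and are not the server's actual API; treating a serverMax of 0 as "no server limit" is an assumption made only for this example.

```go
package main

import "fmt"

// message is a hypothetical, simplified stand-in for a stored packet.
type message struct {
	Created int64 // unix seconds when the broker stored the message
	Expiry  int64 // absolute unix expiry set by an MQTTv5 client; 0 if unset (e.g. MQTTv3)
}

// isExpired reports whether m should be dropped at time now.
// Assumption for this sketch: serverMax == 0 means "no server-enforced limit".
func isExpired(m message, now, serverMax int64) bool {
	// A client-set expiry is honoured whenever it is present.
	clientExpired := m.Expiry > 0 && m.Expiry < now
	// The server-wide maximum bounds every message, including those without an expiry.
	serverExpired := serverMax > 0 && now-m.Created > serverMax
	return clientExpired || serverExpired
}

func main() {
	now := int64(1_700_000_000)
	v5 := message{Created: now - 7200, Expiry: now - 3600} // client expiry passed an hour ago
	v3 := message{Created: now - 7200}                     // no client expiry, two hours old

	fmt.Println(isExpired(v5, now, 86400)) // true
	fmt.Println(isExpired(v3, now, 86400)) // false
	fmt.Println(isExpired(v3, now, 3600))  // true
}
```

The two conditions are joined with OR, which mirrors the shape of the check quoted earlier in this thread.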

werbenhu commented 1 year ago

If, according to the spec, there isn't a very clear indication of whether the message expiration time is effective for retained messages, perhaps the broker can make a non-normative decision on its own about this.

Personally, I lean towards the idea that message expiration should be effective for retained messages as well.

Keeping retained messages indefinitely on the broker could be problematic, and giving clients the ability to determine how long retained messages should be kept seems more reasonable.

mochi-co commented 1 year ago

I think it's more of a lack of clarity in the specification, but it is there:

https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901112

If the Message Expiry Interval has passed and the Server has not managed to start onward delivery to a matching subscriber, then it MUST delete the copy of the message for that subscriber

This applies to all messages, including retained messages and will messages: they must be deleted if they have not started onward delivery to all/any subscribers by the time the expiry period has passed.

werbenhu commented 1 year ago

I still think that we shouldn't disable the clearing of expired retained messages if s.Options.Capabilities.MaximumMessageExpiryInterval == math.MaxInt64.

What would happen if a sender publishes a retained message with an expiry of 1 hour while s.Options.Capabilities.MaximumMessageExpiryInterval == math.MaxInt64? The message will not be deleted from the broker, because clearing of retained messages is disabled, and therefore it will not expire after 1 hour as the sender intended...

What do you think?

@dadebue you can now proceed to submit code changes based on this idea. @mochi-co it appears that we've lost the permission to push and submit to the forked branch.

mochi-co commented 1 year ago

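I'm just catching up on this and it sounds like there are some misunderstandings on the spec and current design, but to confirm:

  • Setting s.Options.Capabilities.MaximumMessageExpiryInterval == math.MaxInt64 should not disable message expiry, it should be honoured even though it is infinitely in the future - unless there is a significant performance reason not to do so (I am unaware of such a thing currently).
  • if s.Options.Capabilities.MaximumMessageExpiryInterval is 1 or more, message expiry per message must still be honoured per the MQTTv5 spec if set in the message properties, but MQTTv3 messages and those without expiries should be retained forever.
  • if s.Options.Capabilities.MaximumMessageExpiryInterval is 0, message expiry per message must be ignored for all messages and protocols.
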
(edited - apologies it's late and I've been extraordinarily busy)

mochi-co commented 1 year ago

@mochi-co it appears that we've lost the permission to push and submit to the forked branch.

This may be related to the permissions changes I made, but I'm surprised it would affect an external fork. I will investigate 👍🏻

mochi-co commented 1 year ago

@werbenhu @dadebue @dgduncan I had a little look at this, and I think this is the approach I would take if I were trying to fix the MaxInt issue described. It is a bit more 'wordy' than the previous changes, but I feel it might be a bit easier to understand were we to come back to this code in 5 years and wonder what it does. All credit goes to @werbenhu however for identifying the issue in the first place!

diff --git forkSrcPrefix/server.go forkDstPrefix/server.go
index 8b7b46da77734338be58126635a96576dd270909..bb343cceaab7eafafb15a4b9d17710502ccde346 100644
--- forkSrcPrefix/server.go
+++ forkDstPrefix/server.go
@@ -1597,7 +1597,11 @@ func (s *Server) clearExpiredClients(dt int64) {
 // clearExpiredRetainedMessage deletes retained messages from topics if they have expired.
 func (s *Server) clearExpiredRetainedMessages(now int64) {
    for filter, pk := range s.Topics.Retained.GetAll() {
-       if (pk.Expiry > 0 && pk.Expiry < now) || pk.Created+s.Options.Capabilities.MaximumMessageExpiryInterval < now {
+       serverExpiry := pk.Created + s.Options.Capabilities.MaximumMessageExpiryInterval
+       if serverExpiry < 0 { // catch int overflows if pk.Created + MaximumMessageExpiryInterval > math.MaxInt
+           serverExpiry = s.Options.Capabilities.MaximumMessageExpiryInterval
+       }
+       if (pk.Expiry > 0 && pk.Expiry < now) || pk.Created > serverExpiry {
            s.Topics.Retained.Delete(filter)
            s.hooks.OnRetainedExpired(filter)
        }
diff --git forkSrcPrefix/server_test.go forkDstPrefix/server_test.go
index ec0dc3b6ea4d9e58166f5ee0e2d436baf38077b0..e27d3726d4271b04a35c3c9b940bfdd1bb1f77dc 100644
--- forkSrcPrefix/server_test.go
+++ forkDstPrefix/server_test.go
@@ -9,6 +9,7 @@ import (
    "encoding/binary"
    "io"
    "log/slog"
+   "math"
    "net"
    "strconv"
    "sync"
@@ -3278,6 +3279,23 @@ func TestServerClearExpiredRetained(t *testing.T) {
    require.Len(t, s.Topics.Retained.GetAll(), 2)
 }

+func TestServerClearExpiredRetainedMaxInt(t *testing.T) {
+   s := New(nil)
+   require.NotNil(t, s)
+   s.Options.Capabilities.MaximumMessageExpiryInterval = math.MaxInt
+
+   n := time.Now().Unix()
+   s.Topics.Retained.Add("a/b/c", packets.Packet{Created: n, Expiry: n - 1})
+   s.Topics.Retained.Add("d/e/f", packets.Packet{Created: n, Expiry: n - 2})
+   s.Topics.Retained.Add("g/h/i", packets.Packet{Created: n - 3}) // within bounds
+   s.Topics.Retained.Add("j/k/l", packets.Packet{Created: n - 5})
+   s.Topics.Retained.Add("m/n/o", packets.Packet{Created: n})
+
+   require.Len(t, s.Topics.Retained.GetAll(), 5)
+   s.clearExpiredRetainedMessages(n)
+   require.Len(t, s.Topics.Retained.GetAll(), 3)
+}
+
 func TestServerClearExpiredClients(t *testing.T) {
    s := New(nil)
    require.NotNil(t, s)
diff --git forkSrcPrefix/clients.go forkDstPrefix/clients.go
index 6d5ff9a0a8d416ab0036faabcab5fa822d6c136d..cd975bf39fa6b32388cc277f64e139ccba16aecb 100644
--- forkSrcPrefix/clients.go
+++ forkDstPrefix/clients.go
@@ -330,7 +330,11 @@ func (cl *Client) ResendInflightMessages(force bool) error {
 func (cl *Client) ClearInflights(now, maximumExpiry int64) []uint16 {
    deleted := []uint16{}
    for _, tk := range cl.State.Inflight.GetAll(false) {
-       if (tk.Expiry > 0 && tk.Expiry < now) || tk.Created+maximumExpiry < now {
+       serverExpiry := tk.Created + maximumExpiry
+       if serverExpiry < 0 { // catch int overflows if pk.Created + maximumExpiry > math.MaxInt
+           serverExpiry = maximumExpiry
+       }
+       if (tk.Expiry > 0 && tk.Expiry < now) || tk.Created > serverExpiry {
            if ok := cl.State.Inflight.Delete(tk.PacketID); ok {
                cl.ops.hooks.OnQosDropped(cl, tk)
                atomic.AddInt64(&cl.ops.info.Inflight, -1)
diff --git forkSrcPrefix/clients_test.go forkDstPrefix/clients_test.go
index e992b5dc718cee6b882f5ad3a773cf929106eec1..b6ea4e6e508f5c8d6d0ed784fd8b41427a03de0c 100644
--- forkSrcPrefix/clients_test.go
+++ forkDstPrefix/clients_test.go
@@ -11,6 +11,7 @@ import (
    "errors"
    "io"
    "log/slog"
+   "math"
    "net"
    "strings"
    "sync/atomic"
@@ -317,6 +318,23 @@ func TestClientClearInflights(t *testing.T) {
    require.Equal(t, 2, cl.State.Inflight.Len())
 }

+func TestClientClearInflightsMaxInt(t *testing.T) {
+   cl, _, _ := newTestClient()
+
+   n := time.Now().Unix()
+   cl.State.Inflight.Set(packets.Packet{PacketID: 1, Expiry: n - 1})
+   cl.State.Inflight.Set(packets.Packet{PacketID: 2, Expiry: n - 2})
+   cl.State.Inflight.Set(packets.Packet{PacketID: 3, Created: n - 3}) // within bounds
+   cl.State.Inflight.Set(packets.Packet{PacketID: 5, Created: n - 5})
+   cl.State.Inflight.Set(packets.Packet{PacketID: 7, Created: n})
+   require.Equal(t, 5, cl.State.Inflight.Len())
+
+   deleted := cl.ClearInflights(n, math.MaxInt)
+   require.Len(t, deleted, 2)
+   require.ElementsMatch(t, []uint16{1, 2}, deleted)
+   require.Equal(t, 3, cl.State.Inflight.Len())
+}
+
 func TestClientResendInflightMessages(t *testing.T) {
    pk1 := packets.TPacketData[packets.Puback].Get(packets.TPuback)
    cl, r, w := newTestClient()
werbenhu commented 1 year ago

@dadebue @mochi-co can you make changes to the code and submit it to resolve the unresolved conversations here? @mochi-co I still don't have the permission to modify the code when I review it, which also means I don't have the permission to submit code to the forked branch.

mochi-co commented 1 year ago

@werbenhu Try now

mochi-co commented 1 year ago

@werbenhu The current changes on the branch don't fully comply with the scenarios in https://github.com/mochi-mqtt/server/pull/315#issuecomment-1762419335 - With s.Options.Capabilities.MaximumMessageExpiryInterval == math.MaxInt64 we should still continue to expire messages which have their own expiry values set.

I'd recommend reviewing the diff I posted above as I believe this addresses all the scenarios we are trying to fix.

werbenhu commented 1 year ago

I'm just catching up on this and it sounds like there are some misunderstandings on the spec and current design, but to confirm:

  • Setting s.Options.Capabilities.MaximumMessageExpiryInterval == math.MaxInt64 should not disable message expiry, it should be honoured even though it is infinitely in the future - unless there is a significant performance reason not to do so (I am unaware of such a thing currently).
  • if s.Options.Capabilities.MaximumMessageExpiryInterval is 1 or more, message expiry per message must still be honoured per the MQTTv5 spec if set in the message properties, but MQTTv3 messages and those without expiries should be retained forever.
  • if s.Options.Capabilities.MaximumMessageExpiryInterval is 0, message expiry per message must be ignored for all messages and protocols.

(edited - apologies it's late and I've been extraordinarily busy)

It's done now.

werbenhu commented 1 year ago

There still seem to be some issues here, I'll need to make some submissions later

mochi-co commented 1 year ago

There still seem to be some issues here, I'll need to make some submissions later

Which issues are you seeing?

werbenhu commented 1 year ago

I still think that we shouldn't disable the clearing of expired retained messages if s.Options.Capabilities.MaximumMessageExpiryInterval == math.MaxInt64.

What would happen if a sender publishes a retained message with an expiry of 1 hour while s.Options.Capabilities.MaximumMessageExpiryInterval == math.MaxInt64? The message will not be deleted from the broker, because clearing of retained messages is disabled, and therefore it will not expire after 1 hour as the sender intended...

What do you think?

@mochi-co I think @dadebue is right, and I've made some changes to it.

werbenhu commented 1 year ago

@mochi-co take a look at the current implementation. I always feel that what's written here in func (cl *Client) ClearInflights(now, maximumExpiry int64) []uint16 isn't concise and clear enough;

'maximumExpiry=0' carries a dual meaning:

dadebue commented 1 year ago

I like the current implementation.

Maybe we should change the name abandoned to something different. What the maximum expiry interval is doing is forcing the expiry of old messages. Why don't we call it forceExpiry then?

werbenhu commented 1 year ago

@dadebue @mochi-co I have added client.ClearExpiredInflights() to clear expired messages, while client.ClearInflights() is used to clear all inflight messages. Please review the code again. Thanks.
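
A rough sketch of how such a split between the two methods might look is below. The inflightMsg and inflightStore types are hypothetical simplifications, and the real signatures in the repository may differ. In this sketch a maximumExpiry of 0 is assumed to disable the server-enforced limit.

```go
package main

import "fmt"

// inflightMsg and inflightStore are hypothetical, simplified types for illustration only.
type inflightMsg struct {
	PacketID uint16
	Created  int64 // unix seconds
	Expiry   int64 // absolute unix expiry; 0 if unset
}

type inflightStore struct {
	msgs map[uint16]inflightMsg
}

// ClearExpiredInflights removes only messages that have expired and returns their packet IDs.
func (s *inflightStore) ClearExpiredInflights(now, maximumExpiry int64) []uint16 {
	deleted := []uint16{}
	for id, m := range s.msgs {
		expired := m.Expiry > 0 && m.Expiry < now
		// Assumption: maximumExpiry == 0 means no forced expiry.
		enforced := maximumExpiry > 0 && now-m.Created > maximumExpiry
		if expired || enforced {
			delete(s.msgs, id) // deleting during range is safe for Go maps
			deleted = append(deleted, id)
		}
	}
	return deleted
}

// ClearInflights removes every inflight message regardless of age.
func (s *inflightStore) ClearInflights() {
	for id := range s.msgs {
		delete(s.msgs, id)
	}
}

func main() {
	now := int64(1_700_000_000)
	s := &inflightStore{msgs: map[uint16]inflightMsg{
		1: {PacketID: 1, Created: now - 10, Expiry: now - 1}, // client expiry has passed
		2: {PacketID: 2, Created: now - 100},                 // older than the 60s maximum
		3: {PacketID: 3, Created: now - 10},                  // still valid
	}}

	deleted := s.ClearExpiredInflights(now, 60)
	fmt.Println(len(deleted), len(s.msgs)) // 2 1

	s.ClearInflights()
	fmt.Println(len(s.msgs)) // 0
}
```

Keeping the two operations separate means callers no longer need a special maximumExpiry value to mean "drop everything".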

dadebue commented 1 year ago

I approve the changes with this comment ☺️ (I can't approve in GitHub as I wasn't requested to review)

werbenhu commented 1 year ago

@dadebue Because you are the author of this PR. 😄

dadebue commented 1 year ago

@mochi-co @dgduncan @werbenhu what is the release schedule for this repo? When will all the changes since October 2nd be released?

mochi-co commented 1 year ago

@dadebue Apologies, I've had some big life stuff going on and the time got away from me. I've released these changes as v2.4.2 :)

dadebue commented 1 year ago

No worries and thanks :)