mochi-mqtt / server

The fully compliant, embeddable high-performance Go MQTT v5 server for IoT, smarthome, and pubsub
MIT License

Default value of MaximumMessageExpiryInterval #306

Closed dadebue closed 1 year ago

dadebue commented 1 year ago

We are having problems with retained messages being deleted exactly one day after publishing. I guess the source lies in the server.Options.Capabilities.MaximumMessageExpiryInterval property.

When starting the server with nil options (default capabilities):

server := mqtt.New(nil)

the server.Options.Capabilities.MaximumMessageExpiryInterval is set to 86400 seconds (exactly 1 day).

Shouldn't it be the maximum possible int64 value to better represent the fact that the server was started without a maximum message expiry interval?

mochi-co commented 1 year ago

Hi @dadebue!

This is correct. Setting it to math.MaxInt allows an effectively unlimited number of retained and inflight messages to queue up on the server. In a hostile network environment, this can be used to DoS the broker, and we didn't feel comfortable releasing with this as the default configuration. For this reason we opted for a 'safer' default value.

You can still configure it to use math.MaxInt, or some other value, however. 🙂

Happy to discuss increasing this default value, too.

dadebue commented 1 year ago

Hi @mochi-co!

Thank you for the explanation. I understand and it definitely makes sense to limit the default value for the public release. How about adding this (small but important) information to the README?

Especially since other brokers don't seem to limit this by default (or at least the info about expiry interval states that if not provided the broker should hold the message indefinitely):

Thanks, Marco

mochi-co commented 1 year ago

@dadebue This is a very good point. I've added it to the README - let me know if it's unclear or needs improvement 🙂 Thank you for raising it!

dadebue commented 1 year ago

This needs some more thought! I tried setting the MaximumMessageExpiryInterval values to 0 or math.MaxInt. And in both cases the retained messages are cleared right away.

The problem lies in line 1600 of server.go:

if (pk.Expiry > 0 && pk.Expiry < now) || pk.Created+s.Options.Capabilities.MaximumMessageExpiryInterval < now {

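The second condition is always true for 0 and math.MaxInt:

  • for 0 it's true because Created is always smaller than now
  • for math.MaxInt it's true because of Go integer overflow (the sum wraps around, becomes negative, and is therefore smaller than now)
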
I suggest something like this (variable declaration just for readability):

// clearExpiredRetainedMessages deletes retained messages from topics if they have expired.
func (s *Server) clearExpiredRetainedMessages(now int64) {
    if s.Options.Capabilities.MaximumMessageExpiryInterval == 0 || s.Options.Capabilities.MaximumMessageExpiryInterval == math.MaxInt {
        return
    }

    for filter, pk := range s.Topics.Retained.GetAll() {
        maxExpiry := pk.Created + s.Options.Capabilities.MaximumMessageExpiryInterval
        packageExpired := pk.Expiry > 0 && pk.Expiry < now
        forceExpiry := maxExpiry > 0 && maxExpiry < now

        if packageExpired || forceExpiry {
            s.Topics.Retained.Delete(filter)
            s.hooks.OnRetainedExpired(filter)
        }
    }
}
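
To see both failure cases in isolation, here is a minimal standalone sketch (not taken from server.go). The helper name expiredByServerLimit and the timestamp are hypothetical, and math.MaxInt64 stands in for math.MaxInt, which has the same value on 64-bit platforms.

```go
package main

import (
	"fmt"
	"math"
)

// expiredByServerLimit mirrors the second half of the condition quoted above:
// created + maxInterval < now. created and now are Unix timestamps,
// maxInterval is in seconds. The name is hypothetical.
func expiredByServerLimit(created, maxInterval, now int64) bool {
	return created+maxInterval < now
}

func main() {
	created := int64(1696445809) // hypothetical publish time
	now := created + 1           // checked one second later

	for _, maxInterval := range []int64{0, math.MaxInt64, 86400} {
		// For math.MaxInt64 the sum wraps around and becomes negative.
		sum := created + maxInterval
		fmt.Printf("interval=%d sum=%d expired=%t\n",
			maxInterval, sum, expiredByServerLimit(created, maxInterval, now))
	}
}
```

With these inputs the first two intervals report the message as expired one second after publishing, while the 86400-second default does not.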

dgduncan commented 1 year ago

@dadebue This is interesting, I will take a look at this in a bit as I heavily use retained messages. Would you mind sending your broker's config?

dadebue commented 1 year ago

Basically I'm starting like this. Is that enough info for you?

    level := new(slog.LevelVar)
    if config.Debug {
        level.Set(slog.LevelDebug)
    } else {
        level.Set(slog.LevelInfo)
    }

    logger := slog.New(&hooks.FilterHandler{
        H: slog.NewJSONHandler(os.Stdout,
            &slog.HandlerOptions{
                Level: level,
            }),
    })

    server := mqtt.New(&mqtt.Options{
        Logger: logger,
    })

    // server.Options.Capabilities.MaximumMessageExpiryInterval = 0
    // server.Options.Capabilities.MaximumMessageExpiryInterval = math.MaxInt
    server.Options.Capabilities.MaximumMessageExpiryInterval = 999999999999

hooks.FilterHandler is from #302

dgduncan commented 1 year ago

That works for me! Thank you for sending.

werbenhu commented 1 year ago

This needs some more thought! I tried setting the MaximumMessageExpiryInterval values to 0 or math.MaxInt. And in both cases the retained messages are cleared right away.

The problem lies in line 1600 of server.go:

if (pk.Expiry > 0 && pk.Expiry < now) || pk.Created+s.Options.Capabilities.MaximumMessageExpiryInterval < now {

The second condition is always true for 0 and math.MaxInt:

  • for 0 it's true because Created is always smaller than now
  • for math.MaxInt it's true because of Go integer overflow (the sum wraps around, becomes negative, and is therefore smaller than now)

I suggest something like this (variable declaration just for readability):

// clearExpiredRetainedMessages deletes retained messages from topics if they have expired.
func (s *Server) clearExpiredRetainedMessages(now int64) {
  if s.Options.Capabilities.MaximumMessageExpiryInterval == 0 || s.Options.Capabilities.MaximumMessageExpiryInterval == math.MaxInt {
      return
  }

  for filter, pk := range s.Topics.Retained.GetAll() {
      maxExpiry := pk.Created + s.Options.Capabilities.MaximumMessageExpiryInterval
      packageExpired := pk.Expiry > 0 && pk.Expiry < now
      forceExpiry := maxExpiry > 0 && maxExpiry < now

      if packageExpired || forceExpiry {
          s.Topics.Retained.Delete(filter)
          s.hooks.OnRetainedExpired(filter)
      }
  }
}

@dadebue Thanks. This looks good. It would be great if you could submit this change as a PR.

dadebue commented 1 year ago

@dadebue Thanks. This looks good. It would be great if you could submit this change as a PR.

Of course. It seems I don't have enough permissions:

[Screenshot: Screenshot_2023-10-04_20-56-49]

BTW: the code needs to be tweaked. Otherwise, if MaximumMessageExpiryInterval is set to 0 or math.MaxInt, no messages will expire, even if a message has an expiry explicitly set:

// clearExpiredRetainedMessages deletes retained messages from topics if they have expired.
func (s *Server) clearExpiredRetainedMessages(now int64) {
    isServerExpiryLimited := s.Options.Capabilities.MaximumMessageExpiryInterval != 0 && s.Options.Capabilities.MaximumMessageExpiryInterval != math.MaxInt

    for filter, pk := range s.Topics.Retained.GetAll() {
        packetExpired := pk.Expiry > 0 && pk.Expiry < now
        maxPacketExpiry := pk.Created + s.Options.Capabilities.MaximumMessageExpiryInterval
        forceExpiry := isServerExpiryLimited && maxPacketExpiry > 0 && maxPacketExpiry < now

        if packetExpired || forceExpiry {
            s.Topics.Retained.Delete(filter)
            s.hooks.OnRetainedExpired(filter)
        }
    }
}
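
The decision logic of the tweaked version can be exercised without a broker. The following is a minimal sketch, not the code that was merged: retainedExpired is a hypothetical pure function, math.MaxInt64 stands in for math.MaxInt, and, unlike the snippet above, it compares by subtraction so that large intervals other than the two sentinel values cannot overflow either.

```go
package main

import (
	"fmt"
	"math"
)

// retainedExpired reports whether a retained message should be dropped.
// created and expiry are Unix timestamps (expiry == 0 means none was set);
// maxInterval is the server-wide limit in seconds. Hypothetical helper.
func retainedExpired(created, expiry, maxInterval, now int64) bool {
	// The message's own expiry always applies.
	if expiry > 0 && expiry < now {
		return true
	}
	// 0 and math.MaxInt64 are treated as "no server-side limit".
	if maxInterval == 0 || maxInterval == math.MaxInt64 {
		return false
	}
	// Same test as created+maxInterval < now, rearranged to avoid overflow.
	return now-created > maxInterval
}

func main() {
	created, now := int64(1000), int64(2000)

	fmt.Println(retainedExpired(created, 0, 0, now))             // no limit, no expiry
	fmt.Println(retainedExpired(created, 0, math.MaxInt64, now)) // no limit, no expiry
	fmt.Println(retainedExpired(created, 1500, 0, now))          // own expiry has passed
	fmt.Println(retainedExpired(created, 0, 500, now))           // server limit exceeded
}
```

Only the last two calls report the message as expired: an explicit expiry is honored even when the server limit is disabled, which is the case the early return in the first proposal missed.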

werbenhu commented 1 year ago

@dadebue Refer to "Create a pull request".

werbenhu commented 1 year ago

@dadebue Could you take a look at the Unresolved conversations in PR https://github.com/mochi-mqtt/server/pull/315 ?

dadebue commented 1 year ago

@dadebue Could you take a look at the unresolved conversations in PR #315?

I was struggling with a cold for a few days...

werbenhu commented 1 year ago

❤️ I'm sorry to hear that. Take your time to recover, and we can address the issue once you're feeling better.

mochi-co commented 1 year ago

@dadebue Hope you feel better soon! Remember to get plenty of rest and water!

dadebue commented 1 year ago

But I've added a comment in the meantime...

werbenhu commented 1 year ago

@dadebue @mochi-co I have submitted some code changes to PR #315. Please review the code and verify that it works.

dadebue commented 1 year ago

I've reviewed and added a comment...