rabbitmq / rabbitmq-server

Open source RabbitMQ: core server and tier 1 (built-in) plugins
https://www.rabbitmq.com/

MQTT: replicated retained message store #8096

Open galah92 opened 1 year ago

galah92 commented 1 year ago

Is your feature request related to a problem? Please describe.

I'm using RabbitMQ as an MQTT broker with the rabbitmq_mqtt plugin, deployed on Kubernetes with the Kubernetes Operator. According to the docs:

messages retained on one broker node are not replicated to other nodes in the cluster

I'd like to request support for retained messages across multiple replicas/nodes/pods.

The motivation: if a client connects to a specific pod, disconnects, and then reconnects to a different pod, it should be able to subscribe to the same MQTT topic and get the same data.

Describe the solution you'd like

I'd like retained messages on MQTT topics to be restored for clients regardless of which node/replica/pod they are connected to.

Describe alternatives you've considered

It is probably possible to emulate this with a backend that listens for client subscriptions on "retained" topics, plus a DB/cache (Redis-like) that stores the retained-message state. But that doesn't sound right.

Additional context

No response

michaelklishin commented 1 year ago

I'm not sure whether this was done as part of #5895, #2554, or #7263. @ansd @ChunyiLyu would know.

ansd commented 1 year ago

@michaelklishin No, it was not done. Neither Native MQTT nor new feature requirements for MQTT 5.0 are related to replication of retained messages.

As documented in https://www.rabbitmq.com/mqtt.html#retained, support for retained messages has been very limited in RabbitMQ because it does not work correctly when the RabbitMQ cluster consists of multiple nodes. More specifically, the current implementation stores retained messages only on the local node. This means that, depending on which node a client connects to and subscribes on, it may receive no retained message, an outdated one, or the latest one for a given topic. Furthermore, wildcards in topic filters are not supported for retained messages.
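To make the failure mode concrete, here is a toy Python model (an illustration only, not RabbitMQ code; the node and topic names are hypothetical) in which each node keeps its own retained-message map, so what a subscriber gets depends on which node it lands on:

```python
class Node:
    """Toy broker node whose retained-message store is node-local."""

    def __init__(self, name):
        self.name = name
        self.retained = {}  # topic -> payload; never replicated to peers

    def publish_retained(self, topic, payload):
        self.retained[topic] = payload

    def subscribe(self, topic):
        # Only this node's own store is consulted on subscribe.
        return self.retained.get(topic)


a, b, c = Node("a"), Node("b"), Node("c")
topic = "devices/sensor-1/config"
b.publish_retained(topic, "v1")  # publisher was first connected to node b
a.publish_retained(topic, "v2")  # ...and later reconnected to node a

print(a.subscribe(topic))  # v2   (latest)
print(b.subscribe(topic))  # v1   (outdated)
print(c.subscribe(topic))  # None (no retained message)
```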

Given these limitations, it makes sense to implement a new rabbit_mqtt_retained_msg_store.

Some of the requirements could be:

Some limitations for the implementation are:

Before thinking about an implementation, the very first question is which of the above requirements we need (all, or just a subset).

galah92 commented 1 year ago

I'll elaborate on my use case. I'm trying to replicate the behavior of Azure IoT Hub and AWS IoT Core by having a Device Twin (Azure terminology) or Device Shadow (AWS terminology). Each device has a config, which it gets by subscribing to a concrete topic that includes its client_id in the topic name. I'm building a backend service that mutates device configs by retaining messages on the corresponding topics. Note that the topics on which I retain messages (devices/{client_id}/config) are used only for retained messages; no other messages are sent to these topics.
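A minimal Python sketch of the retained semantics this use case relies on, assuming exact topics only (no wildcards) and the MQTT rule that a retained publish with an empty payload clears the retained message for that topic. The `config_topic` helper and the dict-backed `RetainedConfigs` class are hypothetical illustrations, not RabbitMQ APIs:

```python
from typing import Optional


def config_topic(client_id: str) -> str:
    # Hypothetical helper for the devices/{client_id}/config topic scheme.
    return f"devices/{client_id}/config"


class RetainedConfigs:
    """Exact-topic retained store: at most one message per topic."""

    def __init__(self):
        self._by_topic = {}  # topic -> latest retained payload

    def publish_retained(self, topic: str, payload: bytes) -> None:
        if payload == b"":
            # In MQTT, a retained publish with an empty payload clears the topic.
            self._by_topic.pop(topic, None)
        else:
            self._by_topic[topic] = payload  # newer config replaces the older one

    def on_subscribe(self, topic: str) -> Optional[bytes]:
        # A (re)subscribing device receives only the latest retained config.
        return self._by_topic.get(topic)


store = RetainedConfigs()
topic = config_topic("sensor-1")
store.publish_retained(topic, b'{"interval": 30}')
store.publish_retained(topic, b'{"interval": 60}')
print(store.on_subscribe(topic))  # b'{"interval": 60}'
store.publish_retained(topic, b"")
print(store.on_subscribe(topic))  # None
```

Because every lookup is by an exact topic, the store is a plain key-value map, which is what keeps this use case simpler than full wildcard support.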

In terms of requirements:

Also, this is for MQTT 3.1.1 as I am not using MQTT 5.

ansd commented 1 year ago

Thank you for explaining your use case @galah92.

The solution will be much simpler if wildcards are not supported. In that case, a distributed key-value store could be used. If that key-value store is built on top of https://github.com/rabbitmq/ra, we would need to think about snapshotting, including the message payloads. Another key-value store design could use consistent hashing.
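A Python sketch of the consistent-hashing idea, assuming wildcards are out of scope so every lookup is by exact topic. Each topic hashes to a point on a ring and is owned by the next node clockwise. The node names and the `vnodes` count are hypothetical, and the sketch shows only placement, not replication, rebalancing, or failure handling:

```python
import bisect
import hashlib


def _hash(key: str) -> int:
    # Stable 64-bit hash (Python's built-in hash() is salted per process).
    return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")


class HashRing:
    def __init__(self, nodes, vnodes=64):
        # Each node appears vnodes times on the ring to even out the load.
        self._points = sorted(
            (_hash(f"{node}#{i}"), node) for node in nodes for i in range(vnodes)
        )
        self._keys = [point for point, _ in self._points]

    def owner(self, topic: str) -> str:
        # First ring point clockwise from the topic's hash, wrapping around.
        idx = bisect.bisect(self._keys, _hash(topic)) % len(self._keys)
        return self._points[idx][1]


ring = HashRing(["rabbit@node-0", "rabbit@node-1", "rabbit@node-2"])
owner = ring.owner("devices/sensor-1/config")
# Every node computes the same owner, so a client can reconnect anywhere.
print(owner)
```

The design choice behind consistent hashing is that adding a node only moves the topics whose hashes now fall just before one of the new node's ring points; every other topic keeps its owner.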

We are a small team, and while we invest in MQTT, our current focus has been improving scalability in https://github.com/rabbitmq/rabbitmq-server/pull/5895 and adding support for MQTT 5.0 in https://github.com/rabbitmq/rabbitmq-server/pull/7263. Although important in the long term, implementing a replicated retained message store (such as a distributed key-value store) is a non-trivial task and not a priority for us right now, so it will not be implemented in the near future. @galah92, if this is an important feature for you, feel free to contribute or sponsor the development.

galah92 commented 1 year ago

Thank you for the explanation.

Since learning RabbitMQ internals and Erlang is out of scope for me for now, I'll have to go with a different solution: an external key-value store (Redis) and a backend service that watches for device subscriptions to queues and publishes messages accordingly.

For that, I would need a way to store the currently subscribed topics. As these MQTT topics correspond to AMQP queues, I think this can be done by listening for queue.created and queue.deleted events. I would also need to disable the current retention mechanism so that it doesn't conflict with my external one. I think this can be done with rabbit_mqtt_retained_msg_store_noop. Can you confirm?

In general, does that solution make sense?

michaelklishin commented 1 year ago

Yes, rabbit_mqtt_retained_msg_store_noop is the implementation you need. It won't retain anything (by design). I kind of forgot that we had it :)

ansd commented 1 year ago

Dumping one more good idea here, suggested by @kjnilsson. A relatively simple solution that could work just well enough is:

This should provide very fast writes and reads as well as message replication while working with the current primitives available in RabbitMQ.

@galah92 Yes, currently I see 2 possible workarounds for you:

  1. The one you mentioned. But don't listen only for queue_created and queue_deleted; more importantly, also listen for binding_created and binding_deleted. Each binding from an MQTT queue to the topic exchange corresponds to one MQTT subscription. The binding key corresponds to the MQTT subscription topic filter (although some characters are converted, e.g. / in an MQTT topic filter becomes . in the AMQP 0.9.1 binding key, and + becomes *; see https://github.com/rabbitmq/rabbitmq-server/blob/bbb98226e28185aa9deb60e856496b523d624f95/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl#L120-L123 )
  2. You implement the rabbit_mqtt_retained_msg_store behaviour. This does require a bit of Erlang coding, but you just need an Erlang Redis client that inserts into, looks up in, and deletes from Redis (this shouldn't require more than a few lines of Erlang code).
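For workaround 1, a Python sketch of an external service tracking MQTT subscriptions from binding events. It applies only the two substitutions named above (/ to . and + to *, reversed for binding keys); the linked rabbit_mqtt_util.erl is the authority and may translate more characters. The handler names and their `(queue, binding_key)` arguments are hypothetical; real event payloads would have to be parsed to obtain them:

```python
# Only the two substitutions named above; the real translation lives in
# rabbit_mqtt_util.erl and may cover more characters.
MQTT_TO_AMQP = str.maketrans({"/": ".", "+": "*"})
AMQP_TO_MQTT = str.maketrans({".": "/", "*": "+"})


def mqtt_filter_to_binding_key(topic_filter: str) -> str:
    return topic_filter.translate(MQTT_TO_AMQP)


def binding_key_to_mqtt_filter(binding_key: str) -> str:
    return binding_key.translate(AMQP_TO_MQTT)


class SubscriptionRegistry:
    """Tracks MQTT subscriptions from binding created/deleted events (hypothetical handlers)."""

    def __init__(self):
        self.filters_by_queue = {}  # queue name -> set of MQTT topic filters

    def on_binding_created(self, queue: str, binding_key: str) -> str:
        topic_filter = binding_key_to_mqtt_filter(binding_key)
        self.filters_by_queue.setdefault(queue, set()).add(topic_filter)
        return topic_filter  # caller would now publish the stored retained message

    def on_binding_deleted(self, queue: str, binding_key: str) -> None:
        filters = self.filters_by_queue.get(queue)
        if filters is not None:
            filters.discard(binding_key_to_mqtt_filter(binding_key))
            if not filters:
                del self.filters_by_queue[queue]


registry = SubscriptionRegistry()
print(registry.on_binding_created("q1", "devices.sensor-1.config"))  # devices/sensor-1/config
print(registry.on_binding_created("q1", "devices.*.config"))  # devices/+/config
registry.on_binding_deleted("q1", "devices.sensor-1.config")
print(registry.filters_by_queue)  # {'q1': {'devices/+/config'}}
```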
kjnilsson commented 1 year ago

You also need to decide what to do when Redis is down or otherwise unavailable. :)
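A Python sketch of one possible policy for an external store that exposes insert, lookup, and delete (the three operations named in workaround 2). The policy itself is an assumption, not something decided in this thread: lookups fail open, so the subscription succeeds without a retained message, while failed writes are surfaced to the caller. `InMemoryBackend` is a hypothetical stand-in for a Redis client:

```python
class BackendUnavailable(Exception):
    """Raised by the (hypothetical) backend when it cannot be reached."""


class InMemoryBackend:
    """Stand-in for Redis; set available = False to simulate an outage."""

    def __init__(self):
        self.data = {}
        self.available = True

    def _check(self):
        if not self.available:
            raise BackendUnavailable("backend unreachable")

    def set(self, key, value):
        self._check()
        self.data[key] = value

    def get(self, key):
        self._check()
        return self.data.get(key)

    def delete(self, key):
        self._check()
        self.data.pop(key, None)


class RetainedStore:
    """insert/lookup/delete over a pluggable backend, with an explicit outage policy."""

    def __init__(self, backend):
        self.backend = backend

    def insert(self, topic, payload):
        self.backend.set(topic, payload)  # errors propagate: the write did not happen

    def lookup(self, topic):
        try:
            return self.backend.get(topic)
        except BackendUnavailable:
            return None  # fail open: subscribe proceeds without a retained message

    def delete(self, topic):
        self.backend.delete(topic)


backend = InMemoryBackend()
store = RetainedStore(backend)
store.insert("devices/sensor-1/config", b"v1")
backend.available = False  # simulate Redis going down
print(store.lookup("devices/sensor-1/config"))  # None (fail open)
```

Failing closed, meaning rejecting the subscription, is the other obvious choice; which one is right depends on whether a missing config is worse than a refused subscribe.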

galah92 commented 1 year ago

I was able to create a POC based on Redis and rabbit_mqtt_retained_msg_store_noop. So far it looks good. Adding Redis as a dependency to the system is worse than having a distributed key-value store in RabbitMQ, but at least it's a valid solution that works. Thanks everyone.

michaelklishin commented 1 year ago

The stream-based option seems better to me than a new embedded distributed key-value store. It will not be perfectly consistent compared to what streams themselves offer, due to the step that copies inbound data concurrently into an ETS table. Otherwise, it could be perfectly sufficient.
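Going only by the description above (inbound data written to a stream and copied concurrently into a per-node ETS table), a toy Python model shows why reads are fast but not perfectly consistent: each node answers from its local table, which can lag the replicated log. The list standing in for a stream and the dict standing in for ETS are simplifications, not the stream API, and the exact steps of the proposal are not spelled out in this thread:

```python
class ReplicatedLog:
    """Stand-in for a replicated stream: append-only list of (topic, payload)."""

    def __init__(self):
        self.entries = []

    def append(self, topic, payload):
        self.entries.append((topic, payload))


class NodeView:
    """Per-node table (the ETS table in the description) fed from the log."""

    def __init__(self, log):
        self.log = log
        self.offset = 0  # next log position this node has not applied yet
        self.table = {}  # topic -> latest payload applied so far

    def catch_up(self):
        # Apply new entries in log order; later entries overwrite earlier ones.
        for topic, payload in self.log.entries[self.offset:]:
            if payload:
                self.table[topic] = payload
            else:
                self.table.pop(topic, None)  # empty payload clears the topic
        self.offset = len(self.log.entries)

    def lookup(self, topic):
        return self.table.get(topic)  # fast local read, possibly stale


log = ReplicatedLog()
n1, n2 = NodeView(log), NodeView(log)
topic = "devices/sensor-1/config"
log.append(topic, b"v1")
n1.catch_up()
n2.catch_up()
log.append(topic, b"v2")
n1.catch_up()  # node 2 has not applied the new entry yet
print(n1.lookup(topic))  # b'v2'
print(n2.lookup(topic))  # b'v1' (stale until it catches up)
```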

galah92 commented 1 year ago

Hi, just saw 3.13.0-beta.1 was released. The release notes didn't mention anything related to message retention, but was it implemented by any chance? For MQTT 3 & 5.

lukebakken commented 1 year ago

> Hi, just saw 3.13.0-beta.1 was released. The release notes didn't mention anything related to message retention, but was it implemented by any chance? For MQTT 3 & 5

This issue is still open, so no, it has not been implemented.

gery0815 commented 12 months ago

Hi all, we created a plugin to store retained messages on a Redis cluster. We are hosting our 3-node cluster on Kubernetes. Maybe there is some interest in our implementation. We'd be happy to share it.

michaelklishin commented 12 months ago

@gery0815 you are welcome to leave a link to the plugin here but for any further discussions of it, use GitHub Discussions.

jdsdc commented 10 months ago

> Hi all, we created a plugin to store retained messages on a Redis cluster. We are hosting our 3-node cluster on Kubernetes. Maybe there is some interest in our implementation. We'd be happy to share it.

@gery0815, would it be possible for you to share more information about the plugin you wrote for redis?

gery0815 commented 10 months ago

@jdsdc @michaelklishin I started a discussion. There I also posted the link to my repo. Feel free to use it!

https://github.com/rabbitmq/rabbitmq-server/discussions/9431
https://github.com/gery0815/rabbitmq-mqtt-retained-msg-redis

robertsLando commented 4 months ago

It seems the Redis plugin does not work with the latest RabbitMQ version. Could I ask the RabbitMQ maintainers whether there is work in progress or an ETA for a retained message store shared across nodes/replicas/pods? I wonder whether I'm the only one who considers this a big limitation. I use retained messages a lot to keep state in sync, but with multiple RabbitMQ nodes I currently cannot be sure my clients will receive all retained messages when they subscribe, as they are fetched only from the node they are connected to. I also think this is not how users would expect it to work.

getlarge commented 4 months ago

@ansd @kjnilsson I am very interested to know more about the solution based on the RabbitMQ streams that you presented here. Since I have no experience with streams yet, my questions might appear naïve, but how would you implement such a solution?

michaelklishin commented 4 months ago

The solution based on streams, or rather certain internal stream API elements, is still very much a hypothesis to prove at this point. So are other options.

There are no plans to introduce more plugins.

robertsLando commented 3 weeks ago

We created our own implementation using Redis as a fork of @gery0815's: https://github.com/innovation-system/rabbitmq-mqtt-retained-msg-redis

This is something we really miss from RabbitMQ, hope one day there will be some work for supporting this 🙏🏼