rabbitmq / rabbitmq-delayed-message-exchange

Delayed Messaging for RabbitMQ
Other
2.04k stars 150 forks source link

v2 design ideas #229

Open michaelklishin opened 1 year ago

michaelklishin commented 1 year ago

Updates

The design below evolved over time. The last major update is from Sep 26, 2023.

Problem Definition

The current design of this plugin has plenty of limitations that make it unsuitable for many environments. They are visibly called out in the README but to reiterate:

These limitations call for a new plugin with the same goal but a different design. For the lack of a more creative name, let's call it Delayed Message Exchange v2 or simply "v2".

This issue is an outline of some of the ideas our team has. I am intentionally filing it as an issue, despite this being just a number of ideas.

You are welcome to share your ideas in the comments as long as

  1. The goal does not venture too far from what this plugin currently can do. In other words, the goal is to keep it small and focussed, and some ideas will not be accepted
  2. The discussion remains civil, unlike some other issues in this repo. If it ends up being too heated, it will be locked. Insults towards the maintainers will not be tolerated

Where to Store Messages

This plugin stores messages for future and some metadata about them:

This information should be replicated. Unlike in the early 2010s, when this plugin was first envisioned, modern RabbitMQ versions have a mature and efficient Raft implementation plus a few more sub-projects that can be considered for distributed plugins such as this one in v2.

Streams do not allow for random reads of individual messages, so using a stream or an Osiris log directly would only be possible as a replication mechanism that local instances of the plugin will read from and put the data into a suitable store providing random reads.

Perhaps ironically we need a classic efficient disk-based key value store, with local storage provided by something like

and the distribution layer provided by Ra. This example Ra-based K/V store would have to have a more mature version used for more than running Jepsen tests against Ra.

This store would allow for random reads of individual messages stored for later publishing and needs to provide a very minimalistic API.

Rust-based stores such as Marble can be accessed via Rustler.

Where to Store Metadata

RabbitMQ is in the middle of a transition to Khepri, a Raft-based tree-structured data store we have developed for storing schema.

Khepri will be first introduced in RabbitMQ 3.13 and is going to be merged into main.

Specifically Khepri is suitable for storing an index of delayed messages:

The index will help locate the IDs of messages that are up for delivery. The messages themselves, possibly with their metadata, can be loaded from a durable key/value store described above.

Because Khepri will be available via a feature flag in 3.13, this plugin will have to require that flag.

Using Fewer Timers

Assuming that all metadata, including expiration, is stored in Khepri in a way that makes it possible to easily query a subset of delayed messages, this plugin can use a very small number of timers, for example, just one or one per virtual host.

Not having a lot of far-in-the-future timers has another benefit: the current (2^32 - 1) limitation for timer intervals will no longer apply. The timer will be used as a periodic tick process that finds out what messages are up for re-publishing, loads their metadata and hands it all off to a different process.

This way, if someone wants to delay a message for more than ≈ 24 days, they will be able to do it, even though I would still not recommend it.

What Metrics to Provide

Right now this plugin provides a single metric: the number of delayed messages. There is interest in turning it into a histogram of time periods.

More Powerful CLI Tools

Besides inspecting various metrics provided by this plugin, the operator will occasionally need to manipulate the messages delayed: delete individual messages or a subset, inspect them, and so on.

Moving storage to a stream will make inspection possible, and deletion can be done on just the metadata, with stream retention policies taking care of cleaning up the newly orphaned messages on disk.

Re-publishing from a "Leader"

Like with many non-eventually consistent distributed systems, we have to either decide to perform writes via a single elected leader, or partition the data set such that N writers can co-exist within a single cluster.

Some existing (commercial) plugins use a per-virtual host partitioning scheme. For this plugin it makes more sense to do this per-exchange. We cannot do this per-queue because the queues the message will route to is unknown in advance.

Mapping routing keys to a set of writers/publishers won't work either because we need to guarantee message ordering within a single queue.

Khepri, much like etcd, can be used for leader elections much like etcd is used by Kubernetes-oriented systems.

Known Problems and Limitations

The biggest issue before RabbitMQ 3.13.0 will be Khepri cluster formation. Getting it right from a plugin can be painful. When RabbitMQ itself introduces Khepri in the core, it's not clear whether a reasonable upgrade path can be provided.

Khepri is a tree-structured store, so certain types of queries will not be an option. This means that the data model of this plugin has to be carefully designed to support the subset of CLI commands we'd introduce.

Khepri, much like etcd, can be used for leader elections. But it's not RabbitMQ's use case for Khepri, so there may be "unknown unknowns".

In the discussion about how many leaders/writers the cluster of plugin instances should have, we completely avoid the issue of message ordering at their original publishing time.

For example, if messages M1, M2, …, Mn are published in order but their delay is such that they all must be published at the same time, would the users expect the M1, M2, …, Mn order to be preserved? If so, what kind of leader/writer design trade-offs would that entail?

michaelklishin commented 1 year ago

Updated after a closer review with @SimonUnge and a few members of the core team.

gomoripeti commented 11 months ago

Great initiative, great write up. I dont have any comment on the details of the storage implementation itself (khepri for metadata and a KV store sounds good) Just putting down some random ideas about the plugin

Expectations

Metrics

Joseph-Zhichao commented 4 months ago

Hello @michaelklishin,

I have some question about using delay message plugin,

You mentioned the limitation

Not having a lot of far-in-the-future timers has another benefit: the current (2^32 - 1) limitation for timer intervals will no longer apply. The timer will be used as a periodic tick process that finds out what messages are up for re-publishing, loads their metadata and hands it all off to a different process. This way, if someone wants to delay a message for more than ≈ 24 days, they will be able to do it, even though I would still not recommend it.

If I understand correctly, the latest design supports more than 24 days. But I am unsure about the limit. I am considering using the RabbitMQ delay message plug-in for my project which needs a delay of 300 days, would rabbit still be the best solution for me?

I am also curious about what's the size limit on the exchange. In my use case, I would need to put over 10k-20k messages into the exchange with various delays. Would the messages still be persisted (remain in the exchange) if rabbit server went down.

I'd appreciate it very much if you could help me clear the confusion

illotum commented 4 months ago

@Joseph-Zhichao with respect, sounds like you need a database not message broker.

Joseph-Zhichao commented 4 months ago

@illotum thanks for the reply, Alex.

We are trying to explore the options without using a database, but I agree with you!

jarodriguez-itsoft commented 2 weeks ago

Thanks @michaelklishin for the detailed write up!

Have you considered using KeyDB (a Redis open-source branch)? It already supports clustering out-of-the-box and has an interesting mechanism that would avoid you use any timers: you can set keys to expire and be notified via pub/sub when they do (https://docs.keydb.dev/docs/notifications/) Max. expiration time is 64 bit, and can be set in ms or seconds,, so I guess it's more than enough for the requirements ;)

michaelklishin commented 2 weeks ago

@jarodriguez-itsoft asking every RabbitMQ user who uses the plugin today or will its "successor" in Tanzu RabbitMQ to "bring their own Redis" is not realistic. But you are welcome to build a plugin that would be Redis-specific.

jarodriguez-itsoft commented 2 weeks ago

Hi @michaelklishin I understand your point and mostly agree with it, but being able to configure alternative K-V stores would be a great enhancement. I had a quick look over the plugin code and TBH, before adding support to Redis, I think it should be refactored and generalized to separate the current Mnesia logic into an interface we could implement to initialize and use different stores. The timer logic should also be interfaced so it could be replaced by an event-driven mechanism.

I think adding support to Keydb in Erlang is relatively straightforward as erl already has support to Redis, but making a fork can be a pain, not only because of the current codebase being fully dependant on Mnesia but also with the changes you are gonna make regarding schema API modules usage. Having a separated plugin for each store type means the mantiners will need to upgrade it everytime a new Rabbit or delayed-exchange plugin version is released.

I see it working with some optional configuration options which default to whatever K-V engine is decided to be the default one when missing. e.g. "rabbitmq_delayed_exchange_engine" = "redis", "rabbitmq_delated_exchange_connectionstring" = "127.0.0.1:6379", etc... and the plugin using a factory-based initialization of the engine.

What do you think?

michaelklishin commented 2 weeks ago

@jarodriguez-itsoft "alternative K-V stores" is not a design goal here. Like I said, you are welcome to build a separate exchange type plugin that uses any K-V store you need. This is extensible open source software after all.