Disque is an ongoing experiment to build a distributed, in-memory, message broker. Its goal is to capture the essence of the "Redis as a jobs queue" use case, which is usually implemented using blocking list operations, and move it into an ad-hoc, self-contained, scalable, and fault tolerant design, with simple to understand properties and guarantees, but still resembling Redis in terms of simplicity, performance, and implementation as a C non-blocking networked server.
Currently (2 Jan 2016) the project is in release candidate state. People are encouraged to start evaluating it and report bugs and experiences.
WARNING: This is beta code and may not be suitable for production usage. The API is considered to be stable if not for details that may change in the next release candidates, however it's new code, so handle with care!
Hint: skip this section if you are familiar with message queues.
You know how humans use text messages to communicate, right? I could write my wife "please get the milk at the store", and she maybe will reply "Ok message received, I'll get two bottles on my way home".
A message queue is the same as human text messages, but for computer programs. For example a web application, when an user subscribes, may send another process, that handles sending emails, "please send the confirmation email to tom@example.net".
Message systems like Disque allow communication between processes using different queues. So a process can send a message to a queue with a given name, and only processes which fetch messages from this queue will return those messages. Moreover, multiple processes can listen for messages in a given queue, and multiple processes can send messages to the same queue.
The important part of a message queue is to be able to provide guarantees so that messages are eventually delivered even in the face of failures. So even if in theory implementing a message queue is very easy, to write a very robust and scalable one is harder than it may appear.
Disque is a distributed and fault tolerant message broker, so it works as a middle layer among processes that want to exchange messages.
Producers add messages that are served to consumers. Since message queues are often used in order to process delayed jobs, Disque often uses the term "job" in the API and in the documentation, however jobs are actually just messages in the form of strings, so Disque can be used for other use cases. In this documentation "jobs" and "messages" are used in an interchangeable way.
Job queues with a producer-consumer model are pretty common, so the devil is in the details. A few details about Disque are:
Disque is a synchronously replicated job queue. By default when a new job is added, it is replicated to W nodes before the client gets an acknowledgement about the job being added. W-1 nodes can fail and the message will still be delivered.
Disque supports both at-least-once and at-most-once delivery semantics. At-least-once delivery semantics is where most effort was spent in the design and implementation, while at-most-once semantics is a trivial result of using a retry time set to 0 (which means, never re-queue the message again) and a replication factor of 1 for the message (not strictly needed, but it is useless to have multiple copies of a message around if it will be delivered at most one time). You can have, at the same time, both at-least-once and at-most-once jobs in the same queues and nodes, since this is a per message setting.
Disque at-least-once delivery is designed to approximate single delivery when possible, even during certain kinds of failures. This means that while Disque can only guarantee a number of deliveries equal or greater to one, it will try hard to avoid multiple deliveries whenever possible.
Disque is a distributed system where all nodes have the same role (aka, it is multi-master). Producers and consumers can attach to whatever node they like, and there is no need for producers and consumers of the same queue to stay connected to the same node. Nodes will automatically exchange messages based on load and client requests.
Disque is Available (it is an eventually consistent AP system in CAP terms): producers and consumers can make progress as long as a single node is reachable.
Disque supports optional asynchronous commands that are low latency for the client but provide less guarantees. For example a producer can add a job to a queue with a replication factor of 3, but may want to run away before knowing if the contacted node was really able to replicate it to the specified number of nodes or not. The node will replicate the message in the background in a best effort way.
Disque automatically re-queues messages that are not acknowledged as already processed by consumers, after a message-specific retry time. There is no need for consumers to re-queue a message if it was not processed.
Disque uses explicit acknowledges in order for a consumer to signal a message as delivered (or, using a different terminology, to signal a job as already processed).
Disque queues only provides best effort ordering. Each queue sorts messages based on the job creation time, which is obtained using the wall clock of the local node where the message was created (plus an incremental counter for messages created in the same millisecond), so messages created in the same node are normally delivered in the same order they were created. This is not causal ordering since correct ordering is violated in different cases: when messages are re-issued because they are not acknowledged, because of nodes local clock drifts, and when messages are moved to other nodes for load balancing and federation (in this case you end with queues having jobs originated in different nodes with different wall clocks). However all this also means that normally messages are not delivered in random order and usually messages created first are delivered first.
Note that since Disque does not provide strict FIFO semantics, technically speaking it should not be called a message queue, and it could better identified as a message broker. However I believe that at this point in the IT industry a message queue is often more lightly used to identify a generic broker that may or may not be able to guarantee order in all cases. Given that we document the semantics very clearly, I grant myself the right to call Disque a message queue anyway.
Disque provides the user with fine-grained control for each job using three time related parameters, and one replication parameter. For each job, the user can control:
Finally, Disque supports optional disk persistence, which is not enabled by default, but that can be handy in single data center setups and during restarts.
Other minor features are:
Disque's implementation of at-least-once delivery semantics is designed in order to avoid multiple delivery during certain classes of failures. It is not able to guarantee that no multiple deliveries will occur. However there are many at-least-once workloads where duplicated deliveries are acceptable (or explicitly handled), but not desirable either. A trivial example is sending emails to users (it is not terrible if an user gets a duplicated email, but is important to avoid it when possible), or doing idempotent operations that are expensive (all the times where it is critical for performance to avoid multiple deliveries).
In order to avoid multiple deliveries when possible, Disque uses client ACKs. When a consumer processes a message correctly, it should acknowledge this fact to Disque. ACKs are replicated to multiple nodes, and are garbage collected as soon as the system believes it is unlikely that more nodes in the cluster have the job (the ACK refers to) still active. Under memory pressure or under certain failure scenarios, ACKs are eventually discarded.
More explicitly:
For example, if a node having a copy of a job gets partitioned away during the time the job gets acknowledged by the consumer, it is likely that when it returns (in a reasonable amount of time, that is, before the retry time is reached) it will be informed about the ACK and will avoid to re-queue the message. Similarly, jobs can be acknowledged during a partition to just a single available node, and when the partition heals the ACK will be propagated to other nodes that may still have a copy of the message.
So an ACK is just a proof of delivery that is replicated and retained for some time in order to make multiple deliveries less likely to happen in practice.
As already mentioned, in order to control replication and retries, a Disque job has the following associated properties: number of replicas, delay, retry and expire.
If a job has a retry time set to 0, it will get queued exactly once (and in this case a replication factor greater than 1 is useless, and signaled as an error to the user), so it will get delivered either a single time or will never get delivered. While jobs can be persisted on disk for safety, queues aren't, so this behavior is guaranteed even when nodes restart after a crash, whatever the persistence configuration is. However when nodes are manually restarted by the sysadmin, for example for upgrades, queues are persisted correctly and reloaded at startup, since the store/load operation is atomic in this case, and there are no race conditions possible (it is not possible that a job was delivered to a client and is persisted on disk as queued at the same time).
Disque supports a faster way to acknowledge processed messages, via the
FASTACK
command. The normal acknowledge is very expensive from the point of
view of messages exchanged between nodes, this is what happens during a normal
acknowledge:
Note: actual garbage collection is more complex in case of failures and is explained in the state machine later. The above is what happens 99% of times.
If a message is replicated to 3 nodes, acknowledging requires 1+2+2+2 messages, for the sake of retaining the ack if some nodes may not be reached when the message is acknowledged. This makes the probability of multiple deliveries of this message less likely.
However the alternative fast ack, while less reliable, is much faster and invovles exchanging less messages. This is how a fast acknowledge works:
FASTACK
to one node.If during a fast acknowledge a node having a copy of the message is not reachable, for example because of a network partition, the node will deliver the message again, since it has a non-acknowledged copy of the message and there is nobody able to inform it the message has been acknowledged when the partition heals.
If the network you are using is pretty reliable, and you are very concerned with
performance, and multiple deliveries in the context of your applications are
a non issue, then FASTACK
is probably the way to go.
Many message queues implement a feature called dead letter queue. It is a special queue used in order to accumulate messages that cannot be processed for some reason. Common causes could be:
The idea is that the administrator of the system checks (usually via automatic systems) if there is something in the dead letter queue in order to understand if there is some software error or other kind of error preventing messages from being processed as expected.
Since Disque is an in-memory system, the message time-to-live is an important property. When it is reached, we want messages to go away, since the TTL should be chosen so that after such a time it is no longer meaningful to process the message. In such a system, to use memory and create a queue in response to an error or to messages timing out looks like a non optimal idea. Moreover, due to the distributed nature of Disque, dead letters could end up spawning multiple nodes and having duplicated entries in them.
So Disque uses a different approach. Each node message representation has two counters: a nacks counter and an additional deliveries counter. The counters are not consistent among nodes having a copy of the same message, they are just best effort counters that may not increment in some node during network partitions.
The idea of these two counters is that one is incremented every time a worker
uses the NACK
command to tell the queue the message was not processed correctly
and should be put back on the queue ASAP. The other is incremented for every other condition (different than the NACK
call) that requires a message to be put back
on the queue again. This includes messages that get lost and are enqueued again
or messages that are enqueued on one side of the partition since the message
was processed on the other side and so forth.
Using the GETJOB
command with the WITHCOUNTERS
option, or using the
SHOW
command to inspect a job, it is possible to retrieve these two counters
together with the other job information, so if a worker, before processing
a message, sees the counters have values over some application-defined limit, it
can notify operations people in multiple ways:
Basically the exact handling of the feature is up to the application using Disque. Note that the counters don't need to be consistent in the face of failures or network partitions: the idea is that eventually if a message has issues the counters will get incremented enough times to reach the limit selected by the application as a warning threshold.
The reason for having two distinct counters is that applications may want
to handle the case of explicit negative acknowledges via NACK
differently
than multiple deliveries because of timeouts or messages getting lost.
Disque can be operated in-memory only, using synchronous replication as a durability guarantee, or can be operated using the Append Only File where jobs creations and evictions are logged on disk (with configurable fsync policies) and reloaded at restart.
AOF is recommended especially if you run in a single availability zone where a mass reboot of all your nodes is possible.
Normally Disque only reloads job data in memory, without populating queues, since unacknowledged jobs are requeued eventually. Moreover, reloading queue data is not safe in the case of at-most-once jobs having the retry value set to 0. However a special option is provided in order to reload the full state from the AOF. This is used together with an option that allows shutting down the server just after the AOF is generated from scratch, in order to make it safe even to reload jobs with retry set to 0, since the AOF is generated while the server no longer accepts commands from clients, so no race condition is possible.
Even when running memory-only, Disque is able to dump its memory on disk and reload from disk on controlled restarts, for example in order to upgrade the software.
This is how to perform a controlled restart, that works whether AOF is enabled or not:
At this point we have a freshly generated AOF on disk, and the server is
configured in order to load the full state only at the next restart
(aof-enqueue-jobs-once
is automatically turned off after the restart).
We can just restart the server with the new software, or in a new server, and
it will restart with the full state. Note that aof-enqueue-jobs-once
implies loading the AOF even if AOF support is switched off, so there is
no need to enable AOF just for the upgrade of an in-memory only server.
Disque jobs are uniquely identified by an ID like the following:
D-dcb833cf-8YL1NT17e9+wsA/09NqxscQI-05a1
Job IDs are composed of exactly 40 characters and start with the prefix D-
.
We can split an ID into multiple parts:
D- | dcb833cf | 8YL1NT17e9+wsA/09NqxscQI | 05a1
D-
is the prefix.dcb833cf
is the first 8 bytes of the node ID where the message was generated.8YL1NT17e9+wsA/09NqxscQI
is the 144 bit ID pseudo-random part encoded in base64.05a1
is the Job TTL in minutes. Because of it, message IDs can be expired safely even without having the job representation.IDs are returned by ADDJOB when a job is successfully created, are part of the GETJOB output, and are used in order to acknowledge that a job was correctly processed by a worker.
Part of the node ID is included in the message so that a worker processing messages for a given queue can easily guess what are the nodes where jobs are created, and move directly to these nodes to increase efficiency instead of listening for messages in a node that will require to fetch messages from other nodes.
Only 32 bits of the original node ID is included in the message, however in a cluster with 100 Disque nodes, the probability of two nodes having identical 32 bit ID prefixes is given by the birthday paradox:
P(100,2^32) = .000001164
In case of collisions, the workers may just make a non-efficient choice.
Collisions in the 144 bits random part are believed to be impossible, since it is computed as follows.
144 bit ID = HIGH_144_BITS_OF_SHA1(seed || counter)
Where:
/dev/urandom
at startup.So there are 22300745198530623141535718272648361505980416 possible IDs, selected in a uniform way. While the probability of a collision is non-zero mathematically, in practice each ID can be regarded as unique.
The encoded TTL in minutes has a special property: it is always even for at most once jobs (job retry value set to 0), and is always odd otherwise. This changes the encoded TTL precision to 2 minutes, but allows to tell if a Job ID is about a job with deliveries guarantees or not. Note that this fact does not mean that Disque jobs TTLs have a precision of two minutes. The TTL field is only used to expire job IDs of jobs a given node does not actually have a copy, search "dummy ACK" in this documentation for more information.
To play with Disque please do the following:
make
. Binaries (disque
and disque-server
) will end up in the src
directory.disque.conf
files following the example disque.conf
in the source distribution.CLUSTER MEET <ip> <port>
for every other node in the cluster.Please note that you need to open two TCP ports on each node, the base port of the Disque instance, for example 7711, plus the cluster bus port, which is always at a fixed offset, obtained summing 10000 to the base port, so in the above example, you need to open both 7711 and 17711. Disque uses the base port to communicate with clients and the cluster bus port to communicate with other Disque processes.
To run a node, just call ./disque-server
.
For example, if you are running three Disque servers in port 7711, 7712, 7713, in order to join the cluster you should use the disque
command line tool and run the following commands:
./disque -p 7711 cluster meet 127.0.0.1 7712
./disque -p 7711 cluster meet 127.0.0.1 7713
Your cluster should now be ready. You can try to add a job and fetch it back in order to test if everything is working:
./disque -p 7711
127.0.0.1:7711> ADDJOB queue body 0
D-dcb833cf-8YL1NT17e9+wsA/09NqxscQI-05a1
127.0.0.1:7711> GETJOB FROM queue
1) 1) "queue"
2) "D-dcb833cf-8YL1NT17e9+wsA/09NqxscQI-05a1"
3) "body"
Remember that you can add and get jobs from different nodes as Disque is multi master. Also remember that you need to acknowledge jobs otherwise they'll never go away from the server memory (unless the time-to-live is reached).
The Disque API is composed of a small set of commands, since the system solves a single very specific problem. The three main commands are:
ADDJOB queue_name job <ms-timeout> [REPLICATE <count>] [DELAY <sec>] [RETRY <sec>] [TTL <sec>] [MAXLEN <count>] [ASYNC]
Adds a job to the specified queue. Arguments are as follows:
The command returns the Job ID of the added job, assuming ASYNC is specified, or if the job was replicated correctly to the specified number of nodes. Otherwise an error is returned.
GETJOB [NOHANG] [TIMEOUT <ms-timeout>] [COUNT <count>] [WITHCOUNTERS] FROM queue1 queue2 ... queueN
Return jobs available in one of the specified queues, or return NULL if the timeout is reached. A single job per call is returned unless a count greater than 1 is specified. Jobs are returned as a three-element array containing the queue name, the Job ID, and the job body itself. If jobs are available in multiple queues, queues are processed left to right.
If there are no jobs for the specified queues, the command blocks, and messages are exchanged with other nodes, in order to move messages about these queues to this node, so that the client can be served.
Options:
ACKJOB jobid1 jobid2 ... jobidN
Acknowledges the execution of one or more jobs via job IDs. The node receiving the ACK will replicate it to multiple nodes and will try to garbage collect both the job and the ACKs from the cluster so that memory can be freed.
A node receiving an ACKJOB command about a job ID it does not know will create a special empty job, with the state set to "acknowledged", called a "dummy ACK". The dummy ACK is used in order to retain the acknolwedge during a netsplit if the ACKJOB is sent to a node that does not have a copy of the job. When the partition heals, job garbage collection will be attempted.
However, since the job ID encodes information about the job being an "at-most- once" or an "at-least-once" job, the dummy ACK is only created for at-least- once jobs.
FASTACK jobid1 jobid2 ... jobidN
Performs a best-effort cluster-wide deletion of the specified job IDs. When the
network is well connected and there are no node failures, this is equivalent to
ACKJOB
but much faster (due to less messages being exchanged), however during
failures it is more likely that fast acknowledges will result in multiple
deliveries of the same messages.
WORKING jobid
Claims to be still working with the specified job, and asks Disque to postpone the next time it will deliver the job again. The next delivery is postponed for the job retry time, however the command works in a best effort way since there is no way to guarantee during failures that another node in a different network partition won't perform a delivery of the same job.
Another limitation of the WORKING
command is that it cannot be sent to
nodes not knowing about this particular job. In such a case the command replies
with a NOJOB
error. Similarly, if the job is already acknowledged an error
is returned.
Note that the WORKING
command is refused by Disque nodes if 50% of the job
time to live has already elapsed. This limitation makes Disque safer since
usually the retry time is much smaller than the time-to-live of a job, so
it can't happen that a set of broken workers monopolize a job with WORKING
and never process it. After 50% of the TTL has elapsed, the job will be delivered
to other workers anyway.
Note that WORKING
returns the number of seconds you (likely) postponed the
message visibility for other workers (the command basically returns the
retry time of the job), so the worker should make sure to send the next
WORKING
command before this time elapses. Moreover, a worker that may want
to use this interface may fetch the retry value with the SHOW
command
when starting to process a message, or may simply send a WORKING
command
ASAP, like in the following example (in pseudo code):
retry = WORKING(jobid)
RESET timer
WHILE ... work with the job still not finished ...
IF timer reached 80% of the retry time
WORKING(jobid)
RESET timer
END
END
NACK <job-id> ... <job-id>
The NACK
command tells Disque to put the job back in the queue ASAP. It
is very similar to ENQUEUE
but it increments the job nacks
counter
instead of the additional-deliveries
counter. The command should be used
when the worker was not able to process a message and wants the message to
be put back into the queue in order to be processed again.
INFO
Generic server information / stats.
HELLO
Returns hello format version, this node ID, all the nodes IDs, IP addresses, ports, and priority (lower is better, means a node is more available). Clients should use this as a handshake command when connecting with a Disque node.
QLEN <queue-name>
Return the length of the queue.
QSTAT <queue-name>
Show information about a queue as an array of key-value pairs. Below is an example of the output, however, implementations should not rely on the order of the fields nor on the existence of the fields listed. They may be (unlikely) removed or more can be (likely) added in the future.
If a queue does not exist, NULL is returned. Note that queues are automatically evicted after some time if empty and without clients blocked waiting for jobs, even if there are active jobs for the queue. So the non existence of a queue does not mean there are not jobs in the node or in the whole cluster about this queue. The queue will be immediately created again when needed to serve requests.
Example output:
QSTAT foo
1) "name"
2) "foo"
3) "len"
4) (integer) 56520
5) "age"
6) (integer) 601
7) "idle"
8) (integer) 3
9) "blocked"
10) (integer) 50
11) "import-from"
12) 1) "dcb833cf8f42fbb7924d92335ff6d67d3cea6e3d"
2) "4377bdf656040a18d8caf4d9f409746f1f9e6396"
13) "import-rate"
14) (integer) 19243
15) "jobs-in"
16) (integer) 3462847
17) "jobs-out"
18) (integer) 3389522
19) "pause"
20) "none"
Most fields should be obvious. The import-from
field shows a list of node
IDs this node is importing jobs from, for this queue, in order to serve
clients requests. The import-rate
is the instantaneous amount of jos/sec
we import in order to handle our outgoing traffic (GETJOB commands).
blocked
is the number of clients blocked on this queue right now.
age
and idle
are reported in seconds. The jobs-in
and -out
counters are
incremented every time a job is enqueued or dequeued for any reason.
QPEEK <queue-name> <count>
Return, without consuming from the queue, count jobs. If count is positive the specified number of jobs are returned from the oldest to the newest (in the same best-effort FIFO order as GETJOB). If count is negative the commands changes behavior and shows the count newest jobs, from the newest from the oldest.
ENQUEUE <job-id> ... <job-id>
Queue jobs if not already queued.
DEQUEUE <job-id> ... <job-id>
Remove the job from the queue.
DELJOB <job-id> ... <job-id>
Completely delete a job from a node.
Note that this is similar to FASTACK
, but limited to a single node since
no DELJOB
cluster bus message is sent to other nodes.
SHOW <job-id>
Describe the job.
QSCAN [COUNT <count>] [BUSYLOOP] [MINLEN <len>] [MAXLEN <len>] [IMPORTRATE <rate>]
The command provides an interface to iterate all the existing queues in the local node, providing a cursor in the form of an integer that is passed to the next command invocation. During the first call, the cursor must be 0, in the next calls the cursor returned in the previous call is used in the next. The iterator guarantees to return all the elements but may return duplicated elements.
Options:
COUNT <count>
A hint about how much work to do per iteration.BUSYLOOP
Block and return all the elements in a busy loop.MINLEN <count>
Don't return elements with less than count
jobs queued.MAXLEN <count>
Don't return elements with more than count
jobs queued.IMPORTRATE <rate>
Only return elements with a job import rate (from other nodes) >=
rate
.The cursor argument can be in any place, the first non matching option that has valid cursor form of an unsigned number will be sensed as a valid cursor.
JSCAN [<cursor>] [COUNT <count>] [BUSYLOOP] [QUEUE <queue>] [STATE <state1> STATE <state2> ... STATE <stateN>] [REPLY all|id]
The command provides an interface to iterate all the existing jobs in the local node, providing a cursor in the form of an integer that is passed to the next command invocation. During the first call the cursor must be 0, in the next calls the cursor returned in the previous call is used in the next. The iterator guarantees to return all the elements but may return duplicated elements.
Options:
COUNT <count>
A hint about how much work to do per iteration.BUSYLOOP
Block and return all the elements in a busy loop.QUEUE <queue>
Return only jobs in the specified queue.STATE <state>
Return jobs in the specified state. Can be used multiple times for a logical OR.REPLY <type>
Job reply type. Type can be all
or id
. Default is to report just the job ID. If all
is specified the full job state is returned like for the SHOW command.The cursor argument can be in any place, the first non matching option that has valid cursor form of an unsigned number will be sensed as a valid cursor.
PAUSE <queue-name> option1 [option2 ... optionN]
Control the paused state of a queue, possibly broadcasting the command to other nodes in the cluster. Disque queues can be paused in both directions, input and output, or both. Pausing a queue makes it unavailable for input or output operations. Specifically:
A queue paused in input will have changed behavior in the following ways:
-PAUSED
error for queues paused in input.Basically a queue paused in input never creates new jobs for this queue, and never puts active jobs (jobs for which the node has a copy but are not currently queued) back in the queue, for all the time the queue is paused.
A queue paused in output instead will behave in the following way:
So a queue paused in output will stop acting as a source of messages for both local and non local clients.
The paused state can be set for each queue using the PAUSE command followed by options to specify how to change the paused state. Possible options are:
The command always returns the state of the queue after the execution of the specified options, so the return value is one of in, out, all, none.
Queues paused in input or output are never evicted to reclaim memory, even if they are empty and inactive for a long time, since otherwise the paused state would be forgotten.
For example, in order to block output for the queue myqueue
in all the
currently reachable nodes, the following command should be send to a single node:
PAUSE myqueue out bcast
To specify all is the same as to specify both in and out, so the two following forms are equivalent:
PAUSE myqueue in out
PAUSE myqueue all
To just get the current state use:
PAUSE myqueue state
"none"
In order to provide a coherent API, messages with at-most-once delivery semantics are still retained after being delivered a first time, and should be acknowledged like any other message. Of course, the acknowledge is not mandatory, since the message may be lost and there is no way for the receiver to get the same message again, since the message is associated with a retry value of 0.
In order to avoid non acknowledged messages with retry set to 0 from leaking into Disque and eating all the memory, when the Disque server memory is full and starts to evict, it does not just evict acknowledged messages, but also can evict non acknowledged messages having, at the same time, the following two properties:
In theory, acknowledging a job that will never be retried is a waste of time and resources, however this design has hidden advantages:
However, not acknowledging the job does not result in big issues since they are evicted eventually during memory pressure.
Adding nodes is trivial, and just consists in starting a new node and sending it
a CLUSTER MEET
command. Assuming the node you just started is located
at address 192.168.1.10 port 7714, and a random (you can use any) node of
the existing cluster is located at 192.168.1.9 port 7711, all you need to do
is:
./disque -h 192.168.1.10 -p 7714 cluster meet 192.168.1.9 7711
Note that you can invert the source and destination arguments and the new node will still join the cluster. It does not matter if it's the old node to meet the new one or the other way around.
In order to remove a node, it is possible to use the crude way of just
shutting it down, and then use CLUSTER FORGET <old-node-id>
in all the
other nodes in order to remove references to it from the configuration of
the other nodes. However this means that, for example, messages that had
a replication factor of 3, and one of the replicas was the node you are
shutting down, suddenly are left with just 2 replicas even if no actual
failure happened. Moreover if the node you are removing had messages in
queue, you'll need to wait the retry time before the messages will be
queued again. For all these reasons, Disque has a better way to remove nodes
which is described in the next section.
In order to empty a node of its content before removing it, it is possible to use a feature that puts a node in leaving state. To enable this feature just contact the node to remove, and use the following command:
CLUSTER LEAVING yes
The node will start advertising itself as leaving, so in a matter of seconds all the cluster will know (if there are partitions, when the partition heals all the nodes will eventually be informed), and this is what happens when the node is in this state:
ADDJOB
commands, it performs external replication, like when a node is near the memory limits. This means that it will make sure
to create the number of replicas of the message in the cluster without using itself as a replica. So no new messages are created in the context of a node which is leaving.-LEAVING
messages to all clients that use GETJOB
but would block waiting for jobs. The -LEAVING
error means the clients should connect to another node. Clients that were already blocked waiting for messages will be unblocked and a -LEAVING
error will be sent to them as well.NEEDJOBS
messages in the context of Disque federation, so it will never ask other nodes to transfer messages to it.HELLO
command output, so that clients will select a different node.ACKJOB
command about a job it does not know.All these behavior changes result in the node participating only as a source of messages, so eventually its message count will drop to zero (it is possible to check for this condition using INFO jobs
). When this happens the node can be stopped and removed from the other nodes tables using CLUSTER FORGET
as described in the section above.
Disque uses the same protocol as Redis itself. To adapt Redis clients, or to use them directly, should be pretty easy. However note that Disque's default port is 7711 and not 6379.
While a vanilla Redis client may work well with Disque, clients should optionally use the following protocol in order to connect with a Disque cluster:
HELLO
command should be used in order to retrieve the Node ID and other potentially useful information (server version, number of nodes).GETJOB
command, or other commands, may return a -LEAVING
error instead of blocking. This error should be considered by the client library as a request to connect to a different node, since the node it is connected to is not able to serve the request since it is leaving the cluster. Nodes in this state have a very high priority number published via HELLO
, so will be unlikely to be picked at the next connection attempt.This way producers and consumers will eventually try to minimize node message exchanges whenever possible.
So basically you could perform basic usage using just a Redis client, however there are already specialized client libraries implementing a more specialized API on top of Disque:
C++
Common Lisp
Elixir
Erlang
Go
Java
Node.js
Perl
PHP
Python
Ruby
Rust
.NET
Disque is a full mesh, with each node connected to each other. Disque performs distributed failure detection via gossip, only in order to adjust the replication strategy (try reachable nodes first when trying to replicate a message), and in order to inform clients about non reachable nodes when they want the list of nodes they can connect to.
As Disque is multi-master, the event of nodes failing is not handled in any special way.
Nodes communicate via a set of messages, using the node-to-node message bus. A few of the messages are used in order to check that other nodes are reachable and to mark nodes as failing. Those messages are PING, PONG and FAIL. Since failure detection is only used to adjust the replication strategy (talk with reachable nodes first in order to improve latency), the details are yet not described. Other messages are more important since they are used in order to replicate jobs, re-issue jobs while trying to minimize multiple deliveries, and in order to auto-federate to serve consumers when messages are produced in different nodes compared to where consumers are.
The following is a list of messages and what they do, split by category. Note that this is just an informal description, while in the next sections describing the Disque state machine, there is a more detailed description of the behavior caused by message reception, and in what cases they are generated.
NEEDJOBS(queue,count): The sender asks the receiver to obtain messages for a given queue, possibly count messages, but this is only an hit for congestion control and messages optimization, the receiver is free to reply with whatever number of messages. NEEDJOBS messages are delivered in two ways: broadcasted to every node in the cluster from time to time, in order to discover new source nodes for a given queue, or more often, to a set of nodes that recently replies with jobs for a given queue. This latter mechanism is called an ad hoc delivery, and is possible since every node remembers for some time the set of nodes that were recent providers of messages for a given queue. In both cases, NEEDJOBS messages are delivered with exponential delays, with the exception of queues that drop to zero-messages and have a positive recent import rate, in this case an ad hoc NEEDJOBS delivery is performed regardless of the last time the message was delivered in order to allow a continuous stream of messages under load.
YOURJOBS(array of messages): The reply to NEEDJOBS. An array of serialized jobs, usually all about the same queue (but future optimization may allow to send different jobs from different queues). Jobs into YOURJOBS replies are extracted from the local queue, and queued at the receiver node's queue with the same name. So even messages with a retry set to 0 (at most once delivery) still guarantee the safety rule since a given message may be in the source node, on the wire, or already received in the destination node. If a YOURJOBS message is lost, at least once delivery jobs will be re-queued later when the retry time is reached.
This section shows the most interesting (as in less obvious) parts of the state machine each Disque node implements. While practically it is a single state machine, it is split in sections. The state machine description uses a convention that is not standard but should look familiar, since it is event driven, made of actions performed upon: message receptions in the form of commands received from clients, messages received from other cluster nodes, timers, and procedure calls.
Note that: job
is a job object with the following fields:
job.delivered
: A list of nodes that may have this message. This list does not need to be complete, is used for best-effort algorithms.job.confirmed
: A list of nodes that confirmed reception of ACK by replying with a GOTJOB message.job.id
: The job 48 chars ID.job.state
: The job state among: wait-repl
, active
, queued
, acked
.job.replicate
: Replication factor for this job.job.qtime
: Time at which we need to re-queue the job.List fields such as .delivered
and .confirmed
support methods like .size
to get the number of elements.
States are as follows:
wait-repl
: the job is waiting to be synchronously replicated.active
: the job is active, either it reached the replication factor in the originating node, or it was created because the node received an REPLJOB
message from another node.queued
: the job is active and also is pending into a queue in this node.acked
: the job is no longer active since a client confirmed the reception using the ACKJOB
command or another Disque node sent a SETACK
message for the job.PROCEDURE LOOKUP-JOB(string job-id)
:
PROCEDURE UNREGISTER(object job)
:
PROCEDURE ENQUEUE(job)
:
job.state == queued
return ASAP.job
into job.queue
.job.state
to queued
.PROCEDURE DEQUEUE(job)
:
job.state != queued
return ASAP.job
from job.queue
.job.state
to active
.ON RECV cluster message: DELJOB(string job.id)
:
LOOKUP-JOB(job-id)
.job != NULL
THEN call UNREGISTER(job)
.This part of the state machine documents how clients add jobs to the cluster and how the cluster replicates jobs across different Disque nodes.
ON RECV client command `ADDJOB(string queue-name, string body, integer replicate, integer retry, integer ttl, ...):
wait-repl
state, having as body, ttl, retry, queue name, the specified values.replicate-1
nodes.Step 3: We'll reply to the client in step 4 of GOTJOB
message processing.
ON RECV cluster message REPLJOB(object serialized-job)
:
LOOKUP-JOB(serialized-job.id)
.job != NULL
THEN: job.delivered = UNION(job.delivered,serialized-job.delivered). Return ASAP, since we have the job.active
.GOTJOB(job.id)
.Step 1: We may already have the job, since REPLJOB may be duplicated.
Step 2: If we already have the same job, we update the list of jobs that may have a copy of this job, performing the union of the list of nodes we have with the list of nodes in the serialized job.
ON RECV cluster message GOTJOB(object serialized-job)
:
LOOKUP-JOB(serialized-job.id)
.job == NULL
OR job.state != wait-repl
Return ASAP.job.confirmed
.job.confirmed.size == job.replicate
THEN change job.state
to active
, call ENQUEUE(job), and reply to the blocked client with job.id
.Step 4: As we receive enough confirmations via GOTJOB
messages, we finally reach the replication factor required by the user and consider the message active.
TIMER, firing every next 50 milliseconds while a job still did not reached the expected replication factor.
job.delivered
, call it node
.node
to job.delivered
.job.delivered
.Step 3: We send the message to every node again, so that each node will have a chance to update job.delivered
with the new nodes. It is not required for each node to know the full list of nodes that may have a copy, but doing so improves our approximation of single delivery whenever possible.
This part of the state machine documents how Disque nodes put a given job back into the queue after the specified retry time elapsed without the job being acknowledged.
TIMER, firing 500 milliseconds before the retry time elapses:
WILLQUEUE(job.id)
to every node in jobs.delivered
.TIMER, firing when job.qtime
time is reached.
job.retry == 0
THEN return ASAP.job.qtime
to NOW + job.retry.QUEUED(job.id)
message to each node in job.delivered
.Step 1: At most once jobs never get enqueued again.
Step 3: We'll retry again after the retry period.
ON RECV cluster message WILLQUEUE(string job-id)
:
LOOKUP-JOB(job-id)
.job == NULL
THEN return ASAP.job.state == queued
SEND QUEUED(job.id)
to job.delivered
.job.state == acked
SEND SETACK(job.id)
to the sender.Step 3: We broadcast the message since likely the other nodes are going to retry as well.
Step 4: SETACK processing is documented below in the acknowledges section of the state machine description.
ON RECV cluster message QUEUED(string job-id)
:
LOOKUP-JOB(job-id)
.job == NULL
THEN return ASAP.job.state == acked
THEN return ASAP.job.state == queued
THEN if sender node ID is greater than my node ID call DEQUEUE(job).job.qtime
setting it to NOW + job.retry.Step 4: If multiple nodes re-queue the job about at the same time because of race conditions or network partitions that make WILLQUEUE
not effective, then QUEUED
forces receiving nodes to dequeue the message if the sender has a greater node ID, lowering the probability of unwanted multiple delivery.
Step 5: Now the message is already queued somewhere else, but the node will retry again after the retry time.
This part of the state machine is used in order to garbage collect acknowledged jobs, when a job finally gets acknowledged by a client.
PROCEDURE ACK-JOB(job)
:
acked
, do nothing and return ASAP.acked
, dequeue the job if queued, schedule first call to TIMER.PROCEDURE START-GC(job)
:
SETACK(job.delivered.size)
to each node that is listed in job.delivered
but is not listed in job.confirmed
.job.delivered.size == 0
, THEN send SETACK(0)
to every node in the cluster.Step 2: this is an ACK about a job we don’t know. In that case, we can just broadcast the acknowledged hoping somebody knows about the job and replies.
ON RECV client command ACKJOB(string job-id)
:
LOOKUP-JOB(job-id)
.NULL
, ignore the message and return.ACK-JOB(job)
.START-GC(job)
.ON RECV cluster message SETACK(string job-id, integer may-have)
:
LOOKUP-JOB(job-id)
.NULL
.job == NULL OR job.delivered.size <= may-have
.job != NULL
and jobs.delivered.size > may-have
THEN call START-GC(job)
.may-have == 0 AND job != NULL
, reply with GOTACK(1)
and call START-GC(job)
.Steps 3 and 4 makes sure that among the reachable nodes that may have a message, garbage collection will be performed by the node that is aware of more nodes that may have a copy.
Step 5 instead is used in order to start a GC attempt if we received a SETACK message from a node just hacking a dummy ACK (an acknowledge about a job it was not aware of).
ON RECV cluster message GOTACK(string job-id, bool known)
:
LOOKUP-JOB(job-id)
. Return ASAP IF job == NULL
.ACK-JOB(job)
.known == true AND job.delivered.size > 0
THEN add the sender node to job.delivered
.(known == true) OR (known == false AND job.delivered.size > 0) OR (known == false AND sender is an element of job.delivered)
THEN add the sender node to jobs.confirmed
.job.delivered.size > 0 AND job.delivered.size == job.confirmed.size
, THEN send DELJOB(job.id)
to every node in the job.delivered
list and call UNREGISTER(job)
.job.delivered == 0 AND known == true
, THEN call UNREGISTER(job)
.job.delivered == 0 AND job.confirmed.size == cluster.size
THEN call UNREGISTER(job)
.Step 3: If job.delivered.size
is zero, it means that the node just holds a dummy ack for the job. It means the node has an acknowledged job it created on the fly because a client acknowledged (via ACKJOB command) a job it was not aware of.
Step 6: we don't have to hold a dummy acknowledged jobs if there are nodes that have the job already acknowledged.
Step 7: this happens when nobody knows about a job, like when a client acknowledged a wrong job ID.
TIMER, from time to time (exponential backoff with random error), for every acknowledged job in memory:
START-GC(job)
.N
servers. There is a lot more message passing between nodes involved, and so forth. The good news is that being totally unoptimized, there is room for improvements.No, it is a standalone project, however a big part of the Redis networking source code, nodes message bus, libraries, and the client protocol, were reused in this new project. In theory it was possible to extract the common code and release it as a framework to write distributed systems in C. However this is not a perfect solution as well, since the projects are expected to diverge more and more in the future, and to rely on a common foundation was hard. Moreover the initial effort to turn Redis into two different layers: an abstract server, networking stack and cluster bus, and the actual Redis implementation, was a huge effort, ways bigger than writing Disque itself.
However while it is a separated project, conceptually Disque is related to Redis, since it tries to solve a Redis use case in a vertical, ad-hoc way.
Disque is a side project of Salvatore Sanfilippo, aka @antirez.
Currently I consider this just a public alpha: If I see people happy to use it for the right reasons (i.e. it is better in some use cases compared to other message queues) I'll continue the development. Otherwise it was anyway cool to develop it, I had much fun, and I definitely learned new things.
Yes. In Disque it should be relatively simple to use the disk when memory is not available, since jobs are immutable and don't need to necessarily exist in memory at a given time.
There are multiple strategies available. The current idea is that when an instance is out of memory, jobs are stored into a log file instead of memory. As more free memory is available in the instance, on disk jobs are loaded.
However in order to implement this, there is to observe strong evidence of its general usefulness for the user base.
Disque routing is not static, the cluster automatically tries to provide messages to nodes where consumers are attached. When there is an high enough traffic (even one message per second is enough) nodes remember other nodes that recently were sources for jobs in a given queue, so it is possible to aggressively send messages asking for more jobs, every time there are consumers waiting for more messages and the local queue is empty.
However when the traffic is very low, informations about recent sources of messages are discarded, and nodes rely on a more generic mechanism in order to discover other nodes that may have messages in the queues we need them (which is also used in high traffic conditions as well, in order to discover new sources of messages for a given queue).
For example imagine a setup with two nodes, A and B.
myqueue
. Node A has no jobs enqueued, so the client is blocked.myqueue
, but sending them to node B.During step 1
if there was no recent traffic of imported messages for this queue, node A has no idea about who may have messages for the queue myqueue
. Every other node may have, or none may have. So it starts to broadcast NEEDJOBS
messages to the whole cluster. However we can't spam the cluster with messages, so if no reply is received after the first broadcast, the next will be sent with a larger delay, and so foth. The delay is exponential, with a maximum value of 30 seconds (this parameters will be configurable in the future, likely).
When there is some traffic instead, nodes send NEEDJOBS
messages ASAP to other nodes that were recent sources of messages. Even when no reply is received, the next NEEDJOBS
messages will be sent more aggressively to the subset of nodes that had messages in the past, with a delay that starts at 25 milliseconds and has a maximum value of two seconds.
In order to minimize the latency, NEEDJOBS
messages are not throttled at all when:
For more information, please refer to the file queue.c
, especially the function needJobsForQueue
and its callers.
Messages are put into the queue according to their creation time attribute. This means that they are enqueued in a best effort order in the local node queue. Messages that need to be put back into the queue again because their delivery failed are usually (but not always) older than messages already in queue, so they'll likely be among the first to be delivered to workers.
DIStributed QUEue but is also a joke with "dis" as negation (like in disorder) of the strict concept of queue, since Disque is not able to guarantee the strict ordering you expect from something called queue. And because of this tradeof it gains many other interesting things.
Get in touch with us in one of the following ways:
disque
tag. This is the preferred method to get general help about Disque: other users will easily find previous questions so we can incrementally build a knowledge base.#disque
IRC channel at irc.freenode.net.I would like to say thank you to the following persons and companies.