RQ (Redis Queue) is a simple Clojure package for queueing jobs and processing them in the background with workers. It is backed by Redis and it is designed to have a low barrier to entry, inspired by python-rq.
"simple is better than complex" - The Zen of Python
clj-rq arose from the need to simplify the use of the Redis queue feature in the SaaS version of moclojer. A more detailed explanation of the motivation is available in a thread that generated some controversy.
We distribute the library via Clojars.
com.moclojer/rq {:mvn/version "0.x.x"}
[com.moclojer/rq "0.x.x"]
See the versions distributed on Clojars.
We build Jedis ourselves to enable building queue functions directly using reflection. This approach ensures full compatibility with our library's features.
git clone [repository URL]
cd clj-rq
make jedis
After running make jedis, the library will be built and ready to be linked with your project. Linking in this context means that the built Jedis library will be properly referenced and used by clj-rq when you include it in your project.
The clj-rq library leverages the ->wrap-method macro to dynamically generate queue functions by wrapping methods from the Jedis library. This approach ensures that the library is always up-to-date with the latest changes in Jedis, providing enhanced security and compatibility.
The ->wrap-method macro is defined in src/com/moclojer/internal/reflection.clj and is used in src/com/moclojer/rq/queue.clj to generate the queue functions. By using reflection, the library can dynamically adapt to changes in the Jedis API, ensuring that the generated functions are always in sync with the underlying Jedis methods.
This dynamic generation process is a key differentiator of the clj-rq library, making it more secure and future-proof compared to other libraries that rely on static function definitions.
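As a rough illustration of the idea, the sketch below generates wrapper functions from a Java class at macroexpansion time. It is not the ->wrap-method macro itself: def-wrapped and public-arities are hypothetical names, and java.util.ArrayDeque stands in for Jedis so that the example runs without Redis.

```clojure
(ns example.reflection-sketch
  (:import [java.lang.reflect Method Modifier]
           [java.util ArrayDeque]))

(defn public-arities
  "Returns the sorted, distinct parameter counts of the public instance
  methods of `klass` named `method-name`, discovered via reflection."
  [^Class klass method-name]
  (->> (.getMethods klass)
       (filter (fn [^Method m]
                 (and (= method-name (.getName m))
                      (not (Modifier/isStatic (.getModifiers m))))))
       (map (fn [^Method m] (.getParameterCount m)))
       distinct
       sort))

(defmacro def-wrapped
  "Hypothetical macro: defines `fn-name` as a function with one arity per
  overload (by parameter count) of `method-name` found on `klass`.
  The first argument of the generated function is the target object."
  [fn-name klass method-name]
  (let [arities (public-arities (resolve klass) (name method-name))
        target  (gensym "target")]
    (when (empty? arities)
      (throw (ex-info "No such public method" {:class klass :method method-name})))
    `(defn ~fn-name
       ~@(for [n arities
               :let [args (vec (repeatedly n #(gensym "arg")))]]
           `([~target ~@args]
             (. ~target (~method-name ~@args)))))))

;; Wrappers are generated from whatever the class exposes at compile time.
(def-wrapped push-first! java.util.ArrayDeque addFirst)
(def-wrapped poll-last!  java.util.ArrayDeque pollLast)
(def-wrapped size        java.util.ArrayDeque size)

(defn demo
  "Pushes two items at the head, then polls one from the tail."
  []
  (let [dq (ArrayDeque.)]
    (push-first! dq "a")
    (push-first! dq "b")
    [(poll-last! dq) (size dq)]))
```

Because the arities are read from the class when the macro expands, a new overload in the wrapped class shows up in the generated function after recompiling, with no hand-written wrapper to update.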
This section outlines the key functions available in the clj-rq library, covering both queue and pub/sub operations. For detailed descriptions and examples of each function, please refer to the specific subsections below.
The clj-rq library provides a set of queue functions that are dynamically generated by wrapping methods from the Jedis library. These functions are defined in src/com/moclojer/rq/queue.clj and include:
- push!: Adds elements to the queue.
- pop!: Removes and returns elements from the queue.
- bpop!: Blocks until an element is available to pop from the queue.
- index: Retrieves an element at a specific index in the queue.
- range: Retrieves a range of elements from the queue.
- set!: Sets the value of an element at a specific index in the queue.
- len: Returns the length of the queue.
- rem!: Removes elements from the queue.
- insert!: Inserts an element into the queue at a specific position.
- trim!: Trims the queue to a specified range.
All these functions share common options, such as specifying the queue name and handling encoding/decoding of messages. The options are passed as arguments to the functions and allow for flexible configuration.
[!WARNING] The element or elements to be pushed into a queue must be passed inside a sequential collection (a vector, for example).
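To make the encoding step and the collection requirement concrete, here is a minimal sketch that serializes each element to an EDN string and reads it back. It only assumes that messages end up as strings in Redis; encode-all and decode are hypothetical helpers, and the actual format used by clj-rq may differ.

```clojure
(ns example.encoding-sketch
  (:require [clojure.edn :as edn]))

(defn encode-all
  "Serializes every element of a sequential collection to an EDN string.
  The precondition rejects a bare value such as a string."
  [elements]
  {:pre [(sequential? elements)]}
  (mapv pr-str elements))

(defn decode
  "Reads one EDN string back into Clojure data."
  [s]
  (edn/read-string s))

(defn round-trip
  "Encodes and then decodes every element, returning a vector."
  [elements]
  (mapv decode (encode-all elements)))
```

Requiring a collection keeps the single-element and multi-element cases on the same code path: one message is simply a vector of length one.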
(rq-queue/push! client "my-queue" ["message"] {:direction :left})
(rq-queue/pop! client "my-queue" {:direction :right})
(rq-queue/bpop! client "my-queue" {:timeout 5})
(rq-queue/index client "my-queue" 0)
(rq-queue/range client "my-queue" 0 -1)
(rq-queue/set! client "my-queue" 0 "new-message")
len: This function returns the length of the queue. It is useful for monitoring the size of the queue.
(rq-queue/len client "my-queue")
rem!: This function removes elements from the queue based on a specified pattern. It supports options for specifying the number of elements to remove.
(rq-queue/rem! client "my-queue" "message" {:count 2})
(rq-queue/insert! client "my-queue" "pivot-message" "new-message" {:position :before})
(rq-queue/trim! client "my-queue" 0 10)
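The :direction option in the push! and pop! examples can be pictured as choosing an end of a Redis list. The pure, in-memory model below is a sketch under that assumption (left corresponding to LPUSH/LPOP and right to RPUSH/RPOP); model-push and model-pop are hypothetical names and do not talk to Redis or mirror clj-rq's return values.

```clojure
(ns example.direction-sketch)

(defn model-push
  "Returns the queue after pushing `elements`. With :left each element is
  prepended in turn, so the last element pushed ends up at the head, as
  with Redis LPUSH. With :right elements are appended in order."
  [queue elements direction]
  (case direction
    :left  (into (vec (reverse elements)) queue)
    :right (into (vec queue) elements)))

(defn model-pop
  "Returns [popped remaining]. `popped` is nil when the queue is empty."
  [queue direction]
  (let [q (vec queue)]
    (if (empty? q)
      [nil q]
      (case direction
        :left  [(first q) (subvec q 1)]
        :right [(peek q) (pop q)]))))

(defn fifo-demo
  "Pushes on the left and pops on the right, which yields FIFO order."
  []
  (let [q              (model-push [] ["first" "second"] :left)
        [popped remaining] (model-pop q :right)]
    {:queue q :popped popped :remaining remaining}))
```

Pushing on one end and popping from the other gives first-in, first-out behaviour; using the same end for both turns the list into a stack.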
The clj-rq library provides a set of pub/sub functions that facilitate message publishing and subscription in a Redis-backed system. These functions are defined in src/com/moclojer/rq/pubsub.clj and include:
- publish!: Publishes a message to a specified channel.
- group-handlers-by-channel: Groups message handlers by their associated channels.
- create-listener: Creates a listener that processes messages from subscribed channels.
- unarquive-channel!: Unarchives a channel, making it active again.
- pack-workers-channels: Packs worker channels into a format suitable for processing.
- subscribe!: Subscribes to one or more channels and processes incoming messages.
(rq-pubsub/publish! client "my-channel" "Hello, World!")
(rq-pubsub/subscribe! client ["my-channel"] handlers)
(rq-pubsub/unarquive-channel! client "my-channel")
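Grouping handlers by channel can be sketched with plain data. The example below assumes workers are maps with :channel and :handler keys; group-handlers and dispatch are hypothetical helpers written for illustration, not the library's group-handlers-by-channel or create-listener.

```clojure
(ns example.pubsub-sketch)

(defn group-handlers
  "Builds a map from channel name to a vector of handler functions,
  preserving the order in which workers were declared."
  [workers]
  (reduce (fn [acc {:keys [channel handler]}]
            (update acc channel (fnil conj []) handler))
          {}
          workers))

(defn dispatch
  "Calls every handler registered for `channel` with `message` and returns
  the results as a vector. Unknown channels yield an empty vector."
  [grouped channel message]
  (mapv (fn [handler] (handler message))
        (get grouped channel [])))

(def sample-workers
  [{:channel "my-channel"       :handler (fn [msg] [:first msg])}
   {:channel "my-channel"       :handler (fn [msg] [:second msg])}
   {:channel "my-other-channel" :handler (fn [msg] [:other msg])}])
```

With the handlers grouped up front, a listener only needs a single map lookup per incoming message to find the functions to call.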
(ns rq.example
  (:require [com.moclojer.rq :as rq]
            [com.moclojer.rq.queue :as queue]
            [com.moclojer.rq.pubsub :as pubsub]))

(def *redis-pool* (rq/create-client "redis://localhost:6379/0"))

;; queue
(queue/push! *redis-pool* "my-queue"
             ;; has to be a vector of the elements to push
             [{:now (java.time.LocalDateTime/now)
               :foo "bar"}])
(println :size (queue/len *redis-pool* "my-queue"))
(prn :popped (queue/pop! *redis-pool* "my-queue"))

;; pub/sub
(def my-workers
  [{:channel "my-channel"
    :handler (fn [msg]
               (prn :msg :my-channel msg))}
   {:channel "my-other-channel"
    ;; my-function stands for a function defined elsewhere in your code
    :handler (fn [{:keys [my data hello]}]
               (my-function my data hello))}])

(pubsub/subscribe! *redis-pool* my-workers)
(pubsub/publish! *redis-pool* "my-channel" "hello world")
(pubsub/publish! *redis-pool* "my-other-channel"
                 {:my "moclojer team"
                  :data "app.moclojer.com"
                  :hello "maybe you'll like this website"})

(rq/close-client *redis-pool*)
The workflow in the given example can be represented as follows:
sequenceDiagram
participant User
participant Client
participant Queue
participant PubSub
participant Logger
User->>Client: create-client URL
Client-->>Logger: log client creation
Client-->>User: return client
User->>Queue: push! message
Queue-->>Logger: log push message
Queue-->>Queue: push message to queue
User->>PubSub: publish! channel, message
PubSub-->>Logger: log publish message
PubSub-->>PubSub: publish message to channel
User->>Queue: pop! queue-name
Queue-->>Logger: log pop operation
Queue-->>User: return popped message
User->>Client: close-client client
Client-->>Logger: log closing client
Client-->>User: confirm client closure
Made with 💜 by moclojer.