ZQ
ZQ – is a multi-producer-single-consumer durable queue based on the PostgreSQL.
All operations are performed by calling stored procedures, so it is a
reasonable choice if you already have a PostgreSQL in your zoo and need a
simple queue solution.
Inspired by pgq.
Way of work
In nutshell work with queue (after it was created) is as simple as:
- Enqueue data
- Open batch of desired size
- Consume data
- Close or cancel batch
- If batch was closed (for instance in case of successful data processing):
- data will be removed from queue
- next batch will be started from item followed by last item of closed batch
- If batch was canceled (like in case of temporary impediments of data processing):
- data stays in the queue
- next batch will be started from same item
PostgresSQL API
Queues management
Data management
-
Enqueue data: zq.enqueue(queue_name TEXT, data TEXT[]) : VOID
- Inserts items from data array as events in the queue 'queue_name' preserving order
- Throws an exception if given queue is not exists
-
Consume data: zq.dequeue(queue_name TEXT) : TABLE
- Returns content of the current batch for the queue "queue_name" a table with
following columns:
"timestamp" TIMESTAMP
"data" TEXT
- Throws an exception if given queue is not exists
- Throws an exception if there is batch was not created for given queue
Batch management
-
Create new batch: zq.open_batch(queue_name TEXT, max_batch_size INT) : INT
- Creates new batch of maximum size "max_batch_size" for the queue "queue_name",
if there are new events
- Returns actual size of the newly created batch (<="max_batch_size") or 0
if no new events are available and batch was not created
- Throws an exception if given queue is not exists
- Throws an exception if "max_batch_size" is a non-positive integer
- Throws an exception if non-committed batch for given queue is already exists
-
Commit current batch: zq.close_batch(queue_name TEXT) : VOID
- Commits previously created batch for the queue "queue_name"
- Throws an exception if given queue is not exists
- Throws an exception if there is no batch was created for given queue
-
Cancel current batch: zq.cancel_batch(queue_name TEXT) : VOID
- Cancel previously created batch for the queue "queue_name"
- Throws an exception if given queue is not exists
- Throws an exception if there is no batch was created for given queue