peter-wangxu / persist-queue

A thread-safe disk based persistent queue in Python
BSD 3-Clause "New" or "Revised" License
328 stars 48 forks source link
mysql persistent-queue python sqlite thread-safety

persist-queue - A thread-safe, disk-based queue for Python

.. image:: :target:

.. image:: :target:

.. image:: :target:

.. image:: :target:

.. image:: :alt: PyPI - Python Version

persist-queue implements a file-based queue and a serial of sqlite3-based queues. The goals is to achieve following requirements:

While queuelib and python-pqueue cannot fulfil all of above. After some try, I found it's hard to achieve based on their current implementation without huge code change. this is the motivation to start this project.

By default, persist-queue use pickle object serialization module to support object instances. Most built-in type, like int, dict, list are able to be persisted by persist-queue directly, to support customized objects, please refer to Pickling and unpickling extension types(Python2) <> and Pickling Class Instances(Python3) <>

This project is based on the achievements of python-pqueue <> and queuelib <>

Slack channels ^^^^^^^^^^^^^^

Join persist-queue <https://join.slack .com/t/persist-queue/shared_invite /enQtOTM0MDgzNTQ0MDg3LTNmN2IzYjQ1MDc0MDYzMjI4OGJmNmVkNWE3ZDBjYzg5MDc0OWUzZDJkYTkwODdkZmYwODdjNjUzMTk3MWExNDE>_ channel





from pypi ^^^^^^^^^

.. code-block:: console

pip install persist-queue
# for msgpack, cbor and mysql support, use following command
pip install "persist-queue[extra]"

from source code ^^^^^^^^^^^^^^^^

.. code-block:: console

git clone
cd persist-queue
# for msgpack and cbor support, run 'pip install -r extra-requirements.txt' first
python install


Here are the time spent(in seconds) for writing/reading 1000 items to the disk comparing the sqlite3 and file queue.

+---------------+---------+-------------------------+----------------------------+ | | Write | Write/Read(1 task_done) | Write/Read(many task_done) | +---------------+---------+-------------------------+----------------------------+ | SQLite3 Queue | 1.8880 | 2.0290 | 3.5940 | +---------------+---------+-------------------------+----------------------------+ | File Queue | 4.9520 | 5.0560 | 8.4900 | +---------------+---------+-------------------------+----------------------------+

windows note Performance of Windows File Queue has dramatic improvement since v0.4.1 due to the atomic renaming support(3-4X faster)

+---------------+--------+-------------------------+----------------------------+ | | Write | Write/Read(1 task_done) | Write/Read(many task_done) | +---------------+--------+-------------------------+----------------------------+ | SQLite3 Queue | 1.8282 | 1.8075 | 2.8639 | +---------------+--------+-------------------------+----------------------------+ | File Queue | 0.9123 | 1.0411 | 2.5104 | +---------------+--------+-------------------------+----------------------------+

+---------------+--------+-------------------------+----------------------------+ | | Write | Write/Read(1 task_done) | Write/Read(many task_done) | +---------------+--------+-------------------------+----------------------------+ | SQLite3 Queue | 0.1879 | 0.2115 | 0.3147 | +---------------+--------+-------------------------+----------------------------+ | File Queue | 0.5158 | 0.5357 | 1.0446 | +---------------+--------+-------------------------+----------------------------+


.. code-block:: console

python benchmark/ 1000

To see the real performance on your host, run the script under benchmark/

.. code-block:: console

python benchmark/ <COUNT, default to 100>


Example usage with a SQLite3 based queue ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> q.put('str1')
>>> q.put('str2')
>>> q.put('str3')
>>> q.get()
>>> del q

Close the console, and then recreate the queue:

.. code-block:: python

import persistqueue q = persistqueue.SQLiteQueue('mypath', auto_commit=True) q.get() 'str2'

New functions: Available since v0.8.0

Example usage of SQLite3 based UniqueQ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ This queue does not allow duplicate items.

.. code-block:: python

import persistqueue q = persistqueue.UniqueQ('mypath') q.put('str1') q.put('str1') q.size 1 q.put('str2') q.size 2

Example usage of SQLite3 based SQLiteAckQueue/UniqueAckQ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ The core functions:

.. code-block:: python

import persistqueue ackq = persistqueue.SQLiteAckQueue('path') ackq.put('str1') item = ackq.get()

Do something with the item

ackq.ack(item) # If done with the item ackq.nack(item) # Else mark item as nack so that it can be proceeded again by any worker ackq.ack_failed(item) # Or else mark item as ack_failed to discard this item


raw example:

.. code-block:: python

q.put('val1') d = q.get(raw=True) print(d) {'pqid': 1, 'data': 'val1', 'timestamp': 1616719225.012912} q.ack(d)

next_in_order example:

.. code-block:: python

q.put("val1") q.put("val2") q.put("val3") item = q.get() id = q.nack(item) item = q.get(id=id, next_in_order=True) print(item) val2


  1. The SQLiteAckQueue always uses "auto_commit=True".
  2. The Queue could be set in non-block style, e.g. "SQLiteAckQueue.get(block=False, timeout=5)".
  3. UniqueAckQ only allows for unique items

Example usage with a file based queue ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


.. code-block:: python

>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.put('c')
>>> q.get()
>>> q.task_done()

Close the python console, and then we restart the queue from the same path,

.. code-block:: python

>>> from persistqueue import Queue
>>> q = Queue('mypath')
>>> q.get()
>>> q.task_done()

Example usage with an auto-saving file based queue ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Available since: v0.5.0

By default, items added to the queue are persisted during the put() call, and items removed from a queue are only persisted when task_done() is called.

.. code-block:: python

>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.get()
>>> q.get()

After exiting and restarting the queue from the same path, we see the items remain in the queue, because task_done() wasn't called before.

.. code-block:: python

>>> from persistqueue import Queue
>>> q = Queue('mypath')
>>> q.get()
>>> q.get()

This can be advantageous. For example, if your program crashes before finishing processing an item, it will remain in the queue after restarting. You can also spread out the task_done() calls for performance reasons to avoid lots of individual writes.

Using autosave=True on a file based queue will automatically save on every call to get(). Calling task_done() is not necessary, but may still be used to join() against the queue.

.. code-block:: python

>>> from persistqueue import Queue
>>> q = Queue("mypath", autosave=True)
>>> q.put('a')
>>> q.put('b')
>>> q.get()

After exiting and restarting the queue from the same path, only the second item remains:

.. code-block:: python

>>> from persistqueue import Queue
>>> q = Queue('mypath', autosave=True)
>>> q.get()

Example usage with a SQLite3 based dict ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

>>> from persisitqueue import PDict
>>> q = PDict("testpath", "testname")
>>> q['key1'] = 123
>>> q['key2'] = 321
>>> q['key1']
>>> len(q)
>>> del q['key1']
>>> q['key1']
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "persistqueue\", line 58, in __getitem__
    raise KeyError('Key: {} not exists.'.format(item))
KeyError: 'Key: key1 not exists.'

Close the console and restart the PDict

.. code-block:: python

>>> from persisitqueue import PDict
>>> q = PDict("testpath", "testname")
>>> q['key2']

Multi-thread usage for SQLite3 based queue ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

from persistqueue import FIFOSQLiteQueue

q = FIFOSQLiteQueue(path="./test", multithreading=True)

def worker():
    while True:
        item = q.get()

for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True

for item in source():

multi-thread usage for Queue ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

from persistqueue import Queue

q = Queue()

def worker():
    while True:
        item = q.get()

for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True

for item in source():

q.join()       # block until all tasks are done

Example usage with a MySQL based queue ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Available since: v0.8.0

.. code-block:: python

>>> import persistqueue
>>> db_conf = {
>>>     "host": "",
>>>     "user": "user",
>>>     "passwd": "passw0rd",
>>>     "db_name": "testqueue",
>>>     # "name": "",
>>>     "port": 3306
>>> }
>>> q = persistqueue.MySQLQueue(name="testtable", **db_conf)
>>> q.put('str1')
>>> q.put('str2')
>>> q.put('str3')
>>> q.get()
>>> del q

Close the console, and then recreate the queue:

.. code-block:: python

import persistqueue q = persistqueue.MySQLQueue(name="testtable", **db_conf) q.get() 'str2'


Due to the limitation of file queue described in issue #89 <>_, task_done in one thread may acknowledge items in other threads which should not be. Considering the SQLiteAckQueue if you have such requirement.

Serialization via msgpack/cbor/json ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

>>> from persistqueue
>>> q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.msgpack)
>>> # via cbor2
>>> # q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.cbor2)
>>> # via json
>>> # q = Queue('mypath', serializer=persistqueue.serializers.json)
>>> q.get()
>>> q.task_done()

Explicit resource reclaim ^^^^^^^^^^^^^^^^^^^^^^^^^

For some reasons, an application may require explicit reclamation for file handles or sql connections before end of execution. In these cases, user can simply call: .. code-block:: python

q = Queue() # or q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
del q

to reclaim related file handles or sql connections.


task_done is required both for file based queue and SQLite3 based queue (when auto_commit=False) to persist the cursor of next get to the disk.

Performance impact


persist-queue use tox to trigger tests.

.. code-block:: console


Available <PYTHON_VERSION>: py27, py34, py35, py36, py37

.. code-block:: console

tox -e pep8

pyenv <>_ is usually a helpful tool to manage multiple versions of Python.


Currently, the atomic operation is supported on Windows while still in experimental, That's saying, the data in persistqueue.Queue could be in unreadable state when an incidental failure occurs during Queue.task_done.

DO NOT put any critical data on persistqueue.queue on Windows.


Simply fork this repo and send PR for your code change(also tests to cover your change), remember to give a title and description of your PR. I am willing to enhance this project with you :).




Contributors <>_


persistqueue open 2 connections for the db if multithreading=True, the SQLite database is locked until that transaction is committed. The timeout parameter specifies how long the connection should wait for the lock to go away until raising an exception. Default time is 10, increase timeout when creating the queue if above error occurs.

The sqlite3 queues are heavily tested under multi-threading environment, if you find it's not thread-safe, please make sure you set the multithreading=True when initializing the queue before submitting new issue:).