MIT License

=========
TaskTiger
=========

.. image:: https://github.com/closeio/tasktiger/actions/workflows/test.yaml/badge.svg?event=push
  :target: https://github.com/closeio/tasktiger/actions/workflows/test.yaml

TaskTiger is a Python task queue using Redis.

(Interested in working on projects like this? `Close`_ is looking for `great engineers`_ to join our team)

.. _Close: http://close.com
.. _great engineers: http://jobs.close.com

.. contents:: Contents

Features
--------

.. _tasktiger-admin: https://github.com/closeio/tasktiger-admin

Quick start
-----------

It is easy to get started with TaskTiger.

Create a file that contains the task(s).

.. code:: python

  # tasks.py

  def my_task():
      print('Hello')

Queue the task using the delay method.

.. code:: python

  In [1]: import tasktiger, tasks
  In [2]: tiger = tasktiger.TaskTiger()
  In [3]: tiger.delay(tasks.my_task)

Run a worker (make sure the task code can be found, e.g. using PYTHONPATH).

.. code:: bash

  % PYTHONPATH=. tasktiger
  {"timestamp": "2015-08-27T21:00:09.135344Z", "queues": null, "pid": 69840, "event": "ready", "level": "info"}
  {"task_id": "6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79", "level": "debug", "timestamp": "2015-08-27T21:03:56.727051Z", "pid": 69840, "queue": "default", "child_pid": 70171, "event": "processing"}
  Hello
  {"task_id": "6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79", "level": "debug", "timestamp": "2015-08-27T21:03:56.732457Z", "pid": 69840, "queue": "default", "event": "done"}

Configuration
-------------

A TaskTiger object keeps track of TaskTiger's settings and is used to decorate and queue tasks. The constructor takes the following arguments:

Example:

.. code:: python

  import tasktiger
  from redis import Redis
  conn = Redis(db=1, decode_responses=True)
  tiger = tasktiger.TaskTiger(connection=conn, config={
      'BATCH_QUEUES': {
          # Batch up to 50 tasks that are queued in the my_batch_queue or any
          # of its subqueues, except for the send_email subqueue which only
          # processes up to 10 tasks at a time.
          'my_batch_queue': 50,
          'my_batch_queue.send_email': 10,
      },
  })
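The comment in the example implies that a batch limit set on a queue also applies to its subqueues unless a more specific entry exists. The following is a minimal sketch of that lookup in plain Python, not TaskTiger's actual code: ``batch_size_for`` is a hypothetical helper, and the default of 1 for queues without a matching entry is an assumption.

```python
# Hypothetical helper illustrating the lookup; not part of TaskTiger's API.
BATCH_QUEUES = {
    'my_batch_queue': 50,
    'my_batch_queue.send_email': 10,
}


def batch_size_for(queue, batch_queues, default=1):
    """Return the batch size of the most specific matching (sub)queue."""
    parts = queue.split('.')
    size = default  # assumption: unlisted queues process one task at a time
    # Walk from the top-level queue down to the full name so that more
    # specific entries override less specific ones.
    for i in range(1, len(parts) + 1):
        prefix = '.'.join(parts[:i])
        if prefix in batch_queues:
            size = batch_queues[prefix]
    return size


print(batch_size_for('my_batch_queue.reports', BATCH_QUEUES))
print(batch_size_for('my_batch_queue.send_email', BATCH_QUEUES))
```

Under these assumptions, the ``reports`` subqueue inherits the limit of its parent, while ``send_email`` uses its own entry.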

Task decorator
--------------

TaskTiger provides a task decorator to specify task options. Note that simple tasks don't need to be decorated. However, decorating the task allows you to use an alternative syntax to queue the task, which is compatible with Celery:

.. code:: python

  # tasks.py

  import tasktiger
  tiger = tasktiger.TaskTiger()

  @tiger.task()
  def my_task(name, n=None):
      print('Hello', name)

.. code:: python

  In [1]: import tasks
  # The following are equivalent. However, the second syntax can only be used
  # if the task is decorated.
  In [2]: tasks.tiger.delay(tasks.my_task, args=('John',), kwargs={'n': 1})
  In [3]: tasks.my_task.delay('John', n=1)

Task options
------------

Tasks support a variety of options that can be specified either in the task decorator, or when queueing a task. For the latter, the delay method must be called on the TaskTiger object, and any options in the task decorator are overridden.

.. code:: python

  @tiger.task(queue='myqueue', unique=True)
  def my_task():
      print('Hello')

.. code:: python

  # The task will be queued in "otherqueue", even though the task decorator
  # says "myqueue".
  tiger.delay(my_task, queue='otherqueue')
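The precedence rule can be pictured as a dictionary merge in which options passed at queueing time win over those from the decorator. This is only an illustrative sketch: ``effective_options`` is a hypothetical helper and does not reflect how TaskTiger stores options internally.

```python
def effective_options(decorator_options, delay_options):
    """Merge task options; values given when queueing take precedence."""
    merged = dict(decorator_options)
    merged.update(delay_options)
    return merged


decorator_options = {'queue': 'myqueue', 'unique': True}
delay_options = {'queue': 'otherqueue'}

print(effective_options(decorator_options, delay_options))
# → {'queue': 'otherqueue', 'unique': True}
```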

When queueing a task, the task needs to be defined in a module other than the Python file which is being executed. In other words, the task can't be in the ``__main__`` module. TaskTiger will raise an error otherwise.
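A likely reason for this restriction is that a worker runs in a separate process and has to import the function by its module path, and ``__main__`` refers to a different module there. A minimal sketch of such a check follows, assuming tasks are identified by a ``module:name`` string. Both ``importable_name`` and that format are hypothetical and only illustrate the idea.

```python
import json


def importable_name(func):
    """Return a 'module:name' reference that another process could import."""
    if func.__module__ == '__main__':
        # In a worker process, __main__ is the worker script itself, so the
        # function could not be found again under that name.
        raise ValueError('functions defined in __main__ cannot be queued')
    return '{}:{}'.format(func.__module__, func.__name__)


print(importable_name(json.dumps))
# → json:dumps
```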

The following options are supported by both delay and the task decorator:

The following options can be only specified in the task decorator:

Custom retrying
---------------

In some cases the task retry options may not be flexible enough. For example, you might want to use a different retry method depending on the exception type, or you might want to suppress logging an error if a task fails after retries. In these cases, ``RetryException`` can be raised within the task function. The following options are supported:

Example usage:

.. code:: python

  from tasktiger.exceptions import RetryException
  from tasktiger.retry import exponential, fixed

  def my_task():
      if not ready():
          # Retry every minute up to 3 times if we're not ready. An error will
          # be logged if we're out of retries.
          raise RetryException(method=fixed(60, 3))

      try:
          some_code()
      except NetworkException:
          # Back off exponentially up to 5 times in case of a network failure.
          # Log the original traceback (as a warning) and don't log an error if
          # we still fail after 5 times.
          raise RetryException(method=exponential(60, 2, 5),
                               original_traceback=True,
                               log_error=False)
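To make the two schedules concrete, the sketch below computes the delays that the calls in the example would produce, assuming the arguments mean ``fixed(delay, max_retries)`` and ``exponential(delay, factor, max_retries)``, with the delay multiplied by ``factor`` after each retry. The functions ``fixed_delays`` and ``exponential_delays`` are hypothetical stand-ins written for illustration, not the ``tasktiger.retry`` implementations.

```python
def fixed_delays(delay, max_retries):
    """Delays in seconds before each retry when the delay never changes."""
    return [delay for _ in range(max_retries)]


def exponential_delays(delay, factor, max_retries):
    """Delays in seconds when each retry waits `factor` times longer."""
    return [delay * factor ** n for n in range(max_retries)]


print(fixed_delays(60, 3))
# → [60, 60, 60]
print(exponential_delays(60, 2, 5))
# → [60, 120, 240, 480, 960]
```

Under these assumptions, the exponential schedule spreads its five retries over roughly half an hour, whereas the fixed schedule gives up after three minutes.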

Workers
-------

The tasktiger command is used on the command line to invoke a worker. To invoke multiple workers, multiple instances need to be started. This can be easily done e.g. via Supervisor. The following Supervisor configuration file can be placed in /etc/supervisor/tasktiger.ini and runs 4 TaskTiger workers as the ubuntu user. For more information, read Supervisor's documentation.

.. code:: bash

  [program:tasktiger]
  command=/usr/local/bin/tasktiger
  process_name=%(program_name)s%(process_num)02d
  numprocs=4
  numprocs_start=0
  priority=999
  autostart=true
  autorestart=true
  startsecs=10
  startretries=3
  exitcodes=0,2
  stopsignal=TERM
  stopwaitsecs=600
  killasgroup=false
  user=ubuntu
  redirect_stderr=false
  stdout_logfile=/var/log/tasktiger.out.log
  stdout_logfile_maxbytes=250MB
  stdout_logfile_backups=10
  stderr_logfile=/var/log/tasktiger.err.log
  stderr_logfile_maxbytes=250MB
  stderr_logfile_backups=10

Workers support the following options:

In some cases it is convenient to have a custom TaskTiger launch script. For example, your application may have a manage.py command that sets up the environment and you may want to launch TaskTiger workers using that script. To do that, you can use the run_worker_with_args method, which launches a TaskTiger worker and parses any command line arguments. Here is an example:

.. code:: python

  import sys
  from tasktiger import TaskTiger

  try:
      command = sys.argv[1]
  except IndexError:
      command = None

  if command == 'tasktiger':
      tiger = TaskTiger(setup_structlog=True)
      # Strip the "tasktiger" arg when running via manage, so we can run e.g.
      # ./manage.py tasktiger --help
      tiger.run_worker_with_args(sys.argv[2:])
      sys.exit(0)

Inspect, requeue and delete tasks
---------------------------------

TaskTiger provides access to the Task class which lets you inspect queues and perform various actions on tasks.

Each queue can have tasks in the following states:

To get a list of all tasks for a given queue and state, use Task.tasks_from_queue. The method gives you back a tuple containing the total number of tasks in the queue (useful if the tasks are truncated) and a list of tasks in the queue, latest first. Using the skip and limit keyword arguments, you can fetch arbitrary slices of the queue. If you know the task ID, you can fetch a given task using Task.from_id. Both methods let you load tracebacks from failed task executions using the load_executions keyword argument, which accepts an integer indicating how many executions should be loaded.

Tasks can also be constructed and queued using the regular constructor, which takes the TaskTiger instance, the function name and the options described in the Task options section. The task can then be queued using its delay method. Note that the when argument needs to be passed to the delay method, if applicable. Unique tasks can be reconstructed using the same arguments.

The Task object has the following properties:

The Task object has the following methods:

The current task can be accessed within the task function while it's being executed: In case of a non-batch task, the current_task property of the TaskTiger instance returns the current Task instance. In case of a batch task the current_tasks property must be used which returns a list of tasks that are currently being processed (in the same order as they were passed to the task).

Example 1: Queueing a unique task and canceling it without a reference to the original task.

.. code:: python

  import datetime

  from tasktiger import TaskTiger, Task

  tiger = TaskTiger()

  # Send an email in five minutes.
  task = Task(tiger, send_mail, args=['email_id'], unique=True)
  task.delay(when=datetime.timedelta(minutes=5))

  # Unique tasks get back a task instance referring to the same task by simply
  # creating the same task again.
  task = Task(tiger, send_mail, args=['email_id'], unique=True)
  task.cancel()

Example 2: Inspecting queues and retrying a task by ID.

.. code:: python

  from tasktiger import TaskTiger, Task

  QUEUE_NAME = 'default'
  TASK_STATE = 'error'
  TASK_ID = '6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79'

  tiger = TaskTiger()

  n_total, tasks = Task.tasks_from_queue(tiger, QUEUE_NAME, TASK_STATE)

  for task in tasks:
      print(task.id, task.func)

  task = Task.from_id(tiger, QUEUE_NAME, TASK_STATE, TASK_ID)
  task.retry()

Example 3: Accessing the task instances within a batch task function to determine how many times the currently processing tasks were previously executed.

.. code:: python

  from tasktiger import TaskTiger

  tiger = TaskTiger()

  @tiger.task(batch=True)
  def my_task(args):
      for task in tiger.current_tasks:
          print(task.n_executions())

Pause queue processing
----------------------

The --max-workers-per-queue option uses queue locks to control the number of workers that can simultaneously process the same queue. When using this option a system lock can be placed on a queue which will keep workers from processing tasks from that queue until it expires. Use the set_queue_system_lock() method of the TaskTiger object to set this lock.
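As a hedged sketch, a small helper could pause a queue for a fixed period. It assumes that ``set_queue_system_lock()`` accepts the queue name followed by a timeout in seconds, which should be checked against the installed TaskTiger version. ``pause_queue`` is a hypothetical helper, and its ``tiger`` argument is expected to be a configured ``TaskTiger`` instance whose workers run with ``--max-workers-per-queue``.

```python
def pause_queue(tiger, queue, seconds):
    """Keep workers from picking up tasks from `queue` for `seconds`."""
    if seconds <= 0:
        raise ValueError('seconds must be positive')
    # Assumed signature: set_queue_system_lock(queue, timeout_in_seconds).
    tiger.set_queue_system_lock(queue, seconds)
```

Under that assumption, ``pause_queue(tiger, 'default', 300)`` would hold the lock on the ``default`` queue for five minutes.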

Rollbar error handling
----------------------

TaskTiger comes with Rollbar integration for error handling. When a task errors out, it can be logged to Rollbar, grouped by queue, task function name and exception type. To enable logging, initialize rollbar with the StructlogRollbarHandler provided in the tasktiger.rollbar module. The handler takes a string as an argument which is used to prefix all the messages reported to Rollbar. Here is a custom worker launch script:

.. code:: python

  import logging
  import rollbar
  import sys
  from tasktiger import TaskTiger
  from tasktiger.rollbar import StructlogRollbarHandler

  tiger = TaskTiger(setup_structlog=True)

  rollbar.init(ROLLBAR_API_KEY, APPLICATION_ENVIRONMENT,
               allow_logging_basic_config=False)
  rollbar_handler = StructlogRollbarHandler('TaskTiger')
  rollbar_handler.setLevel(logging.ERROR)
  tiger.log.addHandler(rollbar_handler)

  tiger.run_worker_with_args(sys.argv[1:])

Cleaning Up Error'd Tasks
-------------------------

Error'd tasks occasionally need to be purged from Redis, so TaskTiger exposes a purge_errored_tasks method to help. It might be useful to set this up as a periodic task as follows:

.. code:: python

  import datetime

  from tasktiger import TaskTiger, periodic

  tiger = TaskTiger()

  @tiger.task(schedule=periodic(hours=1))
  def purge_errored_tasks():
      tiger.purge_errored_tasks(
          limit=1000,
          last_execution_before=(
              datetime.datetime.utcnow() - datetime.timedelta(weeks=12)
          )
      )

Running The Test Suite
----------------------

Tests can be run locally using the provided Docker Compose file. After installing Docker, run the tests with:

.. code:: bash

  docker-compose run --rm tasktiger pytest

To run only a subset of the tests, use the normal pytest flags. For example:

.. code:: bash

  docker-compose run --rm tasktiger pytest tests/test_base.py::TestCase

Releasing a New Version
-----------------------

#. Make sure the code has been thoroughly reviewed and tested in a realistic production environment.

#. Update ``setup.py`` and ``CHANGELOG.md``. Make sure you include any breaking changes.

#. Run ``python setup.py sdist`` and ``twine upload dist/<PACKAGE_TO_UPLOAD>``.

#. Push a new tag pointing to the released commit, format: ``v0.13`` for example.

#. Mark the tag as a release in GitHub's UI and include in the description the changelog entry for the version.

   An example would be: https://github.com/closeio/tasktiger/releases/tag/v0.13.