just-work / django-celery-rpc

Remote access from one system to models of another one using Celery machinery.
The Unlicense

django-celery-rpc


Remote access from one system to the models and functions of another using Celery machinery.

Relies on three outstanding python projects:

Main features

Client and server are designed to:

Installation

Install client:

pip install djangoceleryrpc

Install server:

pip install djangoceleryrpc[server]

Basic Configuration

The default configuration of django-celery-rpc must be overridden in settings.py by CELERY_RPC_CONFIG. CELERY_RPC_CONFIG is a dict which must contain at least two keys: BROKER_URL and CELERY_RESULT_BACKEND. Any other Celery config params are also permitted (see Configuration and defaults).

server span

settings.py:

# minimal required configuration
CELERY_RPC_CONFIG = {
    'broker_url': 'amqp://guest:guest@rabbitmq:5672//',
    'result_backend': 'redis://redis:6379/0',
}

server eggs

settings.py:

# alternate request queue and routing key
CELERY_RPC_CONFIG = {
    'broker_url': 'amqp://guest:guest@rabbitmq:5672/',
    'result_backend': 'amqp://guest:guest@rabbitmq:5672/',
    'task_default_queue': 'celery_rpc.requests.alter_queue',
    'task_default_routing_key': 'celery_rpc.alter_routing_key'
}

client

settings.py:

# these settings will be used in clients by default
CELERY_RPC_CONFIG = {
    'broker_url': 'amqp://guest:guest@rabbitmq:5672/',
    'result_backend': 'redis://redis:6379/0',
}

# 'eggs' alternative configuration will be explicitly passed to the client constructor
CELERY_RPC_EGGS_CLIENT = {
    # BROKER_URL will be used by default from section above
    'result_backend': 'amqp://guest:guest@rabbitmq:5672/',
    'task_default_queue': 'celery_rpc.requests.alter_queue',
    'task_default_routing_key': 'celery_rpc.alter_routing_key'
}

*Note:

  1. client and server must share the same BROKER_URL, RESULT_BACKEND, DEFAULT_EXCHANGE, DEFAULT_QUEUE, DEFAULT_ROUTING_KEY
  2. different servers must serve different request queues with different routing keys, or must work with different exchanges*
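
A quick way to catch violations of the first rule is to compare the two settings dicts before deploying. The sketch below is only an illustration: `SHARED_KEYS` and `find_config_mismatches` are hypothetical names, not part of django-celery-rpc, and the key list assumes the lowercase Celery setting names used in the settings.py examples above.

```python
# Hypothetical helper, not part of django-celery-rpc.
# Assumption: these lowercase keys correspond to the settings named in the note.
SHARED_KEYS = (
    'broker_url',
    'result_backend',
    'task_default_exchange',
    'task_default_queue',
    'task_default_routing_key',
)


def find_config_mismatches(client_config, server_config, keys=SHARED_KEYS):
    """Return {key: (client_value, server_value)} for every key that differs."""
    mismatches = {}
    for key in keys:
        client_value = client_config.get(key)
        server_value = server_config.get(key)
        # A key missing on both sides counts as equal (both fall back to defaults).
        if client_value != server_value:
            mismatches[key] = (client_value, server_value)
    return mismatches


server_eggs = {
    'broker_url': 'amqp://guest:guest@rabbitmq:5672/',
    'result_backend': 'amqp://guest:guest@rabbitmq:5672/',
    'task_default_queue': 'celery_rpc.requests.alter_queue',
    'task_default_routing_key': 'celery_rpc.alter_routing_key',
}
# A client config that accidentally points at a different result backend.
client_eggs = dict(server_eggs, result_backend='redis://redis:6379/0')

mismatches = find_config_mismatches(client_eggs, server_eggs)
print(mismatches)
```

An empty dict means the two sides agree on every shared key.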

example.py

from celery_rpc.client import Client
from django.conf import settings

# create client with default settings
span_client = Client()

# create client for `eggs` server
eggs_client = Client(settings.CELERY_RPC_EGGS_CLIENT)

Using client

You can find more examples in tests.

Filtering

Simple filtering example

span_client.filter('app.models:MyModel', kwargs=dict(filters={'a__exact':'a'}))

Filtering with Q object

from django.db.models import Q
span_client.filter('app.models:MyModel', kwargs=dict(filters_Q=(Q(a='1') | Q(b='1'))))

We can also combine Q objects and lookups

span_client.filter('app.models:MyModel', kwargs=dict(filters={'c__exact':'c'}, filters_Q=(Q(a='1') | Q(b='1'))))

Exclude is supported too

span_client.filter('app.models:MyModel', kwargs=dict(exclude={'c__exact':'c'}, exclude_Q=(Q(a='1') | Q(b='1'))))

You can mix filters and exclude, Q-object with lookups. Try it yourself. ;)

Full list of available kwargs:

filters - dict of terms compatible with django lookup fields
offset - offset from which to return results
limit - max number of results
fields - list of serializer fields, which will be returned
exclude - lookups for excluding matched models
order_by - order of results (list, tuple or string),
    minus ('-') set reverse order, default = []
filters_Q - django Q-object for filtering models
exclude_Q - django Q-object for excluding matched models
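
To illustrate how several of these options fit together, here is a minimal sketch that builds a `kwargs` dict for a paginated query. `build_filter_kwargs` is a hypothetical helper, not part of the library, and it assumes that `offset` is zero-based (the list above does not say so explicitly).

```python
# Hypothetical helper, not part of django-celery-rpc.
def build_filter_kwargs(filters=None, exclude=None, fields=None,
                        order_by=None, page=1, page_size=20):
    """Build a `kwargs` dict using the option names listed above."""
    if page < 1 or page_size < 1:
        raise ValueError('page and page_size must be positive')
    kwargs = {
        # Assumption: offset is zero-based, so page 1 starts at offset 0.
        'offset': (page - 1) * page_size,
        'limit': page_size,
    }
    if filters:
        kwargs['filters'] = dict(filters)
    if exclude:
        kwargs['exclude'] = dict(exclude)
    if fields:
        kwargs['fields'] = list(fields)
    if order_by:
        kwargs['order_by'] = order_by
    return kwargs


kwargs = build_filter_kwargs(filters={'a__exact': 'a'},
                             fields=['id', 'a'],
                             order_by='-id',
                             page=3, page_size=10)
print(kwargs)

# With a configured client the dict would then be passed as:
# span_client.filter('app.models:MyModel', kwargs=kwargs)
```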

List of all MyModel objects with high priority

span_client.filter('app.models:MyModel', high_priority=True)

Creating

Create one object

span_client.create('apps.models:MyModel', data={"a": "a"})

Bulk creating

span_client.create('apps.models:MyModel', data=[{"a": "a"}, {"a": "b"}])

Updating

Update one object by PK field name

span_client.update('apps.models:MyModel', data={"id": 1, "a": "a"})

Update one object by the special alias 'pk', which is matched automatically to the PK field

span_client.update('apps.models:MyModel', data={"pk": 1, "a": "a"})

Attention! Magic area! Update one object by any field you wish

span_client.update('apps.models:MyModel', data={"alternative_key_field": 42, "a": "a"},
                   kwargs={'identity': 'alternative_key_field'})

Update or create, Delete and so on

All cases are very similar. Try it in your console!

Full list of supported model methods

All methods support options:

Pipe

It's possible to pipeline tasks, so they will be executed in one transaction.

p = span_client.pipe()
p = p.create('apps.models:MyModel', data={"a": "a"})
p = p.create('apps.models:MyAnotherModel', data={"b": "b"})
p.run()

You can pass some arguments from the previous task to the next one.

Suppose you have these models on the server

class MyModel(models.Model):
    a = models.CharField()

class MyAnotherModel(models.Model):
    fk = models.ForeignKey(MyModel)
    b = models.CharField()

You need to create an instance of MyModel and an instance of MyAnotherModel which refers to MyModel

p = span_client.pipe()
p = p.create('apps.models:MyModel', data={"a": "a"})
p = p.translate({"fk": "id"}, defaults={"b": "b"})
p = p.create('apps.models:MyAnotherModel')
p.run()

In this example the translate task:

After that, the next create task takes the result of translate as its input data
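
To make the data flow concrete, the sketch below emulates the translate step on plain dicts. It is inferred from the example above, not taken from the library's source: `emulate_translate` is a hypothetical function, and it assumes that the mapping is read as `{new_key: key_in_previous_result}` and that mapped values take precedence over `defaults`.

```python
# Hypothetical emulation of the translate step, not the library's implementation.
def emulate_translate(mapping, previous_result, defaults=None):
    """Build input data for the next task from the previous task's result.

    `mapping` is read as {new_key: key_in_previous_result} (an assumption
    inferred from the pipe example).
    """
    # Start from the defaults, then overlay the values copied from the result.
    data = dict(defaults or {})
    for new_key, source_key in mapping.items():
        data[new_key] = previous_result[source_key]
    return data


# Pretend this is what the first create task returned.
created_my_model = {'id': 7, 'a': 'a'}

next_data = emulate_translate({'fk': 'id'}, created_my_model,
                              defaults={'b': 'b'})
print(next_data)
```

Under these assumptions, `next_data` holds the foreign key and the default field, which is what the second create task in the pipe needs in order to build MyAnotherModel.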

Add/delete m2m relations

Let's take the following models:

class MyModel(models.Model):
    str = models.CharField()

class MyManyToManyModel(models.Model):
    m2m = models.ManyToManyField(MyModel, null=True)

Add relation between existing objects

my_models = span_client.create('apps.models:MyModel',
                               [{'str': 'monty'}, {'str': 'python'}])
m2m_model = span_client.create('apps.models:MyManyToManyModel',
                               {'m2m': [my_models[0]['id']]})

# Will add 'python' to m2m_model.m2m where 'monty' already is
data = {'mymodel': my_models[1]['id'], 'mymanytomanymodel': m2m_model['id']}
through = span_client.create('apps.models:MyManyToManyModel.m2m.through', data)

And then delete some of the existing relations

# The next `pipe` will eliminate all relations where `mymodel__str` equals 'monty'
p = span_client.pipe()
p = p.filter('apps.models:MyManyToManyModel.m2m.through', {'mymodel__str': 'monty'})
p = p.delete('apps.models:MyManyToManyModel.m2m.through')
p.run()

Run server instance

celery worker -A celery_rpc.app

Server with support for prioritized task consumption

celery multi start 2 -A celery_rpc.app -Q:1 celery_rpc.requests.high_priority

Note: you must replace 'celery_rpc.requests' with the actual value of the config param CELERY_DEFAULT_QUEUE

The command starts two instances. The first instance consumes from the high priority queue only. The second instance serves both queues.

For daemonization see Running the worker as a daemon

Run tests

python django-celery-rpc/celery_rpc/runtests/runtests.py

More Configuration

Overriding base task class

OVERRIDE_BASE_TASKS = {
    'ModelTask': 'package.module.MyModelTask',
    'ModelChangeTask': 'package.module.MyModelChangeTask',
    'FunctionTask': 'package.module.MyFunctionTask'
}

Supported class names: ModelTask, ModelChangeTask, FunctionTask
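
Because only these three names are supported, a misspelled key is easy to miss. The following sketch shows one way to sanity-check such a dict before using it. `check_override_base_tasks` is a hypothetical helper, not part of django-celery-rpc, and it only inspects names and path shapes; it does not import anything.

```python
# Hypothetical helper, not part of django-celery-rpc.
SUPPORTED_BASE_TASKS = {'ModelTask', 'ModelChangeTask', 'FunctionTask'}


def check_override_base_tasks(overrides):
    """Return a list of problems found in an OVERRIDE_BASE_TASKS-style dict."""
    problems = []
    for name, dotted_path in overrides.items():
        if name not in SUPPORTED_BASE_TASKS:
            problems.append('unsupported class name: %s' % name)
        # A usable value looks like 'package.module.ClassName'.
        module_path, _, class_name = str(dotted_path).rpartition('.')
        if not module_path or not class_name:
            problems.append('not a dotted path: %s' % dotted_path)
    return problems


problems = check_override_base_tasks({
    'ModelTask': 'package.module.MyModelTask',
    'ModelTaks': 'MyTask',  # misspelled key and a path without a module
})
print(problems)
```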

Handling remote exceptions individually

# Both server and client
CELERY_RPC_CONFIG['wrap_remote_errors'] = True

After remote exception wrapping is enabled, the client raises the same errors that occurred on the server side. If an error class is not defined on the client side (e.g. the package is not installed), Client.RemoteError is raised instead. Client.RemoteError is also the base class for all exceptions on the client side.

For unknown exceptions this code is valid:

try:
    result = rpc_client.call("remote_func")
except rpc_client.errors.SomeUnknownError as e:
    # here a stub for remote SomeUnknownError is handled
    print(e.args)

For known exceptions both variants work:

import django.core.exceptions

try:
    result = rpc_client.call("remote_func")
except rpc_client.errors.MultipleObjectsReturned as e:
    # django.core.exceptions.MultipleObjectsReturned
    handle_error(e)
except django.core.exceptions.ObjectDoesNotExist as e:
    # django.core.exceptions.ObjectDoesNotExist 
    handle_error(e)

If the original exception hierarchy is needed:

SomeBaseError = rpc_client.errors.SomeBaseError

DerivedError = rpc_client.errors.subclass(SomeBaseError, "DerivedError")
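
For readers wondering how stub classes for unknown errors can exist on the client at all, here is a minimal, self-contained sketch of the general technique: creating exception classes on demand with `type()`. It is not the library's implementation. `RemoteError` and `ErrorRegistry` below are hypothetical stand-ins that only mimic the `errors.SomeName` and `errors.subclass(...)` usage shown above.

```python
# Hypothetical stand-ins, not the django-celery-rpc classes.
class RemoteError(Exception):
    """Stand-in for the client-side base error."""


class ErrorRegistry(object):
    """Creates exception stubs on attribute access and caches them by name."""

    def __init__(self, base=RemoteError):
        self._base = base
        self._stubs = {}

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        if name.startswith('_'):
            raise AttributeError(name)
        return self.subclass(self._base, name)

    def subclass(self, parent, name):
        # The first definition of a name wins; later calls return the cached class.
        if name not in self._stubs:
            self._stubs[name] = type(name, (parent,), {})
        return self._stubs[name]


errors = ErrorRegistry()
SomeBaseError = errors.SomeBaseError
DerivedError = errors.subclass(SomeBaseError, 'DerivedError')

try:
    raise DerivedError('boom')
except SomeBaseError as e:
    # Catching the base stub also catches the derived stub.
    caught = e

print(type(caught).__name__, caught.args)
```

In this sketch the hierarchy has to be declared before the derived name is first accessed, because stubs are cached by name.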

TODO

Acknowledgements

Thanks to everyone who has contributed to this project: