cloudblue / django-cqrs

django-cqrs is a Django application that implements CQRS data synchronization between several Django microservices
https://django-cqrs.readthedocs.io/en/latest/
Apache License 2.0

Django CQRS


django-cqrs is a Django application that implements CQRS data synchronisation between several Django microservices.

CQRS

In Connect we have a rather complex domain model. There are many microservices that are decomposed by subdomain and follow the database-per-service pattern. These microservices have rich and consistent APIs. They are deployed in a cloud k8s cluster and scale automatically under load. Many of these services aggregate data from other services, and API Composition is usually sufficient. However, some services are too slow when they join data through the API, so another pattern needs to be applied.

The pattern that solves this issue is called CQRS (Command Query Responsibility Segregation). The core idea behind this pattern is that view databases (replicas) are defined for efficient querying and DB joins. Applications keep their replicas up to date by subscribing to domain events published by the service that owns the data. Data is eventually consistent, which is acceptable for non-critical business transactions.
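
The flow described above can be illustrated with a minimal, library-independent sketch. The names `EventBus`, `MasterStore`, and `ReplicaStore` are hypothetical and are not part of django-cqrs; an in-process queue stands in for the message broker.

```python
from collections import deque


class EventBus:
    """Hypothetical stand-in for a message broker: a simple FIFO queue."""

    def __init__(self):
        self._queue = deque()

    def publish(self, event):
        self._queue.append(event)

    def drain(self):
        # Yield events in the order they were published.
        while self._queue:
            yield self._queue.popleft()


class MasterStore:
    """Owns the data and publishes a domain event on every write."""

    def __init__(self, bus):
        self.rows = {}
        self._bus = bus

    def save(self, pk, data):
        self.rows[pk] = dict(data)
        self._bus.publish({'id': pk, 'data': dict(data)})


class ReplicaStore:
    """Keeps a local copy for querying, updated only from events."""

    def __init__(self):
        self.rows = {}

    def consume(self, bus):
        for event in bus.drain():
            self.rows[event['id']] = event['data']


bus = EventBus()
master = MasterStore(bus)
replica = ReplicaStore()

master.save(1, {'name': 'Alice'})
stale = dict(replica.rows)   # the replica has not consumed the event yet
replica.consume(bus)         # after consuming, the replica catches up
fresh = dict(replica.rows)
```

Between `save` and `consume` the replica lags behind the master, which is the eventual consistency mentioned above.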

Documentation

Full documentation is available at https://django-cqrs.readthedocs.org.

Examples

You can find an example project here

Integration

from django.db import models
from dj_cqrs.mixins import MasterMixin, RawMasterMixin


class Account(MasterMixin, models.Model):
    CQRS_ID = 'account'
    CQRS_PRODUCE = True  # set this to False to prevent sending instances to Transport


class Author(MasterMixin, models.Model):
    CQRS_ID = 'author'
    CQRS_SERIALIZER = 'app.api.AuthorSerializer'

For diamond multiple inheritance or Django proxy models, the following approach can be used:

from mptt.models import MPTTModel

from dj_cqrs.metas import MasterMeta
from dj_cqrs.mixins import RawMasterMixin


class ComplexInheritanceModel(MPTTModel, RawMasterMixin):
    CQRS_ID = 'diamond'


class BaseModel(RawMasterMixin):
    CQRS_ID = 'base'


class ProxyModel(BaseModel):
    class Meta:
        proxy = True


MasterMeta.register(ComplexInheritanceModel)
MasterMeta.register(BaseModel)


```python
# settings.py

CQRS = {
    'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
    'host': RABBITMQ_HOST,
    'port': RABBITMQ_PORT,
    'user': RABBITMQ_USERNAME,
    'password': RABBITMQ_PASSWORD,
}
```

```python
# models.py (replica service)

from django.db import models
from dj_cqrs.mixins import ReplicaMixin


class AccountRef(ReplicaMixin, models.Model):
    CQRS_ID = 'account'

    id = models.IntegerField(primary_key=True)


class AuthorRef(ReplicaMixin, models.Model):
    CQRS_ID = 'author'
    CQRS_CUSTOM_SERIALIZATION = True

    @classmethod
    def cqrs_create(cls, sync, mapped_data, previous_data=None, meta=None):
        # Override here
        pass

    def cqrs_update(self, sync, mapped_data, previous_data=None, meta=None):
        # Override here
        pass
```

```python
# settings.py

CQRS = {
    'transport': 'dj_cqrs.transport.RabbitMQTransport',
    'queue': 'account_replica',
    'host': RABBITMQ_HOST,
    'port': RABBITMQ_PORT,
    'user': RABBITMQ_USERNAME,
    'password': RABBITMQ_PASSWORD,
}
```
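
To illustrate how the `cqrs_create` and `cqrs_update` hooks on a replica model might be filled in, here is a minimal sketch that runs without Django. `FakeAuthorRef`, its in-memory `_store`, the `apply_message` helper, and the payload keys are all hypothetical; the real dispatch is performed by the library's consumer and may differ.

```python
class FakeAuthorRef:
    """Hypothetical in-memory stand-in for a replica model (no Django needed)."""

    _store = {}  # maps primary key -> instance

    def __init__(self, pk, name):
        self.pk = pk
        self.name = name

    @classmethod
    def cqrs_create(cls, sync, mapped_data, previous_data=None, meta=None):
        # Custom creation: build the instance from the incoming payload.
        instance = cls(mapped_data['id'], mapped_data['name'].strip())
        cls._store[instance.pk] = instance
        return instance

    def cqrs_update(self, sync, mapped_data, previous_data=None, meta=None):
        # Custom update: apply only the fields this replica cares about.
        self.name = mapped_data['name'].strip()
        return self


def apply_message(model, mapped_data, sync=False):
    """Assumed dispatch rule: update if the row exists, otherwise create."""
    existing = model._store.get(mapped_data['id'])
    if existing is None:
        return model.cqrs_create(sync, mapped_data)
    return existing.cqrs_update(sync, mapped_data)


created = apply_message(FakeAuthorRef, {'id': 1, 'name': ' Tolkien '})
updated = apply_message(FakeAuthorRef, {'id': 1, 'name': 'J. R. R. Tolkien'})
```

The first message creates the row and the second updates the same instance, so the replica ends up with a single record per primary key.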

Notes

Example of restricting which instances are synchronized by overriding is_sync_instance:


class FilteredSimplestModel(MasterMixin, models.Model):
    CQRS_ID = 'filter'

    name = models.CharField(max_length=200)

    def is_sync_instance(self):
        return len(str(self.name)) > 2
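
The effect of the `is_sync_instance` rule above can be checked without Django. The sketch below uses a hypothetical plain class, `FakeFilteredModel`, with the same predicate, and a hypothetical `select_for_sync` helper; it only illustrates which instances the rule would let through.

```python
class FakeFilteredModel:
    """Hypothetical plain-Python stand-in for FilteredSimplestModel."""

    def __init__(self, name):
        self.name = name

    def is_sync_instance(self):
        # Same rule as in the example: only names longer than 2 characters.
        return len(str(self.name)) > 2


def select_for_sync(instances):
    """Keep only the instances whose filter rule returns True."""
    return [obj for obj in instances if obj.is_sync_instance()]


candidates = [FakeFilteredModel(n) for n in ('a', 'ab', 'abc', None)]
synced_names = [obj.name for obj in select_for_sync(candidates)]
```

Note that `str(None)` is `'None'`, which has four characters, so an instance without a name passes this particular rule.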

Django Admin

Add an action to synchronize master items from the Django Admin page.

from django.contrib import admin

from dj_cqrs.admin_mixins import CQRSAdminMasterSyncMixin

# Assumption: Account is the master model defined in this app's models.py.
from .models import Account


class AccountAdmin(CQRSAdminMasterSyncMixin, admin.ModelAdmin):
    ...


admin.site.register(Account, AccountAdmin)

Utilities

Bulk synchronizer without transport, for example for initial configuration. It can be used during planned downtime.

Filter synchronizer over transport, for example to sync specific records to a given replica. It can be used dynamically.

Set of diff synchronization tools:

Development

  1. Python >= 3.8
  2. Install dependencies from requirements/dev.txt
  3. We use the isort library to order and format our imports, and black to format the code. Both are checked with the flake8-isort and flake8-black plugins (automatically on each flake8 run).
    For convenience, you may run isort . && black . to format the code.

Testing

Unit testing

  1. Python >= 3.8
  2. Install dependencies from requirements/test.txt
  3. export PYTHONPATH=/your/path/to/django-cqrs/

Run tests with various RDBMS:

Check code style: flake8

Run tests: pytest

Test reports are generated in tests/reports.

To generate HTML coverage reports, use: --cov-report html:tests/reports/cov_html

Integration testing

  1. docker compose
  2. cd integration_tests
  3. docker compose run master