django / channels

Developer-friendly asynchrony for Django
https://channels.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Asynchronous database transactions #1937

Open HMaker opened 1 year ago

HMaker commented 1 year ago

Channels provides a decorator, DatabaseSyncToAsync, that defers database queries to a background thread pool. It can be used to run multiple independent queries. Transactions are also possible, provided all of their queries are made from the same decorated function.

I use the Django ORM outside of an HTTP context, within Channels and other asyncio applications. I made DatabaseSyncToAsync not run in thread-sensitive mode so that queries can run in parallel. To make the async API easier to use, I partially replicated the sync API of the Django ORM; it works well for independent queries.

I needed to support asynchronous transactions. The easiest solution was to defer the full transaction.atomic() block to a DatabaseSyncToAsync-decorated function and make all queries from there. But this breaks the async API: I would like to enter the atomic block from async code and use the same async API to make the transaction's queries.

I found that thread-sensitive mode together with ThreadSensitiveContext can be used to resolve all queries in the same thread that started the transaction, but it required customizing both DatabaseSyncToAsync and the base class SyncToAsync.
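
The underlying idea, pinning every call made inside one context to the same worker thread, can be sketched without asgiref. This is only an illustration under assumptions: PinnedThread and run_sync are hypothetical names, not asgiref's API, and sqlite3 is used because its connections refuse, by default, to be used from any thread other than the one that created them, which is a convenient stand-in for a connection that owns an open transaction.

```python
import asyncio
import contextvars
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical names throughout; this is not asgiref's API.
_pinned_executor = contextvars.ContextVar("pinned_executor", default=None)


class PinnedThread:
    """Async context manager: calls made inside it share one worker thread."""

    async def __aenter__(self):
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._token = _pinned_executor.set(self._executor)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        _pinned_executor.reset(self._token)
        # All awaited calls have finished, so this only joins the idle thread.
        self._executor.shutdown(wait=True)


async def run_sync(func, *args):
    """Use the pinned executor if one is active, else the loop's default pool."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_pinned_executor.get(), func, *args)


state = {}


def open_db():
    # The connection is bound to the thread that creates it.
    state["conn"] = sqlite3.connect(":memory:")
    state["conn"].execute("CREATE TABLE t (x INTEGER)")
    return threading.get_ident()


def insert(x):
    state["conn"].execute("INSERT INTO t VALUES (?)", (x,))
    return threading.get_ident()


def commit_and_count():
    state["conn"].commit()
    (n,) = state["conn"].execute("SELECT COUNT(*) FROM t").fetchone()
    state["conn"].close()
    return n


async def main():
    async with PinnedThread():
        t1 = await run_sync(open_db)
        t2 = await run_sync(insert, 1)
        t3 = await run_sync(insert, 2)
        count = await run_sync(commit_and_count)
    return {t1, t2, t3}, count


thread_ids, row_count = asyncio.run(main())
```

Each query is awaited separately from async code, yet all of them land on the one thread that owns the connection. Outside the context manager, run_sync falls back to the loop's default pool, so independent queries can still run in parallel.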

import asyncio
import functools
import sys  # needed for sys.exc_info() in SyncToAsync.__call__
import typing as t
import contextvars
from concurrent.futures import ThreadPoolExecutor
from asgiref import sync as _sync
from django import db as _db
from django.db import transaction

class SyncToAsync(_sync.SyncToAsync):
    """asgiref.sync.SyncToAsync with conditional thread sensitive mode.

    If the decorated function is called within an active ThreadSensitiveContext it will be
    run in thread-sensitive mode regardless of the initial configuration. If there is no such
    context the thread_sensitive option is checked.
    """

    async def __call__(self, *args, **kwargs):
        loop = asyncio.get_running_loop()
        # PATCH: use thread sensitive mode if we are within a thread sensitive context
        # Work out what thread to run the code in
        if self._thread_sensitive or self.thread_sensitive_context.get(None) is not None:
            if hasattr(_sync.AsyncToSync.executors, "current"):
                # If we have a parent sync thread above somewhere, use that
                executor = _sync.AsyncToSync.executors.current
            elif self.thread_sensitive_context and self.thread_sensitive_context.get(
                None
            ):
                # If we have a way of retrieving the current context, attempt
                # to use a per-context thread pool executor
                thread_sensitive_context = self.thread_sensitive_context.get()
                if thread_sensitive_context in self.context_to_thread_executor:
                    # Re-use thread executor in current context
                    executor = self.context_to_thread_executor[thread_sensitive_context]
                else:
                    # Create new thread executor in current context
                    executor = ThreadPoolExecutor(max_workers=1)
                    self.context_to_thread_executor[thread_sensitive_context] = executor
            elif self.deadlock_context and self.deadlock_context.get(False):
                raise RuntimeError(
                    "Single thread executor already being used, would deadlock"
                )
            else:
                # Otherwise, we run it in a fixed single thread
                executor = self.single_thread_executor
                if self.deadlock_context:
                    self.deadlock_context.set(True)
        else:
            # Use the passed in executor, or the loop's default if it is None
            executor = self._executor
        context = contextvars.copy_context()
        child = functools.partial(self.func, *args, **kwargs)
        func = context.run
        args = (child,)
        kwargs = {}
        try:
            # Run the code in the right thread
            future = loop.run_in_executor(
                executor,
                functools.partial(
                    self.thread_handler,
                    loop,
                    self.get_current_task(),
                    sys.exc_info(),
                    func,
                    *args,
                    **kwargs,
                ),
            )
            ret = await asyncio.wait_for(future, timeout=None)
        finally:
            _sync._restore_context(context)
            if self.deadlock_context:
                self.deadlock_context.set(False)
        return ret

def sync_to_async(
    func=None,
    thread_sensitive=True,
    executor=None,
):
    if func is None:
        return lambda f: SyncToAsync(
            f,
            thread_sensitive=thread_sensitive,
            executor=executor,
        )
    return SyncToAsync(
        func,
        thread_sensitive=thread_sensitive,
        executor=executor,
    )

class AsyncAtomicTransaction:
    """Django atomic database transaction with support for DatabaseSyncToAsync-decorated queries.
    This context manager is re-entrant, inner AsyncAtomicTransaction blocks will be resolved on the
    same background thread, the one which owns the database connection used by the outermost
    transaction.

    Usage:

    async with AsyncAtomicTransaction():
        await DatabaseSyncToAsync(...)(...)
    """

    # transaction_context tracks the outermost AsyncAtomicTransaction block; this block is
    # responsible for closing the database connection on exit. Transactions are always
    # run in a new thread, from a new executor, so we can't cache connections.
    transaction_context = contextvars.ContextVar('transaction_context')

    def __init__(self):
        self._transaction: transaction.Atomic = None
        self._context: _sync.ThreadSensitiveContext = None
        self._outermost_transaction_token = None

    async def __aenter__(self):
        if self._context is not None:
            raise RuntimeError('already entered')
        self._context = _sync.ThreadSensitiveContext()
        if AsyncAtomicTransaction.transaction_context.get(None) is None:
            self._outermost_transaction_token = AsyncAtomicTransaction.transaction_context.set(self)
        try:
            await self._context.__aenter__()
            await self._enter_transaction()
        except:
            if self._outermost_transaction_token is not None:
                AsyncAtomicTransaction.transaction_context.reset(self._outermost_transaction_token)
            raise

    async def __aexit__(self, exc, value, tb):
        try:
            await self._exit_transaction(exc, value, tb)
        finally:
            await self._context.__aexit__(exc, value, tb)
            if self._outermost_transaction_token is not None:
                AsyncAtomicTransaction.transaction_context.reset(self._outermost_transaction_token)

    @sync_to_async
    def _enter_transaction(self):
        if self._transaction is not None:
            raise RuntimeError('transaction already entered')
        if self._outermost_transaction_token is not None:
            _db.close_old_connections()
        self._transaction = transaction.atomic()
        self._transaction.__enter__()

    @sync_to_async
    def _exit_transaction(self, exc, value, tb):
        try:
            self._transaction.__exit__(exc, value, tb)
        finally:
            if self._outermost_transaction_token is not None:
                _db.connections.close_all()

atomic_transaction = AsyncAtomicTransaction

class DatabaseSyncToAsync(SyncToAsync):
    """
    SyncToAsync version that cleans up old database connections when it exits.
    """

    def thread_handler(self, loop, *args, **kwargs):
        if not _db.connection.in_atomic_block:
            _db.close_old_connections()
        try:
            return super().thread_handler(loop, *args, **kwargs)
        finally:
            if not _db.connection.in_atomic_block:
                _db.close_old_connections()

HMaker commented 1 year ago

@andrewgodwin, any thoughts on this? You authored the DEP about async support for Django. Is my ad hoc solution for async transactions safe?

carltongibson commented 1 year ago

Related to https://code.djangoproject.com/ticket/33882

HMaker commented 1 year ago

By the way, this custom SyncToAsync with conditional thread-sensitive mode stays compatible with async HTTP views if the calls are wrapped in ThreadSensitiveContext:

from asgiref.sync import ThreadSensitiveContext

# DatabaseSyncToAsync is defined as above
# MyModel is any subclass of models.Model

async def my_view(request):
    instance = MyModel()  # save() needs an instance, so wrap the bound method
    async with ThreadSensitiveContext():
        # this is safe in an HTTP request context
        await DatabaseSyncToAsync(instance.save)()

But I am not sure an explicit context manager is really needed here. I guess the ASGI handler always creates one when dispatching the request to the middleware and the view. Am I right?

EDIT: yes, that's right, see https://github.com/django/django/blob/1250483ebf73f7a82ff820b94092c63ce4238264/django/core/handlers/asgi.py#L148-L161

devkral commented 1 year ago

Thanks a lot for taking over the issue.

I worked around the issue by using status variables (I also couldn't wrap every access in a transaction), so I cannot provide you with more information.

HMaker commented 1 year ago

@devkral what is a status variable?

devkral commented 1 year ago

I split the transaction into a locking phase, a transfer phase (which reads the foreign URL), and an unlocking phase. The locking is done via a status variable.

HMaker commented 1 year ago

I split the transaction into a locking phase, a transfer phase (which reads the foreign URL), and an unlocking phase. The locking is done via a status variable.

Did you wrap the transaction.atomic() context manager with sync_to_async()? By default, sync_to_async() runs all code in a single thread (thread_sensitive=True). The Django devs assume you are always in an HTTP request context, where every request has a dedicated thread; there it makes sense to run all sync code on a single thread, since there is still concurrency between requests. This also makes thread safety the default.

When using the Django ORM outside of an HTTP request context, as in Channels background workers, the default behavior of sync_to_async() is a huge bottleneck: all DB queries are serialized and there is no concurrency at all. My code removes this bottleneck while remaining compatible with Django code running in an HTTP request context (i.e. thread safety is honored).
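
The bottleneck can be illustrated with plain executors. This is a rough sketch, not a benchmark of asgiref: a single-thread executor stands in for the thread-sensitive default, a four-thread pool stands in for the non-thread-sensitive mode, and fake_query is a hypothetical placeholder for a blocking database round trip. The probe records how many calls overlapped in time.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyProbe:
    """Records the peak number of calls that overlapped in time."""

    def __init__(self):
        self._lock = threading.Lock()
        self._active = 0
        self.peak = 0

    def fake_query(self):
        with self._lock:
            self._active += 1
            self.peak = max(self.peak, self._active)
        time.sleep(0.1)  # stands in for a blocking database round trip
        with self._lock:
            self._active -= 1


async def run_queries(executor, count=4):
    loop = asyncio.get_running_loop()
    probe = ConcurrencyProbe()
    await asyncio.gather(
        *(loop.run_in_executor(executor, probe.fake_query) for _ in range(count))
    )
    return probe.peak


async def main():
    with ThreadPoolExecutor(max_workers=1) as single:
        serial_peak = await run_queries(single)
    with ThreadPoolExecutor(max_workers=4) as pool:
        parallel_peak = await run_queries(pool)
    return serial_peak, parallel_peak


serial_peak, parallel_peak = asyncio.run(main())
```

With one worker thread the calls can never overlap, however many coroutines are awaiting them; with a pool they do. That is the trade-off between the thread-safe default and parallel queries.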

devkral commented 1 year ago

No, it is not a real transaction. It is the standard way to mark an object as in use (or locked) and to change the status back after the update is complete. This is a little bit off-topic.
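
One possible reading of the status-variable approach is sketched here. This is an assumption about the pattern, not devkral's actual code: sqlite3 stands in for the real database, and the item table, its status values, and the function names are hypothetical. The marker is flipped with a single conditional UPDATE, so only one caller can win the locking phase, and no database transaction stays open during the transfer phase.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE item (id INTEGER PRIMARY KEY, status TEXT NOT NULL, payload TEXT)"
)
conn.execute("INSERT INTO item (id, status) VALUES (1, 'idle')")
conn.commit()


def try_lock(conn, item_id):
    """Locking phase: flip idle -> in_use in one statement; True if we won."""
    with conn:
        cur = conn.execute(
            "UPDATE item SET status = 'in_use' WHERE id = ? AND status = 'idle'",
            (item_id,),
        )
    return cur.rowcount == 1


def unlock(conn, item_id, payload):
    """Unlocking phase: store the result and release the marker."""
    with conn:
        conn.execute(
            "UPDATE item SET status = 'idle', payload = ? WHERE id = ?",
            (payload, item_id),
        )


first = try_lock(conn, 1)   # acquires the marker
second = try_lock(conn, 1)  # refused: the item is already in use
# The transfer phase would run here, outside any database transaction.
unlock(conn, 1, "fetched data")
third = try_lock(conn, 1)   # succeeds again after the release
```

Unlike a real transaction, nothing rolls the marker back if the process dies during the transfer phase, so a scheme like this usually needs some way to recover stale markers.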