procrastinate-org / procrastinate

PostgreSQL-based Task Queue for Python
https://procrastinate.readthedocs.io/
MIT License

Django - support for transaction.atomic #502

Closed michaszcz closed 9 months ago

michaszcz commented 2 years ago

Hey guys, thanks for a great library.

Context

Just to give you some context, I'd like to use it in a Django project to reliably communicate with other microservices and implement Transactional outbox and Message Relay patterns.

Problem

The problem is that currently it's not possible to save Django models and defer a task in one transaction. Example code below:

import procrastinate
from django.db import transaction
from .models import Order, SomeModel
from procrastinate.contrib.django import connector_params
from asgiref.sync import sync_to_async

app = procrastinate.App(
    connector=procrastinate.Psycopg2Connector(**connector_params())
)

@app.task
def some_other_task():
    print("Hello World")

@app.task
@sync_to_async  # Needed to be able to use Django ORM
@transaction.atomic
def send_order_placed_notification():
    some_other_task.defer()  # Task is deferred even when the next line throws IntegrityError (transaction rollback)
    SomeModel.objects.create()

@transaction.atomic
def some_view(request):
    send_order_placed_notification.defer()  # Task is deferred even when the next line throws IntegrityError (transaction rollback)
    Order.objects.create()

Root cause

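Psycopg2Connector and AiopgConnector each create a separate database connection instead of using django.db.connection, so the deferred job is written outside Django's transaction and is not rolled back with it.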
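The effect of a separate connection can be reproduced without Django or procrastinate. The following is only a sketch: it uses sqlite3 from the standard library as a stand-in for PostgreSQL, and the table names and the run helper are made up for illustration. A write made on a second connection survives the rollback of the first one, whereas a write made on the shared connection is rolled back with it.

```python
import os
import sqlite3
import tempfile


def run(shared_connection: bool) -> tuple[int, int]:
    """Return (jobs, orders) row counts after a failing "view" (hypothetical helper)."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.sqlite3")
        # isolation_level=None: transactions are controlled explicitly with BEGIN/COMMIT.
        app_conn = sqlite3.connect(path, isolation_level=None)
        app_conn.execute("CREATE TABLE jobs (name TEXT NOT NULL)")
        app_conn.execute("CREATE TABLE orders (ref TEXT NOT NULL)")
        # The "queue" either reuses the application's connection or opens its own.
        queue_conn = (
            app_conn if shared_connection else sqlite3.connect(path, isolation_level=None)
        )

        app_conn.execute("BEGIN")  # stands in for transaction.atomic
        try:
            # Stands in for task.defer(): with its own connection this commits at once.
            queue_conn.execute("INSERT INTO jobs VALUES ('notify')")
            # Violates NOT NULL, like a failing Order.objects.create().
            app_conn.execute("INSERT INTO orders VALUES (NULL)")
            app_conn.execute("COMMIT")
        except sqlite3.IntegrityError:
            app_conn.execute("ROLLBACK")

        counts = (
            app_conn.execute("SELECT count(*) FROM jobs").fetchone()[0],
            app_conn.execute("SELECT count(*) FROM orders").fetchone()[0],
        )
        queue_conn.close()
        app_conn.close()
        return counts


print(run(shared_connection=False))  # (1, 0): the job outlives the rollback
print(run(shared_connection=True))   # (0, 0): the job is rolled back with the order
```

The second call is what the issue asks for: the job row and the model row succeed or fail together.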

ewjoachim commented 2 years ago

Ah, interesting! Would you like to participate in a PR to fix this? Otherwise, I can't promise when I'll have time to try a fix, but we'll definitely try to find something.

ewjoachim commented 2 years ago

Maybe the best would be to make a DjangoConnector? Or split the Psycopg2Connector logic by creating a "ConnectionProvider" class, which would have a default implementation that creates a pool as is done currently, and a DjangoConnectionProvider that would read the connection from Django?

Following https://hynek.me/articles/python-subclassing-redux/, I'd tend to say the second solution might be better.
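To make the second option concrete, here is a rough sketch of what composition could look like. None of these classes exist in procrastinate: ConnectionProvider, FixedConnectionProvider, DjangoConnectionProvider and Connector are hypothetical names, and the real connector has a much larger surface. The Django import is done lazily so the sketch runs without a configured Django project; the usage at the bottom uses sqlite3 only as a stand-in.

```python
import sqlite3
from typing import Any, Protocol


class ConnectionProvider(Protocol):
    """Hypothetical interface: hands the connector a DB-API connection."""

    def get_connection(self) -> Any: ...


class FixedConnectionProvider:
    """Hypothetical provider that always returns the connection it was given."""

    def __init__(self, connection: Any) -> None:
        self._connection = connection

    def get_connection(self) -> Any:
        return self._connection


class DjangoConnectionProvider:
    """Hypothetical provider that reuses the connection Django is already using."""

    def get_connection(self) -> Any:
        # Imported lazily: this only works inside a configured Django project.
        from django.db import connection

        connection.ensure_connection()
        return connection.connection  # the underlying DB-API connection


class Connector:
    """Hypothetical connector that delegates connection handling to a provider."""

    def __init__(self, provider: ConnectionProvider) -> None:
        self._provider = provider

    def execute(self, query: str, params: tuple = ()) -> None:
        cursor = self._provider.get_connection().cursor()
        try:
            cursor.execute(query, params)
        finally:
            cursor.close()


# Usage with a stand-in connection; a Django project would pass
# DjangoConnectionProvider() instead.
conn = sqlite3.connect(":memory:")
connector = Connector(FixedConnectionProvider(conn))
connector.execute("CREATE TABLE jobs (name TEXT)")
connector.execute("INSERT INTO jobs VALUES (?)", ("notify",))
print(conn.execute("SELECT name FROM jobs").fetchall())  # [('notify',)]
```

The connector never subclasses anything Django-specific here; swapping the provider is the only change between the two setups.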

michaszcz commented 2 years ago

Yes, I would like to participate.

I've already created a DjangoConnector based on Psycopg2Connector, but it only works in the Django app. I have to investigate how to make it work for workers (they require an async connector at the moment).

ewjoachim commented 2 years ago

Maybe it's OK as a first step if we have a different connector for deferring and for workers, but I believe you may want to use the ORM in the tasks, and that will be problematic, so...

Interesting problem to solve :) Feel free to expose as much as you want of your findings here and we'll brainstorm our way through it together :)

michaszcz commented 2 years ago

I've created a draft that we can discuss.

michaszcz commented 2 years ago

Summary of my findings so far:

  1. To defer a task and use the Django ORM in one transaction, both must use the same connection, so django.db.connections must be used, both in procrastinate workers and in the Django application.
  2. Django raises a SynchronousOnlyOperation exception when some parts of the code (e.g. the Django ORM) run in a thread that has a running event loop. More info here. To fix this, sync_to_async(thread_sensitive=True) could be used.
  3. The procrastinate worker runs its own event loop in the main thread, so sync_to_async will run Django ORM code in a different thread. Running Django ORM code outside the main thread may cause other issues:

    The reason this is needed in Django is that many libraries, specifically database adapters, require that they are accessed in the same thread that they were created in. Also a lot of existing Django code assumes it all runs in the same thread, e.g. middleware adding things to a request for later use in views.

So I think the easiest way to make it work for Django is to rewrite the procrastinate worker in a synchronous way and create a DjangoConnector.
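The thread hop described in finding 3 can be observed with the standard library alone. This sketch is an assumption-laden stand-in: asyncio.to_thread replaces asgiref's sync_to_async, and orm_call is a hypothetical placeholder for code that touches the Django ORM.

```python
import asyncio
import threading


def orm_call() -> str:
    """Placeholder for synchronous ORM code; reports the thread it runs in."""
    return threading.current_thread().name


async def worker() -> tuple[str, str]:
    # The event loop runs in the thread that called asyncio.run().
    loop_thread = threading.current_thread().name
    # The synchronous call is handed to a separate worker thread.
    orm_thread = await asyncio.to_thread(orm_call)
    return loop_thread, orm_thread


loop_thread, orm_thread = asyncio.run(worker())
print(loop_thread, orm_thread)
```

The two names differ, so any connection created in the loop's thread would be used from another thread by the synchronous code.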

ewjoachim commented 2 years ago

I think the easiest way to make it work for Django is to rewrite the procrastinate worker in a synchronous way

Yes, I think so too.

caire-bear commented 1 year ago

to reliably communicate with other microservices and implement Transactional outbox and Message Relay patterns.

Came here for the exact same pattern, but using FastAPI/SQLAlchemy. We can create a session object in our FastAPI handlers and pass that around, but need a way to thread it into the defer_async call.

petrprikryl commented 1 year ago

Hi, if anyone is facing a problem starting the worker in a Django app:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/procrastinate/utils.py", line 391, in run_tasks
    await asyncio.gather(_main(), *side_tasks)
  File "/usr/local/lib/python3.11/site-packages/procrastinate/utils.py", line 383, in _main
    await asyncio.gather(*main_tasks)
  File "/usr/local/lib/python3.11/site-packages/procrastinate/worker.py", line 134, in single_worker
    job = await self.job_manager.fetch_job(self.queues)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/procrastinate/manager.py", line 139, in fetch_job
    if row["id"] is None:
       ~~~^^^^^^
TypeError: tuple indices must be integers or slices, not str

This helped me:

from procrastinate import AiopgConnector, App
from procrastinate.contrib.django import connector_params

params = connector_params()
del params["cursor_factory"]  # Django appears to pass a tuple-based cursor factory

app = App(connector=AiopgConnector(**params))

Next, use @sync_to_async(thread_sensitive=False) to have concurrent sync jobs. Otherwise, the worker will be blocked by a single task, even with the --concurrency=30 argument.

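import time
from asgiref.sync import sync_to_async

@app.task(name="mytask")
async def mytask(obj_pk):
    @sync_to_async(thread_sensitive=False)
    def work():
        time.sleep(5)  # sync placeholder

    await work()  # assumed completion: the snippet ends before work() is awaited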
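Why a shared thread blocks the worker can be shown with a stdlib-only sketch. It does not use asgiref: a ThreadPoolExecutor with one worker is assumed to approximate thread_sensitive=True (all sync calls funnelled through one thread), and a larger pool approximates thread_sensitive=False. The names blocking_job, run_jobs and measure are hypothetical.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_job() -> str:
    time.sleep(0.2)  # sync placeholder
    return threading.current_thread().name


async def run_jobs(executor: ThreadPoolExecutor, count: int = 3) -> tuple[float, set[str]]:
    """Run `count` blocking jobs concurrently; return elapsed time and thread names."""
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    names = await asyncio.gather(
        *(loop.run_in_executor(executor, blocking_job) for _ in range(count))
    )
    return time.perf_counter() - start, set(names)


def measure(max_workers: int) -> tuple[float, set[str]]:
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return asyncio.run(run_jobs(executor))


serial_time, serial_threads = measure(max_workers=1)  # one shared thread
pooled_time, pooled_threads = measure(max_workers=3)  # one thread per job
print(f"shared thread: {serial_time:.2f}s on {len(serial_threads)} thread")
print(f"thread pool:   {pooled_time:.2f}s on {len(pooled_threads)} threads")
```

With one shared thread the three jobs run back to back, so the event loop's concurrency setting cannot help; with a pool they overlap.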

ewjoachim commented 1 year ago

Next, use @sync_to_async(thread_sensitive=False) to have concurrent sync jobs

Yes, we have something in the works for switching to sync_to_async/async_to_sync

For the rest, I believe the best way to solve this will be when we add support for psycopg3

ewjoachim commented 9 months ago

transaction.atomic now works with the revamp of the Django integration (note: it's not in a stable release yet, but it's already merged in main).