collective / collective.celery

celery integration with Plone

Avoid swallowing exceptions while queuing tasks #10

Closed davisagli closed 6 years ago

davisagli commented 8 years ago

In the codebase that collective.celery is (loosely) based on, we ran into a problem: celery could not connect to the message broker, and we didn't notice, because it turns out that the transaction package swallows exceptions raised in after-commit hooks.

To fix this we switched to using a transaction synchronizer:

from celery import Task
from transaction.interfaces import ISynchronizer
from zope.interface import implementer
import transaction

@implementer(ISynchronizer)
class CelerySynchronizer(object):
    """Handles communication with celery at transaction boundaries."""

    def beforeCompletion(self, txn):
        pass

    def afterCompletion(self, txn):
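        """Called after commit or abort."""
        # Only send the queued tasks if the transaction actually committed.
        if txn.status == transaction._transaction.Status.COMMITTED:
            tasks = getattr(txn, '_celery_tasks', [])
            for task, args, kw in tasks:
                # args and kw are passed straight through to apply_async;
                # an error raised here is not caught by this synchronizer.
                Task.apply_async(task, *args, **kw)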

    def newTransaction(self, txn):
        txn._celery_tasks = []

# It's important that we assign the synchronizer to a variable,
# because the transaction manager stores it using a weak reference.
celery_synch = CelerySynchronizer()

def queue_task_after_commit(task, args, kw):
    # make sure the synchronizer is registered for this thread
    transaction.manager.registerSynch(celery_synch)

    txn = transaction.get()
    if not hasattr(txn, '_celery_tasks'):
        txn._celery_tasks = []
    txn._celery_tasks.append((task, args, kw))
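
To illustrate why the switch matters, here is a minimal, self-contained sketch of the two behaviours. It does not use the real transaction package: ToyTransaction, send_to_broker and FailingSynch are hypothetical stand-ins, and the toy only assumes what is described above, namely that errors in after-commit hooks are caught and logged while errors in a synchronizer's afterCompletion reach the caller.

```python
import logging

logger = logging.getLogger("toy_txn")


class ToyTransaction:
    """Simplified stand-in for a transaction; NOT the real `transaction` API."""

    def __init__(self):
        self.after_commit_hooks = []
        self.synchronizers = []

    def commit(self):
        # Synchronizers are called without protection, so errors propagate.
        for synch in self.synchronizers:
            synch.afterCompletion(self)
        # Hooks are wrapped in try/except: failures are logged, then dropped.
        for hook in self.after_commit_hooks:
            try:
                hook(True)
            except Exception:
                logger.error("Error in after-commit hook %r", hook, exc_info=True)


def send_to_broker(*args):
    # Hypothetical stand-in for queuing a celery task while the broker is down.
    raise ConnectionError("broker unreachable")


class FailingSynch:
    def afterCompletion(self, txn):
        send_to_broker()


def commit_with_hook():
    txn = ToyTransaction()
    txn.after_commit_hooks.append(send_to_broker)
    txn.commit()  # returns normally; the error only shows up in the log
    return "commit returned normally"


def commit_with_synchronizer():
    txn = ToyTransaction()
    txn.synchronizers.append(FailingSynch())
    try:
        txn.commit()
    except ConnectionError as exc:
        return "caller saw: %s" % exc
    return "commit returned normally"
```

In this toy, commit_with_hook() finishes as if nothing went wrong, whereas commit_with_synchronizer() surfaces the connection error to the code that called commit.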

Maybe someone wants to update collective.celery to do the same.

runyaga commented 6 years ago

fixed