Bogdanp / django_dramatiq

A Django app that integrates with Dramatiq.
https://dramatiq.io

Race condition at AdminMiddleware #154

Open spumer opened 9 months ago

spumer commented 9 months ago

Hi. We have a lot of tasks stuck in ENQUEUED status even though they were executed and the jobs completed successfully.

I think there is a small race condition in the middleware callbacks:

  1. Scheduler emits before_enqueue
  2. Scheduler publishes the task to the queue
  3. Worker gets the task from the queue
  4. Worker emits before processing (updates task status=RUNNING)
  5. Worker finishes the task
  6. Worker emits after processing (updates task status=DONE)
  7. Scheduler emits after_enqueue (updates task status=ENQUEUED)
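
The ordering above can be replayed without a broker or a database. This is only a minimal sketch: it assumes every hook performs an unconditional write (last write wins), the dict stands in for the Task table, and `write_status` and `msg-1` are hypothetical names used for illustration.

```python
# Replay the seven steps against an in-memory "table".
# Assumption: each hook does an unconditional upsert, so the last writer wins.

tasks = {}  # message_id -> status


def write_status(message_id, status):
    """Unconditional upsert: whichever call runs last decides the final status."""
    tasks[message_id] = status


message_id = "msg-1"  # hypothetical message id

# Steps 1-2: the scheduler publishes the message; nothing has been written yet.
# Steps 3-4: the worker picks the message up before the scheduler's
# after_enqueue hook has run.
write_status(message_id, "RUNNING")
# Steps 5-6: the worker finishes the task.
write_status(message_id, "DONE")
# Step 7: the scheduler's after_enqueue hook finally runs and overwrites DONE.
write_status(message_id, "ENQUEUED")

print(tasks[message_id])  # ENQUEUED, although the work is already done
```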

In logs this looks like this:

[image: screenshot of the log output]

You can see that the Create and Update log lines appear in a different order, and that the last one is an Update. That by itself is not a concern: each message is logged before its SQL operation runs, so the log order does not exactly reflect the order of the SQL updates.


So I want to make a PR to fix that. I see two solutions for this case:

  1. Optimistic locking: check the row version on update and skip the update if it has changed (this requires a schema change)
  2. Do not allow the status to change from DONE/RUNNING to ENQUEUED; check the current status on update (control the state flow)
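
Option 1 could look roughly like the sketch below. It uses the standard-library sqlite3 module instead of the Django ORM, and the `task` table, its `version` column, and `update_if_unchanged` are hypothetical names, not part of django_dramatiq. It also assumes both writers read the row at version 1 before updating it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task ("
    "id TEXT PRIMARY KEY, status TEXT NOT NULL, version INTEGER NOT NULL)"
)
conn.execute("INSERT INTO task VALUES ('msg-1', 'RUNNING', 1)")


def update_if_unchanged(conn, task_id, new_status, seen_version):
    """Write new_status only if the row still has the version the caller read."""
    cur = conn.execute(
        "UPDATE task SET status = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (new_status, task_id, seen_version),
    )
    # rowcount is 0 when another writer has bumped the version in the meantime.
    return cur.rowcount == 1


# The worker read version 1 and finishes the task: the write succeeds
# and the version becomes 2.
worker_ok = update_if_unchanged(conn, "msg-1", "DONE", seen_version=1)
# The scheduler also read version 1; its late write no longer matches
# and is ignored.
scheduler_ok = update_if_unchanged(conn, "msg-1", "ENQUEUED", seen_version=1)

final_status = conn.execute(
    "SELECT status FROM task WHERE id = 'msg-1'"
).fetchone()[0]
print(worker_ok, scheduler_ok, final_status)  # True False DONE
```

The version check and the write happen in a single UPDATE statement, so there is no window between reading the version and writing the status.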

@amureki what do you think about that?

spumer commented 9 months ago

The quick fix in our case is to use get_or_create instead of create_or_update in the after_enqueue method:

    def after_enqueue(self, broker, message, delay):
        from django_dramatiq.models import Task

        self.logger.debug('Creating Task from message %r.', message.message_id)
        status = Task.STATUS_ENQUEUED
        if delay:
            status = Task.STATUS_DELAYED

        Task.tasks.get_or_create(
            id=message.message_id,
            defaults={
                'message_data': message.encode(),
                'status': status,
                'actor_name': message.actor_name,
                'queue_name': message.queue_name,
            },
        )

dapanin commented 4 months ago

Can I solve it this way?

    def before_enqueue(self, broker, message, delay):
        super().before_enqueue(broker, message, delay)
        super().after_enqueue(broker, message, delay)

    def after_enqueue(self, broker, message, delay):
        pass

spumer commented 2 months ago

We finally fixed it with a strict status transition flow:


import logging

import django_dramatiq.middleware
from django.utils.functional import cached_property


class AdminMiddleware(django_dramatiq.middleware.AdminMiddleware):
    logger = logging.getLogger(__name__).getChild('AdminMiddleware')

    @cached_property
    def transitions(self):
        from django_dramatiq.models import Task

        # target_status -> [source_status, ...]
        transitions = {
            Task.STATUS_ENQUEUED: [  # The task was put on the queue for processing
                None,  # There was no task yet; a new one was created
                Task.STATUS_DELAYED,  # Moved over from the DELAYED queue
            ],
            Task.STATUS_RUNNING: [  # A worker picked the task up
                Task.STATUS_ENQUEUED,
                Task.STATUS_DELAYED,
            ],
            Task.STATUS_FAILED: [  # The task finished with an error (e.g. an Exception)
                Task.STATUS_RUNNING,
            ],
            Task.STATUS_DELAYED: [  # The task was delayed
                # A task can be delayed from any final status
                Task.STATUS_FAILED,
                Task.STATUS_SKIPPED,
                Task.STATUS_DONE,
            ],
        }
        return transitions

    def after_enqueue(self, broker, message, delay):
        from django_dramatiq.models import Task

        self.logger.debug('Creating Task from message %r.', message.message_id)
        target_status = Task.STATUS_ENQUEUED
        if delay:
            target_status = Task.STATUS_DELAYED

        obj, created = Task.tasks.get_or_create(
            id=message.message_id,
            defaults={
                'message_data': message.encode(),
                'status': target_status,
                'actor_name': message.actor_name,
                'queue_name': message.queue_name,
            },
        )
        if created:
            return

        source_status = obj.status
        if source_status in self.transitions[target_status]:
            self.logger.debug(
                'Update Task status for message %r: %s -> %s',
                message.message_id,
                source_status,
                target_status,
            )
            Task.tasks.filter(id=message.message_id, status=source_status).update(status=target_status)
        else:
            self.logger.debug(
                'Incorrect Task status transition for message %r: %s -> %s',
                message.message_id,
                source_status,
                target_status,
            )
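
The effect of the transition table can be checked without Django. The sketch below is a simplified stand-in, not the middleware itself: plain strings replace the Task.STATUS_* constants, a dict replaces the Task table, and `guarded_after_enqueue` is a hypothetical helper that mirrors only the after_enqueue branch shown above.

```python
# target_status -> allowed source statuses (only the targets after_enqueue uses)
TRANSITIONS = {
    "ENQUEUED": [None, "DELAYED"],
    "DELAYED": ["FAILED", "SKIPPED", "DONE"],
}

tasks = {}  # message_id -> status


def guarded_after_enqueue(message_id, target_status):
    """Create the row if missing; otherwise update only along an allowed transition."""
    if message_id not in tasks:
        tasks[message_id] = target_status  # the get_or_create "created" branch
        return True
    source_status = tasks[message_id]
    if source_status in TRANSITIONS[target_status]:
        tasks[message_id] = target_status
        return True
    return False  # incorrect transition, status left untouched


# The worker already finished before the scheduler's hook ran.
tasks["msg-1"] = "DONE"
late = guarded_after_enqueue("msg-1", "ENQUEUED")

# A delayed message is moved to the regular queue.
tasks["msg-2"] = "DELAYED"
moved = guarded_after_enqueue("msg-2", "ENQUEUED")

# A message that has no row yet.
fresh = guarded_after_enqueue("msg-3", "ENQUEUED")

print(late, tasks["msg-1"])   # False DONE
print(moved, tasks["msg-2"])  # True ENQUEUED
print(fresh, tasks["msg-3"])  # True ENQUEUED
```

Unlike this in-memory version, the middleware above also puts status=source_status into the filter of the UPDATE, so a status that changes between the read and the write makes the update a no-op.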

spumer commented 2 months ago

@amureki if you agree with that, I will make a PR.

amureki commented 2 months ago

> @amureki if you agree with that, I will make a PR.

So far, your proposal seems logical to me. And if you already have it running and it resolves this race condition - then even better.

I am happy to review the patch. 👍