ZeitOnline / celery_longterm_scheduler

Schedules celery tasks to run in the potentially far future
BSD 3-Clause "New" or "Revised" License

Handle long countdowns as well as ETAs #4

Open wryun opened 3 years ago

wryun commented 3 years ago

I would find it useful if Task was overridden to handle long countdowns (configurable) as well as ETAs.

Would you be open to a PR?

I believe this is particularly important when you're using redis as a celery backend, since long countdowns can push you past the visibility timeout and cause duplicates: https://docs.celeryproject.org/en/stable/getting-started/brokers/redis.html#id1
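For reference, the workaround suggested on that Redis caveats page is to raise the broker's visibility timeout above the longest countdown/ETA you ever schedule. A minimal sketch of that setting (the option name is Celery's; the 7-day value here is just an illustration, not from this thread):

```python
# Sketch: raise the Redis visibility timeout above your longest ETA so that
# long-countdown tasks are not redelivered (and run twice) before they fire.
broker_transport_options = {
    'visibility_timeout': 7 * 24 * 60 * 60,  # seconds (illustrative value)
}
```

This only postpones the problem, though: any ETA beyond the timeout still risks duplicates, which is the motivation for storing long-term tasks outside the broker.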

wryun commented 3 years ago

Amusingly, this is the same thing that Celery does deep in its internals (i.e. countdowns are equivalent to ETAs):

https://github.com/celery/celery/blob/8c5e9888ae10288ae1b2113bdce6a4a41c47354b/celery/app/amqp.py#L294
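The equivalence is easy to see: the linked Celery code effectively does the following conversion (this helper is my own illustration, not Celery API):

```python
from datetime import datetime, timedelta, timezone

def countdown_to_eta(countdown, now=None):
    """What Celery does internally at the linked line: a countdown is just
    shorthand for eta = now + countdown seconds."""
    now = now or datetime.now(timezone.utc)
    return now + timedelta(seconds=countdown)
```

So by the time the message reaches the broker, a long countdown and a far-future ETA are indistinguishable, and both hit the same visibility-timeout problem.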

jeffreybrowning commented 3 years ago

Since countdown is syntactic sugar for eta as @wryun has pointed out, this is definitely a gotcha.

wryun commented 3 years ago

We ended up creating our own somewhat scary implementation that backed onto a django db table:

"""
Back longterm celery tasks onto db.
See PendingTaskCelery for how these get created.
"""

from importlib import import_module
import logging

from django.db import transaction
from django.utils.timezone import now as get_now

from kombu.utils.uuid import uuid

from grok import celery_app
from grok.models import PendingTask

__all__ = ['execute_pending_tasks', 'schedule_pending_task']

logger = logging.getLogger('grok.main')

def schedule_pending_task(eta, args, kwargs):
  # If it's a pending task, we assume etas/countdowns have already been dealt with.
  assert kwargs.get('eta') is None, 'Attempting to schedule pending task with embedded eta'
  assert kwargs.get('countdown') is None, 'Attempting to schedule pending task with embedded countdown'

  # Use the celery task_id for scheduler storage.
  # This may make it slightly nicer to follow a task through the system.
  if not kwargs.get('task_id'):
    kwargs['task_id'] = uuid()

  # We can't JSON encode task type (it's the task function itself).
  task_type = kwargs.get('task_type')
  if task_type:
    kwargs['task_type'] = f'{task_type.__module__}:{task_type.__name__}'

  pt = PendingTask.objects.create(id=kwargs['task_id'], eta=eta, args=args, kwargs=kwargs)
  logger.info('Scheduled task %s for %s', pt.id, pt.eta)

  return kwargs['task_id']

def execute_pending_tasks(current_time):
  while True:
    with transaction.atomic():
      # skip_locked means it's safe to run multiple of these at the same time without blocking,
      # but we don't intend to do this...
      pt = PendingTask.objects.select_for_update(skip_locked=True).filter(eta__lte=current_time).order_by('eta').first()
      if not pt:
        break

      if pt.kwargs.get('task_type'):
        mod, name = pt.kwargs['task_type'].split(':')
        pt.kwargs['task_type'] = getattr(import_module(mod), name)

      celery_app.send_task(*pt.args, **pt.kwargs)

      logger.info('Enqueued scheduled task %s (expected at %s, enqueued at %s)', pt.id, pt.eta, get_now())
      pt.delete()
class PendingTaskCelery(Celery):
  """
  Intercept send_task on long countdowns/ETAs and send them to our PendingTasks.
  Inspired by https://github.com/ZeitOnline/celery_longterm_scheduler
  This avoids the issue with countdowns exceeding the visibility timeout:
  https://docs.celeryproject.org/en/stable/getting-started/brokers/redis.html#redis-caveats
  """
  def send_task(self, *args, **kwargs):
    # Convert any long countdowns to ETAs.
    countdown = kwargs.get('countdown', 0)
    if countdown > LONG_COUNTDOWN_THRESHOLD:
      # Amusingly, the underlying code in Celery does the same thing to countdowns:
      # https://github.com/celery/celery/blob/8c5e9888ae10288ae1b2113bdce6a4a41c47354b/celery/app/amqp.py#L294
      del kwargs['countdown']
      kwargs['eta'] = get_now() + timedelta(seconds=countdown)

    if kwargs.get('eta') is None:  # Can't just check for eta, because sometimes it's set to None (e.g. on retries)
      return super().send_task(*args, **kwargs)
    else:
      from grok.core.pending_tasks import schedule_pending_task
      result_cls = kwargs.pop('result_cls', self.AsyncResult)

      if 'task_id' not in kwargs:
        kwargs['task_id'] = uuid()

      schedule_pending_task(kwargs.pop('eta'), args, kwargs)
      return result_cls(kwargs['task_id'])

The same BSD licence as this repository applies if you want to use it, since it was inspired by it.

wryun commented 3 years ago

(there's a celery beat scheduled task which calls execute_pending_tasks)
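For completeness, the beat entry would look something like this (the entry name and task path are assumed for illustration; the thread only says a beat task calls `execute_pending_tasks`):

```python
from datetime import timedelta

# Hypothetical beat schedule: a wrapper task that calls
# execute_pending_tasks(now()) once a minute.
beat_schedule = {
    'execute-pending-tasks': {
        'task': 'grok.core.pending_tasks.execute_pending_tasks_beat',
        'schedule': timedelta(minutes=1),
    },
}
```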

SuperMasterBlasterLaser commented 2 years ago

@wryun I'm struggling to do the same thing. I'm constantly getting hundreds of tasks each hour, and some of them have ETAs several days out.

Are you launching this celery beat periodic task every minute?

wryun commented 2 years ago

@SuperMasterBlasterLaser yes, we run execute_pending_tasks every minute. The code above appears to have been working well for the past year and a half, but please do your own checking ;)

If you want finer granularity on the ETAs than we needed, you could change execute_pending_tasks to enqueue tasks with an ETA when they're due within the next beat interval (i.e. if the beat interval is 1 minute, anything due in the next minute is queued immediately with its ETA, as opposed to the current approach of waiting until tasks are past due).
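The finer-grained selection could be sketched like this (pure-Python stand-in for the DB query; `BEAT_INTERVAL` and the pair representation are assumptions for illustration):

```python
from datetime import datetime, timedelta

BEAT_INTERVAL = timedelta(minutes=1)  # assumed beat interval

def due_within_interval(pending, now):
    """Select tasks whose ETA falls within the next beat interval, so they
    can be enqueued *with* their ETA instead of waiting until past due.
    `pending` is a list of (task_id, eta) pairs standing in for the
    PendingTask query."""
    horizon = now + BEAT_INTERVAL
    return sorted(((tid, eta) for tid, eta in pending if eta <= horizon),
                  key=lambda pair: pair[1])
```

The key change versus the code above is filtering on `eta <= now + BEAT_INTERVAL` rather than `eta <= now`, and passing the ETA through to `send_task` so the broker handles the final sub-minute wait.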