ZeitOnline / celery_longterm_scheduler

Schedules celery tasks to run in the potentially far future

Tasks not being scheduled properly #12

Closed emilio-cea closed 1 year ago

emilio-cea commented 1 year ago

In my main file, I have defined a Celery app like so:

from celery import Celery

import celery_longterm_scheduler

app = Celery(
    "my_app",
    broker=settings.REDIS_BROKER_TASKS,  # settings is my own config module
    task_cls=celery_longterm_scheduler.Task,
)

app.conf.update(
    task_acks_late=True,  # Don't ACK tasks until finished
    task_serializer="json",
    accept_content=["json"],  # Ignore other content
    result_serializer="json",
    enable_utc=True,
    worker_prefetch_multiplier=1,  # Number of tasks retrieved in a batch
    task_default_priority=settings.MAX_QUEUE_PRIORITY,
    task_queue_max_priority=settings.MAX_QUEUE_PRIORITY,
    longterm_scheduler_backend=settings.REDIS_LONGTERM_SCHEDULER,
)

and a task decorator like this:

@app.task(
    bind=True,
    serializer="json",
    task_acks_late=True,
    autoretry_for=(NoCreds,),
)
def my_method(self, whatever):
    ...

Then I assume I can just use send_task with the eta argument containing a datetime object:

from datetime import datetime, timedelta, timezone

eta = datetime.now(tz=timezone.utc) + timedelta(weeks=1)
app.send_task(
    "my_file.my_method",
    args=(some args),
    eta=eta,
    priority=priority,
    exchange=exchange_name,
    routing_key=rkey,
)

and then I would run the celery longterm_scheduler command from a cronjob so that the stored task gets acknowledged and executed. Instead, tasks are being acked instantly and simply wait for the given eta, like a normal Celery task would.

Also, nothing appears in the Redis DB specified by the longterm_scheduler_backend=settings.REDIS_LONGTERM_SCHEDULER setting when sending the task, so I don't think this is working the way I thought.
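
For what it's worth, here is roughly how I checked that the backend DB stays empty (a minimal sketch, assuming settings.REDIS_LONGTERM_SCHEDULER is a plain redis:// URL):

import redis

# Connect directly to the DB configured as the scheduler backend
r = redis.Redis.from_url(settings.REDIS_LONGTERM_SCHEDULER)
print(r.keys("*"))  # prints [] even right after send_task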

I assumed that when using send_task (or apply_async) with an eta, a job would appear in my settings.REDIS_LONGTERM_SCHEDULER DB (and the task itself wouldn't carry an eta), and that running celery longterm_scheduler would then pick up that job and execute it when due.

Any pointers as to where to look for faulty config?

Am I sending the tasks properly?

Are the task decorators used properly?

emilio-cea commented 1 year ago

It seems this package only overrides Celery's apply_async method, so I had to switch from send_task to apply_async, and it seems to work. Still looking into how to integrate it with my project, but it's progressing nicely. Good work!
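
For anyone landing on this later, here is a minimal sketch of the working call, assuming my_method is importable (my_file and the argument are placeholders from the examples above). The scheduling hook lives in celery_longterm_scheduler.Task.apply_async, so the task has to be invoked through the task object itself; send_task never goes through the Task class and just publishes a message with a regular eta.

from datetime import datetime, timedelta, timezone

from my_file import my_method  # placeholder module path from the examples above

eta = datetime.now(tz=timezone.utc) + timedelta(weeks=1)

# With an eta, the overridden apply_async stores the task in the
# longterm_scheduler_backend instead of publishing it to the broker right
# away; the celery longterm_scheduler cronjob then publishes it once due.
my_method.apply_async(args=("whatever",), eta=eta)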