AngelLiang / celery-sqlalchemy-scheduler

A Scheduler Based SQLalchemy for Celery.
MIT License
125 stars 62 forks source link

Updating the celery_periodic_task_changed triggers ALL the existing tasks now. #28

Open akapit opened 3 years ago

akapit commented 3 years ago

Whenever I need to update the scheduler tasks, I update the celery_periodic_task_changed to now to make beat aware of the changes.

The problem is that doing that triggers all the scheduled tasks to run NOW, and I just need them to run at the right time they were scheduled.

There is any workaround for this?

Thanks

minsis commented 3 years ago

This is a known issue https://github.com/celery/celery/issues/4806. The gist of it is that when a change is made to any one of your scheduled tasks, beat then reloads the table to make sure everything is current. In doing so it also ends up checking the last_ran_at timestamp and if too much time has elapsed it schedules it for execution. There's some logic issues with how that works. There's a larger work around in the same issue that you can use if you want to fork your own copy. Note that this can also happen if you restart beat in the same timeframe.

The way I've been able to work around it, on start up or changes made to any one of my scheduled entries, I basically just set last_run_at to null. As the logic for last_run_at wont trigger an execution if its the 'first time'. Of course this creates an issue if you actually care about the last execution time. Here my use case it doesn't matter to have this viewable by anyone as I have flower to track all my tasks executions as well as a custom audit table.

This is what my start up of beat looks like (replace SessionManager() with whatever your db connection is):

@beat_init.connect
def reset_last_run(**_):
    """
    Reset all the last ran times in celery database. This is to prevent accidental runs if a new service is deployed
    or restarted.
    """

    logging.info("Resetting last run at for Periodic Task table")
    with SessionManager() as session:
        periodic_tasks = session.query(PeriodicTask).all()
        for pt in periodic_tasks:
            pt.last_run_at = None
            session.add(pt)
        session.commit()