agronholm / apscheduler

Task scheduling library for Python
MIT License

Next wakeup not reset with scheduler.add_job() #755

Open pmona opened 1 year ago

pmona commented 1 year ago

Version

3.9.1

What happened?

Suppose a job runs every hour. When it executes, the next wakeup is set to 3600 seconds from the time of execution. If a second job that should run every minute is added to the scheduler 1 minute after the hourly job has executed (that is, after the next wakeup was set to 3600 seconds), the new job does not execute until that wakeup time, 59 minutes later. From that point on, the next wakeup will be 300 seconds. I tried to work around this by calling scheduler.wakeup(), but it did not resolve the problem. For now, my workaround is a dummy job which runs every 30 seconds.

The logs below are from APScheduler and from the Flask app which front-ends the scheduler. There are two jobs: alerts-processing and Altura_7d_rpt. Initially alerts-processing runs every minute and Altura_7d_rpt runs every hour at the 20-minute mark. Just before the Altura_7d_rpt job executed, I re-added the alerts-processing job so that it only runs in November, using scheduler.add_job() with replace_existing=True. Then Altura_7d_rpt executed and the next wakeup was set roughly an hour ahead. I then updated alerts-processing to run every minute again. The next wakeup was not refreshed, and no jobs ran until 20 past the next hour.

APSCHEDULER Log:
2023-07-12T18:18:00 : DEBUG : _process_jobs : Looking for jobs to run
2023-07-12T18:18:00 : INFO : run_job : Running job "alerts-processing (trigger: cron[year='', month='', day='', week='', day_of_week='', hour='', minute='', second='0'], next run at: 2023-07-12 11:19:00 PDT)" (scheduled at 2023-07-12 11:18:00-07:00)
2023-07-12T18:18:00 : DEBUG : _process_jobs : Next wakeup is due at 2023-07-12 18:19:00+00:00 (in 59.969410 seconds)
2023-07-12T18:18:05 : INFO : run_job : Job "alerts-processing (trigger: cron[year='', month='', day='', week='', day_of_week='', hour='', minute='', second='0'], next run at: 2023-07-12 11:19:00 PDT)" executed successfully
2023-07-12T18:19:00 : DEBUG : _process_jobs : Looking for jobs to run
2023-07-12T18:19:00 : INFO : run_job : Running job "alerts-processing (trigger: cron[year='', month='', day='', week='', day_of_week='', hour='', minute='', second='0'], next run at: 2023-07-12 11:20:00 PDT)" (scheduled at 2023-07-12 11:19:00-07:00)
2023-07-12T18:19:00 : DEBUG : _process_jobs : Next wakeup is due at 2023-07-12 18:20:00+00:00 (in 59.967274 seconds)
2023-07-12T18:19:05 : INFO : run_job : Job "alerts-processing (trigger: cron[year='', month='', day='', week='', day_of_week='', hour='', minute='', second='0'], next run at: 2023-07-12 11:20:00 PDT)" executed successfully
2023-07-12T18:19:39 : INFO : _real_add_job : Added job "alerts-processing" to job store "default"
2023-07-12T18:19:57 : INFO : _real_add_job : Added job "Altura_7d_rpt" to job store "default"
2023-07-12T18:20:00 : DEBUG : _process_jobs : Looking for jobs to run
2023-07-12T18:20:00 : INFO : run_job : Running job "Altura_7d_rpt (trigger: cron[year='', month='', day='', week='', day_of_week='', hour='', minute='20', second='0'], next run at: 2023-07-12 19:20:00 +00)" (scheduled at 2023-07-12 18:20:00+00:00)
2023-07-12T18:20:00 : DEBUG : _process_jobs : Next wakeup is due at 2023-07-12 19:15:00+00:00 (in 3299.974201 seconds)
2023-07-12T18:20:00 : INFO : run_job : Job "Altura_7d_rpt (trigger: cron[year='', month='', day='', week='', day_of_week='', hour='', minute='20', second='0'], next run at: 2023-07-12 19:20:00 +00)" executed successfully
2023-07-12T18:20:05 : INFO : _real_add_job : Added job "Altura_30d_rpt" to job store "default"

FLASK APP Log:
2023-07-12T18:19:39 : DEBUG : add_job : Adding Job: scheduler.add_job(func=exec_job, args=['monitoring-alerting', 'alerts-processing', '/Scripts/alerts_processing/alerts_processing.py', True], id=alerts-processing, max_instances=1, misfire_grace=15, coalesce=True, replace_existing=True, name=alerts-processing, jobstore='default', trigger=cron, second=0, minute=, hour=, day_of_week=, day=, month=11, week=, year=, timezone=America/Los_Angeles)
2023-07-12T18:19:57 : DEBUG : add_job : Adding Job: scheduler.add_job(func=exec_job, args=['reporting', 'Altura_7d_rpt', '/altura7-job-lab.yml', False], id=Altura_7d_rpt, max_instances=1, misfire_grace=15, coalesce=True, replace_existing=True, name=Altura_7d_rpt, jobstore='default', trigger=cron, second=0, minute=20, hour=, day_of_week=, day=, month=, week=, year=, timezone=Atlantic/Azores)
2023-07-12T18:20:00 : DEBUG : exec_job : Running kubectl --kubeconfig /usr/share/nginx/html/k8sapscheduler/.kube/config apply -f /usr/local/bin/scripts/jobs/altura7-job-lab.yml
2023-07-12T18:20:05 : DEBUG : add_job : Adding Job: scheduler.add_job(func=exec_job, args=['reporting', 'Altura_30d_rpt', '/altura30-job-lab.yml', False], id=Altura_30d_rpt, max_instances=1, misfire_grace=15, coalesce=True, replace_existing=True, name=Altura_30d_rpt, jobstore='default', trigger=cron, second=0, minute=22, hour=, day_of_week=, day=, month=, week=, year=, timezone=Atlantic/Azores)

How can we reproduce the bug?

Using the background scheduler:

Add job 1, running every 5 minutes beginning at the 00 minute mark.
Add job 2, running every hour, with the first instance beginning at the 03 minute mark.

At 00, you should see next wakeup due in 60 seconds...
At 01, you should again see next wakeup due in 60 seconds.
At 02, re-add job 1 to run only in a month other than the present month, using scheduler.add_job() with replace_existing=True.
At 03, job 2 should execute and the next wakeup will be due in 3600 seconds.
At 04, re-add job 1 to again run every 5 minutes, using scheduler.add_job() with replace_existing=True.

The next wakeup is not updated and no jobs run until 03 past the next hour.
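
To make the reported symptom concrete, here is a toy model of a scheduler that sleeps until its next computed wakeup. It is only a sketch using the standard library: `ToyScheduler`, `process_jobs`, `first_run_time` and the `notify` flag are hypothetical names invented for illustration and are not APScheduler's implementation. It shows that if a job is added without the sleeping scheduler being told to recompute its wakeup, the new job's first run is delayed until the previously computed wakeup.

```python
class ToyScheduler:
    """Toy model (not APScheduler): time is an integer number of seconds."""

    def __init__(self):
        self.jobs = {}           # job id -> (interval, next_run)
        self.next_wakeup = None  # time at which the scheduler will next look at jobs

    def process_jobs(self, now):
        """Run every due job, then recompute the next wakeup."""
        ran = []
        for job_id, (interval, next_run) in list(self.jobs.items()):
            if next_run <= now:
                ran.append(job_id)
                # Coalesce missed runs: advance to the first run time after 'now'.
                while next_run <= now:
                    next_run += interval
                self.jobs[job_id] = (interval, next_run)
        self.next_wakeup = min(next_run for _, next_run in self.jobs.values())
        return ran

    def add_job(self, job_id, interval, now, notify):
        """Store a job; only recompute the wakeup if the scheduler is notified."""
        self.jobs[job_id] = (interval, now + interval)
        if notify:
            self.process_jobs(now)


def first_run_time(notify):
    """Return the time at which the per-minute job first runs."""
    sched = ToyScheduler()
    sched.add_job("hourly", 3600, now=0, notify=True)
    sched.add_job("minutely", 60, now=60, notify=notify)
    while True:
        now = sched.next_wakeup  # the scheduler sleeps until this time
        if "minutely" in sched.process_jobs(now):
            return now


print(first_run_time(notify=True))   # wakeup was recomputed when the job was added
print(first_run_time(notify=False))  # stale wakeup: the job waits for the hourly wakeup
```

In this model, a frequent dummy job acts as a workaround only because it forces `process_jobs` to run often, which bounds how stale the wakeup can become.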

riosw commented 1 year ago

Is the scheduler running on the same process as where you manually call scheduler.wakeup()?

agronholm commented 1 year ago

Is this reproducible if you use seconds instead of minutes? Would you mind creating a trivial script to demonstrate this?

igorbrigadir commented 1 year ago

I had the same issue. However, I think this isn't an issue with APScheduler, but with the way Flask is being run. As riosw asked above:

> Is the scheduler running on the same process as where you manually call scheduler.wakeup()?

Here is a minimal script to reproduce using just APScheduler:

from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
import logging
logging.basicConfig(level=logging.DEBUG)

scheduler = BackgroundScheduler(daemon=False)
scheduler.start()

def hour_tick():
    print("Hourly, the time is", datetime.now())

scheduler.add_job(id="hour_tick", func=hour_tick, trigger="interval", hours=1)

def new_job():
    print("New Job, the time is", datetime.now())

scheduler.add_job(id="new_job", func=new_job, trigger="date", run_date=datetime.now() + timedelta(seconds=30))

print("Started.")

This works correctly.

Here is an example using Flask-APScheduler:

from flask import Flask
from datetime import datetime, timedelta

from flask_apscheduler import APScheduler
from apscheduler.schedulers.background import BackgroundScheduler

import logging
logging.basicConfig(level=logging.DEBUG)

app = Flask(__name__)

scheduler = APScheduler(scheduler=BackgroundScheduler(daemon=False))
scheduler.api_enabled = True
scheduler.init_app(app)

@scheduler.task("cron", id="hour_tick", hour="*")
def hour_tick():
    print("Hourly, the time is", datetime.now())

@app.route("/add_job/<int:param>", methods=["GET"])
def add_job(param):
    scheduler.add_job(id="new_job", func=new_job, args=[param], trigger="date", run_date=datetime.now() + timedelta(seconds=10))
    return f"Added {param}", 200

def new_job(param):
    print(f"New Job {param}, the time is", datetime.now())

if __name__ == "__main__":
    scheduler.start()
    print("Starting Scheduler with Flask. Gunicorn doesn't execute this.")
    app.run()

This also works if you just run it directly,

python example.py

but not if you run it with something like gunicorn or something that forks and creates separate workers.

gunicorn --workers=2 --bind 0.0.0.0:5000 "example:app"

because that ends up with the scheduler running in a different process from the one that handles the request.
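
The underlying effect can be sketched without Flask, Gunicorn or APScheduler at all. The snippet below is only an illustration: it uses `os.fork()` (so it is Unix-only), and the module-level `jobs` list and the function name `add_job_in_child` are hypothetical stand-ins for a scheduler's in-memory state. A forked worker gets its own copy of memory, so a job "added" in the worker never becomes visible to the parent.

```python
import os

jobs = []  # stand-in for in-memory scheduler state (hypothetical)


def add_job_in_child():
    """Fork a child that appends a job, and report both processes' job counts."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: this modifies the child's private copy of 'jobs'.
        os.close(read_fd)
        jobs.append("new_job")
        os.write(write_fd, str(len(jobs)).encode())
        os._exit(0)
    # Parent process: read the child's count, then look at our own list.
    os.close(write_fd)
    child_count = int(os.read(read_fd, 16).decode())
    os.close(read_fd)
    os.waitpid(pid, 0)
    return child_count, len(jobs)


child_count, parent_count = add_job_in_child()
print("jobs seen by child:", child_count)
print("jobs seen by parent:", parent_count)
```

The same isolation applies to anything else held in memory, such as the event a scheduler thread waits on, which is why a wakeup call made in one worker cannot reach a scheduler thread living in another.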

pmona commented 1 year ago

@igorbrigadir I believe you are right about the wakeup being missed because the scheduler is running in a different process. Our environment is Flask, Gunicorn, WSGI and APScheduler. I'm not sure what we can do about this. For now, our workaround is a dummy job which is scheduled to run every 30s.

agronholm commented 1 year ago

If you run multiple workers, you end up running multiple schedulers. You might not want to do that with APScheduler 3.x. The FAQ covers this case. This design issue has been resolved in master (v4.0) through shared data store support, but progress on v4.0 in general has unfortunately been sporadic.

igorbrigadir commented 1 year ago

Yeah, I'm also currently "solving" this with a fake job and SQLite job storage for now, while waiting for v4.0.

thehesiod commented 1 year ago

Actually, I think there may be a bug. We have a service that calls .add_job at startup: it first tries to add the job and, if the job already exists, does an update_job instead. As you can see in the update_job code, the existing next_run_time is not preserved, which means that if you redeploy the service before each interval finishes, the job will never run. I'd imagine the result should be the earlier of the two run times, in case you change the schedule to a closer offset. To do this properly, though, you'd probably have to check whether any of the interval parameters have changed, to determine whether the new interval should be accepted.
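
The rule proposed in this comment can be sketched as a small pure function. This is only an illustration of the suggestion, not APScheduler code: `choose_next_run_time` and its parameters are hypothetical names, and how `trigger_changed` would be determined is left open, as it is in the comment.

```python
from datetime import datetime, timedelta
from typing import Optional


def choose_next_run_time(
    existing: Optional[datetime],
    proposed: datetime,
    trigger_changed: bool,
) -> datetime:
    """Pick the run time to store when a job is re-added (hypothetical helper).

    - If there is no stored run time, or the trigger parameters changed,
      accept the newly computed run time.
    - Otherwise keep whichever run time comes first, so that redeploying
      before the interval elapses does not keep pushing the run into the future.
    """
    if existing is None or trigger_changed:
        return proposed
    return min(existing, proposed)


# Example: an hourly job that was due in 10 minutes is re-added on redeploy.
now = datetime(2023, 7, 12, 18, 0, 0)
existing = now + timedelta(minutes=10)
proposed = now + timedelta(hours=1)
print(choose_next_run_time(existing, proposed, trigger_changed=False))
```

With unchanged trigger parameters the stored run time, 10 minutes away, survives the redeploy; with changed parameters the newly computed time is used instead.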