Qingquan-Li / blog

My Blog
https://Qingquan-Li.github.io/blog/

APScheduler: a scheduled-task framework for Python #149

Qingquan-Li opened this issue 4 years ago

Environment:

References:


I. Simple scheduled tasks

Using an infinite while loop:

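import time
from datetime import datetime

while True:
    # Example condition (an assumption, for illustration): it is 08:00 or later today
    if datetime.now().hour >= 8:
        print("do something")  # placeholder for the actual task
        break  # one run is enough; without break the task would repeat forever
    else:
        time.sleep(60)  # wait 60 seconds, i.e. poll once a minute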
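Polling every 60 seconds means the task can start up to a minute late, and the loop wakes up 1,440 times a day just to check the clock. A common alternative is to compute how long to sleep until the next run. The sketch below assumes a task that should run once a day at a fixed local time; `seconds_until` and `run_daily` are hypothetical helper names, not part of any library:

```python
import time
from datetime import datetime, timedelta


def seconds_until(hour, minute, now=None):
    """Seconds from `now` (default: current local time) until the next HH:MM."""
    if now is None:
        now = datetime.now()
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        # Today's slot has already passed, so aim for the same time tomorrow
        target += timedelta(days=1)
    return (target - now).total_seconds()


def run_daily(task, hour, minute):
    """Sleep until the next HH:MM, run `task`, and repeat forever."""
    while True:
        time.sleep(seconds_until(hour, minute))
        task()
        time.sleep(1)  # step past the HH:MM slot so it cannot be matched twice


# At 07:30 the next 08:00 is half an hour away
print(seconds_until(8, 0, now=datetime(2020, 3, 14, 7, 30)))  # 1800.0
```

The sketch works with naive local times, so daylight-saving transitions are not handled.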
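# Test how the polling interval affects CPU usage

import time
import random

while True:
    random_int = random.randint(0, 1000000)
    if random_int == 1:
        print("stop when random_int is 1")
        break  # without this the loop never ends, despite the message
    else:
        time.sleep(0.002)  # poll every 2 ms; shrink or remove this to see CPU usage rise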
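Instead of watching `top`, the effect of the polling interval can be measured directly. This sketch (the helper name `cpu_seconds_while_polling` is hypothetical) polls for a fixed wall-clock duration and reports how much CPU time the process consumed, using the standard library's `time.process_time()`:

```python
import time


def cpu_seconds_while_polling(duration, interval):
    """Poll for `duration` seconds of wall time, sleeping `interval` seconds between
    checks (0 means no sleep at all), and return the CPU time this process used."""
    cpu_start = time.process_time()
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        if interval:
            time.sleep(interval)
    return time.process_time() - cpu_start


for interval in (0, 0.002, 0.1):
    used = cpu_seconds_while_polling(0.5, interval)
    print(f"interval={interval}s: {used:.3f} CPU seconds in 0.5 s of wall time")
```

A pure busy loop (interval 0) burns roughly one CPU second per wall-clock second, while even a 2 ms sleep typically cuts that sharply; exact numbers depend on the machine.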


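II. Choosing a Python scheduled-task framework

Appendix: scheduled-task frameworks that can be used with Django (not covered in this article):

Note: GitHub star counts were taken on 2020-03-13.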


III. Installing APScheduler

The version installed here is the latest release as of 2020-03-14: apscheduler-3.6.3.

Migration notes between versions: https://apscheduler.readthedocs.io/en/stable/migration.html

The 3.0 series is API incompatible with previous releases due to a design overhaul.

$ pip install apscheduler
# or:
$ conda install apscheduler  # if you have Miniconda or Anaconda installed
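Since the 3.0 series broke the API, and APScheduler 4.x, as far as we know, redesigns it again, it can help to check which version is actually installed before running the examples below, all of which target 3.x. A minimal standard-library sketch (Python 3.8+ for `importlib.metadata`; the function name is hypothetical):

```python
from importlib.metadata import PackageNotFoundError, version


def apscheduler_major_version():
    """Return the installed APScheduler major version, or None if it is not installed."""
    try:
        return int(version("APScheduler").split(".")[0])
    except PackageNotFoundError:
        return None


major = apscheduler_major_version()
if major is None:
    print("APScheduler is not installed")
elif major == 3:
    print("APScheduler 3.x found; the examples in this article should work")
else:
    print(f"APScheduler {major}.x found; the examples in this article use the 3.x API")
```

If a newer major version is reported, `pip install "apscheduler<4"` pins the 3.x series.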


IV. APScheduler basic concepts

The development of APScheduler was heavily influenced by the Quartz task scheduler written in Java. APScheduler provides most of the major features that Quartz does, but it also provides features not present in Quartz (such as multiple job stores).

Advanced Python Scheduler (APScheduler) is a Python library that lets you schedule your Python code to be executed later, either just once or periodically.

APScheduler has four components:

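  1. Triggers: contain the scheduling logic and describe when a job fires. A job can be triggered in three ways: on a specific date, at a fixed time interval, or by a cron expression.

  2. Job stores: determine where jobs are kept. By default jobs are kept in memory, but they can also be saved in various databases. A job is serialized when it is saved to a database and deserialized when it is loaded back. Besides the two commonly used stores, APScheduler also provides MongoDB, Redis and other stores.

  3. Executors: submit the job (the function to call) to a thread pool or a process pool to run, and notify the scheduler when the job completes so that it can emit the corresponding events. The default ThreadPoolExecutor should be good enough for most purposes. If your workload involves CPU intensive operations, you should consider using ProcessPoolExecutor instead to make use of multiple CPU cores. You could even use both at once, adding the process pool executor as a secondary executor. For background on Python threads and processes, see: Python Multithreading and the GIL

  4. Schedulers: the scheduler is the controlling component. Through it you configure job stores, executors and triggers, and add, modify and remove jobs. It coordinates the triggers, job stores and executors. Usually only one scheduler runs in an application, and developers normally don't deal with job stores, executors or triggers directly: job stores and executors are configured through the scheduler.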

    Commonly used schedulers:

    How the scheduler works:

    [Figure: APScheduler scheduler workflow]


五、APScheduler 简单实例

The examples in this section don't configure job stores or executors, so the defaults are used: MemoryJobStore (in memory) and ThreadPoolExecutor (threads).


apscheduler_interval_BlockingScheduler.py

# Based on the official example: https://github.com/agronholm/apscheduler/blob/master/examples/schedulers/blocking.py

from datetime import datetime, timezone

from apscheduler.schedulers.blocking import BlockingScheduler

def job_function():
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC datetime instead
    print(datetime.now(timezone.utc).strftime("%H:%M:%S") + " Hello World")

sched = BlockingScheduler()

# Schedule job_function to be called every two seconds
sched.add_job(job_function, 'interval', seconds=2)

# The scheduled_job() decorator works nicely too.
# The id argument must be given if scheduling a job in a persistent job store.
@sched.scheduled_job('interval', id='my_job_id', seconds=2)
def job_function_02():
    print("Goodbye World")

try:
    # start() blocks the main thread until the scheduler is shut down
    sched.start()
except (KeyboardInterrupt, SystemExit):
    sched.shutdown()


apscheduler_date_BlockingScheduler.py

from datetime import date

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

def my_job(text):
    print(text)

# The job will be executed on November 6th, 2009
# sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])

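# You can specify the exact time when the job should be run.
# The date must still be in the future when the script starts; a run_date already in the past is treated as missed.
sched.add_job(my_job, 'date', run_date='2020-03-14 23:17:58', args=['Hello World'])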

sched.start()


apscheduler_interval_BackgroundScheduler.py

# Source of this example: https://github.com/agronholm/apscheduler/blob/master/examples/schedulers/background.py

"""
Demonstrates how to use the background scheduler to
schedule a job that executes on 3 second intervals.
"""

from datetime import datetime
import time
import os

from apscheduler.schedulers.background import BackgroundScheduler

def tick():
    print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # This is here to simulate application activity (which keeps the main thread alive).
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()


VI. APScheduler with MySQL example

References:

1. Example: configuration

apscheduler_config.py

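# import pymysql  # MySQL driver for Python; it only needs to be installed, SQLAlchemy loads it for mysql+pymysql:// URLs

# Django's time zone support used pytz, which was installed along with Django (Django 4.0+ uses zoneinfo instead).
# Time zone support was off by default before Django 5.0; to enable it, set USE_TZ = True in the settings file.
from pytz import utc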

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

jobstores = {
    # 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
    'default': SQLAlchemyJobStore(
        # Creates a table named apscheduler_jobs in the database, with the columns id, next_run_time and job_state
        url='mysql+pymysql://root:Pwd...123456@localhost:3306/dbname?charset=utf8'
    )
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    # timezone=utc,
    timezone='Asia/Shanghai',
)

Parameters of the job_defaults setting:

Reference (note: this link is for APScheduler 2): https://apscheduler.readthedocs.io/en/v2.1.2/#job-options
Reference: https://apscheduler.readthedocs.io/en/stable/modules/schedulers/base.html


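2. Example: using the configuration

apscheduler_use_config_with_interval.py

This creates a table named apscheduler_jobs in the database (columns: id, next_run_time, job_state) and adds one row for the job. Each time the script is run, another row is added, because add_job() without an explicit id gives the job a new random id every time.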

from datetime import datetime
import time
import os

# Import the scheduler configured above
from apscheduler_config import scheduler

def tick():
    print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    # No BackgroundScheduler() is created here; the configured scheduler is imported instead
    scheduler.add_job(tick, 'interval', seconds=3)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # This is here to simulate application activity (which keeps the main thread alive).
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()


apscheduler_use_config_with_date.py

This creates the apscheduler_jobs table with the columns id, next_run_time (the run_date passed to add_job) and job_state, and adds one row. Once the job has run at the scheduled time, that row is deleted automatically.

from datetime import datetime
import time
import os

from apscheduler_config import scheduler

# def my_job(text):
#     print(text)

# # scheduler = BlockingScheduler()
# # Without the `while True: time.sleep(2)` loop below, apscheduler_config must use BlockingScheduler instead of BackgroundScheduler
# scheduler.add_job(my_job, 'date', run_date='2020-03-16 01:17:30', args=['Hello World'])

# scheduler.start()

def tick():
    print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    # No BackgroundScheduler() is created here; the configured scheduler is imported instead
    scheduler.add_job(tick, 'date', run_date='2020-03-16 01:32:10')
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # This is here to simulate application activity (which keeps the main thread alive).
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()



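VII. Appendix: using APScheduler in a Django project deployed with uWSGI requires enabling threads in uWSGI

References:

If you deploy a Django project with uWSGI and run APScheduler jobs in threads, for example with a configuration like this: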

......
executors = {
    'default': ThreadPoolExecutor(20),
}
scheduler = BackgroundScheduler(
    ......
    executors=executors,
    ......
)

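then you need to enable threads in the project's uWSGI .ini configuration file:

enable-threads = true

Otherwise, requests to the site's IP address or domain fail with the following error (displayed only when DEBUG = True in the Django project's settings.py):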

The scheduler seems to be running under uWSGI, but threads have been disabled.
You must run uWSGI with the --enable-threads option for the scheduler to work.