Closed yuxiaoy1 closed 3 years ago
clock.py
clock: python clock.py --logLEVEL = INFO
from app import create_app, update_db from flask import current_app from app.emails import send_email from apscheduler.schedulers.blocking import BlockingScheduler sched = BlockingScheduler() app = None # 生成app对象,并全局化可用 def get_app(): global app app = app if app is not None else create_app('production') # 定义定时任务,更新数据库 @sched.scheduled_job('cron', max_instances=10, hour='7-23', minute='1,31') def scheduled_job(): get_app() with app.app_context(): try: update_db() except Exception as e: send_email(subject='定时任务报错', to=current_app.config['ADMIN_EMAIL'], template='emails/task_error_email', task_error=e) if __name__ == '__main__': sched.start()
应用部署在heroku,根目录创建了
clock.py
,并通过clock: python clock.py --logLEVEL = INFO
能正常启动定时任务:clock.py
代码清单:目前的写法主要参考了Flask-APScheduler定时任务查询操作数据库(多文件/模块),可以正常运行,但是觉得会有更优雅的实现方式,只是不太清楚怎么实现,麻烦解答下 😄