AngelLiang / celery-sqlalchemy-scheduler

A Scheduler Based SQLalchemy for Celery.
MIT License

Beat was unable to find crontab task in the database #32

Closed gmorkgg closed 3 years ago

gmorkgg commented 3 years ago

The crontab entries can be written to the database, but beat does not seem to find the scheduled tasks in the database.

Everything runs from Docker images.

This is based on the open-source FastAPI project Full-Stack-FastAPI-PostgreSQL.
Project URL:
https://github.com/tiangolo/full-stack-fastapi-postgresql

Environment

Celery: 4.4.7 (built on a python3.8 Docker image)
celery-sqlalchemy-scheduler: 0.3.0
PostgreSQL: 12

Problem:

The crontab entries can be written to the database, but beat does not seem to find the scheduled tasks in the database:

My code and configuration:

from raven import Client
from app.core.celery_app import celery_app
from app.core.config import settings
from app.core import batch_update
from app.core import rds_batch_update

client_sentry = Client(settings.SENTRY_DSN)

from celery_sqlalchemy_scheduler.models import PeriodicTask, CrontabSchedule
from celery_sqlalchemy_scheduler.session import SessionManager
from celery_sqlalchemy_scheduler.schedulers import DatabaseScheduler 

BEAT_DBURI = "postgresql+psycopg2://postgresql:password@db:5432/devops"

session_manager = SessionManager()
session = session_manager.session_factory(dburi=BEAT_DBURI)

schedule_aws_ec2 = CrontabSchedule(
    minute='50',
    hour='10',
    day_of_week='1',
    day_of_month='1',
    month_of_year='1',
    timezone='Asia/Shanghai'
)
schedule_aws_rds = CrontabSchedule(
    minute='55',
    hour='10',
    day_of_week='1',
    day_of_month='1',
    month_of_year='1',
    timezone='Asia/Shanghai'
)

periodic_task_aws_ec2 = PeriodicTask(
    crontab=schedule_aws_ec2,
    name='batch_update',
    task='app.worker.ec2date_update',
)
periodic_task_aws_rds = PeriodicTask(
    crontab=schedule_aws_rds,
    name='awsrds_batch_update',
    task='app.worker.rdsdate_update',
)
beat_max_loop_interval = 10
worker_max_tasks_per_child = 10
timezone = 'Asia/Shanghai'

config = {
    # 'beat_scheduler': beat_scheduler,  # passed via the --scheduler command-line argument, so it does not need to be hard-coded here
    'beat_max_loop_interval': beat_max_loop_interval,
    'timezone': timezone,
    'worker_max_tasks_per_child': worker_max_tasks_per_child
}
celery_app.conf.update(config)

query_aws_ec2_schedule = session.query(PeriodicTask).filter_by(name=periodic_task_aws_ec2.name).first()
query_aws_rds_schedule = session.query(PeriodicTask).filter_by(name=periodic_task_aws_rds.name).first()

if not query_aws_ec2_schedule:
    session.add(periodic_task_aws_ec2)
    session.commit()
    session.close()

if not query_aws_rds_schedule:
    session.add(periodic_task_aws_rds)
    session.commit()
    session.close()

@celery_app.task(acks_late=True)
def test_celery(word: str) -> str:
    return f"test task return {word}"

@celery_app.task(acks_late=True)
def ec2date_update():
    batch_update.all_ec2_update()
    return

@celery_app.task(acks_late=True)
def rdsdate_update():
    rds_batch_update.all_rds_update()
    return

if __name__ == "__main__":
    celery_app.start()

My beat startup command:

celery worker -A app.worker -Q main-queue -c 2 -B --scheduler celery_sqlalchemy_scheduler.schedulers:DatabaseScheduler -l debug 

Celery Docker startup log (debug level):

Attaching to full-stack-fastapi-postgresql_celeryworker_1
celeryworker_1  | INFO:__main__:Initializing service
celeryworker_1  | INFO:__main__:Starting call to '__main__.init', this is the 1st time calling it.
celeryworker_1  | INFO:__main__:Service finished initializing
celeryworker_1  | /usr/local/lib/python3.8/site-packages/celery/platforms.py:800: RuntimeWarning: You're running the worker with superuser privileges: this is
celeryworker_1  | absolutely not recommended!
celeryworker_1  |
celeryworker_1  | Please specify a different user using the --uid option.
celeryworker_1  |
celeryworker_1  | User information: uid=0 euid=0 gid=0 egid=0
celeryworker_1  |
celeryworker_1  |   warnings.warn(RuntimeWarning(ROOT_DISCOURAGED.format(
celeryworker_1  | [2021-05-10 06:46:38,505: DEBUG/MainProcess] | Worker: Preparing bootsteps.
celeryworker_1  | [2021-05-10 06:46:38,508: DEBUG/MainProcess] | Worker: Building graph...
celeryworker_1  | [2021-05-10 06:46:38,510: DEBUG/MainProcess] | Worker: New boot order: {Beat, Timer, Hub, Pool, Autoscaler, StateDB, Consumer}
celeryworker_1  | [2021-05-10 06:46:38,521: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
celeryworker_1  | [2021-05-10 06:46:38,522: DEBUG/MainProcess] | Consumer: Building graph...
celeryworker_1  | [2021-05-10 06:46:38,576: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Mingle, Gossip, Agent, Tasks, Control, event loop}
celeryworker_1  | [2021-05-10 06:46:38,585: DEBUG/MainProcess] | Worker: Starting Beat
celeryworker_1  | [2021-05-10 06:46:38,588: DEBUG/MainProcess] ^-- substep ok
celeryworker_1  | [2021-05-10 06:46:38,589: DEBUG/MainProcess] | Worker: Starting Hub
celeryworker_1  | [2021-05-10 06:46:38,590: DEBUG/MainProcess] ^-- substep ok
celeryworker_1  | [2021-05-10 06:46:38,590: DEBUG/MainProcess] | Worker: Starting Pool
celeryworker_1  | [2021-05-10 06:46:38,729: DEBUG/MainProcess] ^-- substep ok
celeryworker_1  | [2021-05-10 06:46:38,730: DEBUG/MainProcess] | Worker: Starting Consumer
celeryworker_1  | [2021-05-10 06:46:38,731: DEBUG/MainProcess] | Consumer: Starting Connection
celeryworker_1  | [2021-05-10 06:46:38,752: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@1aebc17b4370', 'copyright': 'Copyright (c) 2007-2021 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 23.3.1', 'product': 'RabbitMQ', 'version': '3.8.14'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
celeryworker_1  | [2021-05-10 06:46:38,759: INFO/MainProcess] Connected to amqp://guest:**@queue:5672//
celeryworker_1  | [2021-05-10 06:46:38,760: DEBUG/MainProcess] ^-- substep ok
celeryworker_1  | [2021-05-10 06:46:38,760: DEBUG/MainProcess] | Consumer: Starting Events
celeryworker_1  | [2021-05-10 06:46:38,780: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@1aebc17b4370', 'copyright': 'Copyright (c) 2007-2021 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 23.3.1', 'product': 'RabbitMQ', 'version': '3.8.14'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
celeryworker_1  | [2021-05-10 06:46:38,783: DEBUG/MainProcess] ^-- substep ok
celeryworker_1  | [2021-05-10 06:46:38,784: DEBUG/MainProcess] | Consumer: Starting Heart
celeryworker_1  | [2021-05-10 06:46:38,785: DEBUG/MainProcess] using channel_id: 1
celeryworker_1  | [2021-05-10 06:46:38,803: DEBUG/MainProcess] Channel open
celeryworker_1  | [2021-05-10 06:46:38,812: DEBUG/MainProcess] ^-- substep ok
celeryworker_1  | [2021-05-10 06:46:38,812: DEBUG/MainProcess] | Consumer: Starting Mingle
celeryworker_1  | [2021-05-10 06:46:38,813: INFO/MainProcess] mingle: searching for neighbors
celeryworker_1  | [2021-05-10 06:46:38,814: DEBUG/MainProcess] using channel_id: 1
celeryworker_1  | [2021-05-10 06:46:38,830: DEBUG/MainProcess] Channel open
celeryworker_1  | [2021-05-10 06:46:38,890: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@1aebc17b4370', 'copyright': 'Copyright (c) 2007-2021 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 23.3.1', 'product': 'RabbitMQ', 'version': '3.8.14'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
celeryworker_1  | [2021-05-10 06:46:38,899: DEBUG/MainProcess] using channel_id: 1
celeryworker_1  | [2021-05-10 06:46:38,908: DEBUG/MainProcess] Channel open
celeryworker_1  | [2021-05-10 06:46:39,971: INFO/MainProcess] mingle: all alone
celeryworker_1  | [2021-05-10 06:46:39,972: DEBUG/MainProcess] ^-- substep ok
celeryworker_1  | [2021-05-10 06:46:39,972: DEBUG/MainProcess] | Consumer: Starting Gossip
celeryworker_1  | [2021-05-10 06:46:39,973: DEBUG/MainProcess] using channel_id: 2
celeryworker_1  | [2021-05-10 06:46:39,975: DEBUG/MainProcess] Channel open
celeryworker_1  | [2021-05-10 06:46:39,989: DEBUG/MainProcess] ^-- substep ok
celeryworker_1  | [2021-05-10 06:46:39,989: DEBUG/MainProcess] | Consumer: Starting Tasks
celeryworker_1  | [2021-05-10 06:46:40,001: DEBUG/MainProcess] ^-- substep ok
celeryworker_1  | [2021-05-10 06:46:40,002: DEBUG/MainProcess] | Consumer: Starting Control
celeryworker_1  | [2021-05-10 06:46:40,002: DEBUG/MainProcess] using channel_id: 3
celeryworker_1  | [2021-05-10 06:46:40,004: DEBUG/MainProcess] Channel open
celeryworker_1  | [2021-05-10 06:46:40,014: DEBUG/MainProcess] ^-- substep ok
celeryworker_1  | [2021-05-10 06:46:40,014: DEBUG/MainProcess] | Consumer: Starting event loop
celeryworker_1  | [2021-05-10 06:46:40,015: DEBUG/MainProcess] | Worker: Hub.register Pool...
celeryworker_1  | [2021-05-10 06:46:40,018: INFO/MainProcess] celery@7a67a0149293 ready.
celeryworker_1  | [2021-05-10 06:46:40,019: DEBUG/MainProcess] basic.qos: prefetch_count->8
celeryworker_1  | [2021-05-10 06:46:40,899: DEBUG/MainProcess] pidbox received method enable_events() [reply_to:None ticket:None]
celeryworker_1  | [2021-05-10 06:46:40,899: INFO/MainProcess] Events of group {task} enabled by remote.

--------------------------------------------------------------  relevant part starts here
celeryworker_1  | [2021-05-10 06:46:41,251: INFO/Beat] beat: Starting...
celeryworker_1  | [2021-05-10 06:46:41,298: INFO/Beat] setup_schedule
celeryworker_1  | [2021-05-10 06:46:41,299: DEBUG/Beat] DatabaseScheduler: initial read
celeryworker_1  | [2021-05-10 06:46:41,299: INFO/Beat] Writing entries...
celeryworker_1  | [2021-05-10 06:46:41,300: DEBUG/Beat] DatabaseScheduler: Fetching database schedule
celeryworker_1  | [2021-05-10 06:46:41,350: DEBUG/Beat] schedule: <crontab: 0 4 * * * (m/h/d/dM/MY), Asia/Shanghai>
celeryworker_1  | [2021-05-10 06:46:41,352: DEBUG/Beat] Current schedule:
--------------------------------------------------------------  relevant part ends here: my scheduled tasks were not read, even though they were written to the database successfully

celeryworker_1  | <ModelEntry: celery.backend_cleanup celery.backend_cleanup(*[], **{}) <crontab: 0 4 * * * (m/h/d/dM/MY), Asia/Shanghai>>
celeryworker_1  | [2021-05-10 06:46:41,417: DEBUG/Beat] schedule: <crontab: 0 4 * * * (m/h/d/dM/MY), Asia/Shanghai>
celeryworker_1  | [2021-05-10 06:46:41,435: DEBUG/Beat] beat: Ticking with max interval->10.00 seconds
celeryworker_1  | [2021-05-10 06:46:41,448: DEBUG/Beat] beat: Waking up in 10.00 seconds.
celeryworker_1  | [2021-05-10 06:46:46,048: DEBUG/MainProcess] pidbox received method enable_events() [reply_to:None ticket:None]
celeryworker_1  | [2021-05-10 06:46:50,913: DEBUG/MainProcess] pidbox received method enable_events() [reply_to:None ticket:None]
celeryworker_1  | [2021-05-10 06:46:51,463: DEBUG/Beat] beat: Synchronizing schedule...
celeryworker_1  | [2021-05-10 06:46:51,479: INFO/Beat] Writing entries...
celeryworker_1  | [2021-05-10 06:46:51,533: DEBUG/Beat] beat: Waking up in 10.00 seconds.
celeryworker_1  | [2021-05-10 06:46:55,904: DEBUG/MainProcess] pidbox received method enable_events() [reply_to:None ticket:None]
celeryworker_1  | [2021-05-10 06:47:00,003: DEBUG/MainProcess] heartbeat_tick : for connection a9c0e41d02234670a4534c3b60bc8090
celeryworker_1  | [2021-05-10 06:47:00,003: DEBUG/MainProcess] heartbeat_tick : Prev sent/recv: None/None, now - 28/70, monotonic - 15271.733849645, last_heartbeat_sent - 15271.733847792, heartbeat int. - 60 for connection a9c0e41d02234670a4534c3b60bc8090
celeryworker_1  | [2021-05-10 06:47:00,890: DEBUG/MainProcess] pidbox received method enable_events() [reply_to:None ticket:None]
celeryworker_1  | [2021-05-10 06:47:01,547: DEBUG/Beat] beat: Waking up in 10.00 seconds.
celeryworker_1  | [2021-05-10 06:47:05,902: DEBUG/MainProcess] pidbox received method enable_events() [reply_to:None ticket:None]
celeryworker_1  | [2021-05-10 06:47:10,904: DEBUG/MainProcess] pidbox received method enable_events() [reply_to:None ticket:None]
celeryworker_1  | [2021-05-10 06:47:11,560: DEBUG/Beat] beat: Waking up in 10.00 seconds.
celeryworker_1  | [2021-05-10 06:47:15,902: DEBUG/MainProcess] pidbox received method enable_events() [reply_to:None ticket:None]
celeryworker_1  | [2021-05-10 06:47:20,007: DEBUG/MainProcess] heartbeat_tick : for connection a9c0e41d02234670a4534c3b60bc8090
celeryworker_1  | [2021-05-10 06:47:20,008: DEBUG/MainProcess] heartbeat_tick : Prev sent/recv: 28/70, now - 28/113, monotonic - 15291.738308944, last_heartbeat_sent - 15271.733847792, heartbeat int. - 60 for connection a9c0e41d02234670a4534c3b60bc8090

What I tried:

Switching to a SQLite database, beat does pick up the tasks.
Switching to PostgreSQL 9.6, the tasks still cannot be found.
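
The rows themselves are visible in the database. As a minimal sketch (reusing the SessionManager and BEAT_DBURI from the snippet above), they can be listed through the scheduler's own models:

# Sketch: list every periodic task row that beat should be able to see,
# using the same connection string as in the configuration above.
from celery_sqlalchemy_scheduler.models import PeriodicTask
from celery_sqlalchemy_scheduler.session import SessionManager

BEAT_DBURI = "postgresql+psycopg2://postgresql:password@db:5432/devops"

session = SessionManager().session_factory(dburi=BEAT_DBURI)
for task in session.query(PeriodicTask).all():
    print(task.name, task.task, task.crontab)
session.close()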

Goal:

Have celery beat read the scheduled tasks under PostgreSQL 12. Thank you.
gmorkgg commented 3 years ago

Got it working now.

The fix required changes in two places.

1. The startup command: celery worker -A app.worker -Q main-queue -B -l debug
2. Two changes in the code. First:

if not query_aws_ec2_schedule:
    session.add(schedule_aws_ec2)
    session.add(periodic_task_aws_ec2)
    session.commit()
    # session.close()
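
The same change presumably applies to the RDS task; a sketch mirroring the original snippet (the CrontabSchedule is added together with its PeriodicTask, and the session is no longer closed between the two commits):

if not query_aws_rds_schedule:
    session.add(schedule_aws_rds)
    session.add(periodic_task_aws_rds)
    session.commit()
    # session.close()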

Second, beat_scheduler is defined as a plain string (single-quoted), and both beat_scheduler and beat_dburi are added to the config:

beat_scheduler = 'celery_sqlalchemy_scheduler.schedulers:DatabaseScheduler'
config = {
    #'beat_schedule': beat_schedule,
    'beat_scheduler': beat_scheduler,  # previously passed via --scheduler on the command line, now set here in code
    'beat_max_loop_interval': beat_max_loop_interval,
    'beat_dburi': beat_dburi,  # connection string for the scheduler database (the BEAT_DBURI defined earlier)
    'timezone': timezone,
    'worker_max_tasks_per_child': worker_max_tasks_per_child
}
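
Putting the two changes together, a minimal consolidated sketch of the resulting configuration (assuming beat_dburi holds the BEAT_DBURI connection string from the original snippet; the other values are the ones used above):

beat_dburi = "postgresql+psycopg2://postgresql:password@db:5432/devops"
beat_scheduler = 'celery_sqlalchemy_scheduler.schedulers:DatabaseScheduler'

celery_app.conf.update(
    beat_scheduler=beat_scheduler,   # use the SQLAlchemy-backed scheduler
    beat_dburi=beat_dburi,           # database beat reads the schedule from
    beat_max_loop_interval=10,
    timezone='Asia/Shanghai',
    worker_max_tasks_per_child=10,
)

With this in place, the worker can be started with the plain command from step 1 (celery worker -A app.worker -Q main-queue -B -l debug), since the scheduler is now configured in code rather than via --scheduler.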