greyli / helloflask

Hello, Flask!
https://docs.helloflask.com
MIT License
1.87k stars 2.53k forks source link

请教:Celery在Flask的相关用法 #119

Closed BobcatsII closed 5 years ago

BobcatsII commented 5 years ago

打扰您了,上周末两天遇到celery的相关问题一直卡着过不去,google搜索和官方文档相关的都看了,大体上晓得在单个文件中该怎么写,但是在我写的网站(套用您的albumy的壳子,页面地址:https://github.com/BobcatsII/Opps.git ),不知道该把celery的配置放在哪里,官方文档和google查的例子都是放在同一个文件的app.py内或者是将extensions.py和init.py融合在一起的文件内,我的网站内的celery配置在如下文件中,方便您查看:[ init.py,extensions.py,settings.py,新增tasks.py,路由blueprint/deploy.py ],导致我在用celery命令(celery -A opps.tasks.celery worker -l info)启动的时候,加载的celery连接rabbitmq总是默认连接 “[2018-12-24 15:17:12,299: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//”,不是我settings.py里面设定的连接,而且启动后我在页面点击"开始部署"的时候,celery进程提示我"程序上下文"有问题,虽然按照官方文档给的样例写了上去,我感觉我应该是对 extensions.py和init.py这两个文件的理解有问题,还请您抽空能指点一下我问题出在哪里,谢谢您!!

greyli commented 5 years ago

你好,看了你的代码,配置和上下文错误是因为你注册任务的 celery 对象和加载配置和上下文的 celery 对象不是同一个。

你注册任务时是从 extensions.py 导入的 celery,这个 celery 对象什么也没做。而真正加载了配置,设置了上下文的 celery 对象是你在 __init__.py 文件的 make_celery() 函数返回的那个,但是因为被放到了工厂函数里,根本没法引用。

大概有这些改进建议:

示例代码如下:

from flask import Flask
from celery import Celery
from celeryconfig import broker_url  # 你可以从你的配置文件里读取那两个变量

celery = Celery(__name__, broker=broker_url)  # 也可以直接把配置的值写到这里

def create_app(config_name=None):
    # ...
    app = Flask(__name__)
    app.config.from_object(config[config_name])

    register_celery(app)
    # ...
    return app

def register_celery(app):
    celery.config_from_object('celeryconfig')  # 这里可以沿用你的配置加载代码

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask

补充一下,在 Stack Overflow 看到另一种实现方式(link),可以为 Celery 也创建一个工厂函数:

#factory.py
from celery import Celery
from config import config

def create_celery_app(app=None):
    app = app or create_app(config)
    celery = Celery(__name__, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

在 tasks.py 调用工厂函数创建 celery 对象:

#tasks.py

from factory import create_celery_app

celery = create_celery_app()

@celery.task
def do_something():
    pass
BobcatsII commented 5 years ago

好的,我明白了,非常感谢您的指导~谢谢

BobcatsII commented 5 years ago

我两种方式都试试,哈哈

BobcatsII commented 5 years ago

比对了一下,最终选择了第二种方法,给Celery单独创建工厂函数,这样文件很规整,但是出现了一个新的异常,跟初始化有关系,我一开始选择Flask-Celery-Helper这个模块的时候,就是因为 Celery 没有 init_app() 方法,而Flask-Celery-Helper是有 init_app() 的方法,所以混在一起使用了。 现在的状态是,ansible_deploy.delay()是已经执行,并加入队列,java进程也部署好了,就卡在这里了,报错init_app(),部署状态也没有更新成"完成部署"。

Celery的异常日志如下:

[2018-12-25 11:53:58,533: INFO/MainProcess] Connected to amqp://linan:**@127.0.0.1:5672/linanhost
[2018-12-25 11:53:58,550: INFO/MainProcess] mingle: searching for neighbors
[2018-12-25 11:53:59,566: INFO/MainProcess] mingle: all alone
[2018-12-25 11:53:59,588: WARNING/MainProcess] celery@test ready.
[2018-12-25 11:54:19,160: INFO/MainProcess] Received task: opps.tasks.ansible_deploy[572aadd4-a24a-4cc3-9052-0647e503c821]
[2018-12-25 11:54:52,991: ERROR/MainProcess] Task opps.tasks.ansible_deploy[572aadd4-a24a-4cc3-9052-0647e503c821] raised unexpected: AssertionError('The sqlalchemy extension was not registered to the current application.  Please make sure to call init_app() first.',)
Traceback (most recent call last):
  File "/root/.local/virtualenvs/2opps-19phkSFA/lib/python3.6/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/2opps/opps/celery_factory.py", line 24, in __call__
    return TaskBase.__call__(self, *args, **kwargs)
  File "/root/.local/virtualenvs/2opps-19phkSFA/lib/python3.6/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/usr/local/2opps/opps/tasks.py", line 23, in ansible_deploy
    sql = DeployLog.query.get(deploy_id)
  File "/root/.local/virtualenvs/2opps-19phkSFA/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 514, in __get__
    return type.query_class(mapper, session=self.sa.session())
  File "/root/.local/virtualenvs/2opps-19phkSFA/lib/python3.6/site-packages/sqlalchemy/orm/scoping.py", line 74, in __call__
    return self.registry()
  File "/root/.local/virtualenvs/2opps-19phkSFA/lib/python3.6/site-packages/sqlalchemy/util/_collections.py", line 1001, in __call__
    return self.registry.setdefault(key, self.createfunc())
  File "/root/.local/virtualenvs/2opps-19phkSFA/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 3010, in __call__
    return self.class_(**local_kw)
  File "/root/.local/virtualenvs/2opps-19phkSFA/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 143, in __init__
    bind = options.pop('bind', None) or db.engine
  File "/root/.local/virtualenvs/2opps-19phkSFA/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 877, in engine
    return self.get_engine()
  File "/root/.local/virtualenvs/2opps-19phkSFA/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 887, in get_engine
    state = get_state(app)
  File "/root/.local/virtualenvs/2opps-19phkSFA/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 570, in get_state
    'The sqlalchemy extension was not registered to the current ' \
AssertionError: The sqlalchemy extension was not registered to the current application.  Please make sure to call init_app() first.
BobcatsII commented 5 years ago

Celery的既然没有,要自己去写吗? 我看Stack Overflow里面有个帖子: https://stackoverflow.com/questions/47223704/celery-using-default-broker-instead-of-reddis-flask-celery-factory-pattern 里面有一段:

#app/extensions.py
import flask
from celery import Celery

class FlaskCelery(Celery):
    def __init__(self, *args, **kwargs):
        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()
        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task
        _celery = self
        class ContextTask(TaskBase):
            abstract = True
            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)
        self.Task = ContextTask

    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)

celery = FlaskCelery()

——————————————————————————

#app/__init__.py
...
from app.extensions import celery
...

def create_app(config_name):
    register_extensions(app)
    ...
    return app

def register_extensions(app):
    ...
    celery.init_app(app)
    ...

我先试试看

BobcatsII commented 5 years ago

试了一下,Celery还是报一样的错误,今天值得欣慰的是部署的java服务是完成了,总算是过了不能执行的坎^_^,下面是部署的日志:

#logs/deploy/40.log
开始执行部署
[DEPRECATION WARNING]: DEFAULT_SUDO_USER option, In favor of Ansible Become, which is a generic 
framework. See become_user. , use become instead. This feature will be removed in version 2.8. 
Deprecation warnings can be disabled by setting deprecation_warnings=False in ansible.cfg.

PLAY [192.168.227.129] ***************************************************************************

TASK [Gathering Facts] ***************************************************************************
ok: [192.168.227.129]

TASK [检查程序端口] ************************************************************************************
changed: [192.168.227.129]

TASK [检查代码目录是否存在] ********************************************************************************
ok: [192.168.227.129]

TASK [尝试停止当前版本APP服务] *****************************************************************************
fatal: [192.168.227.129]: FAILED! => {"censored": "the output has been hidden due to the fact that 'no_log: true' was specified for this result", "changed": true}
...ignoring

TASK [检查程序是否停止] **********************************************************************************
changed: [192.168.227.129]

TASK [拷贝新程序到服务器] *********************************************************************************
changed: [192.168.227.129]

TASK [拷贝软链接] *************************************************************************************
changed: [192.168.227.129]

TASK [启动服务] **************************************************************************************
changed: [192.168.227.129]

TASK [检查程序启动结果] **********************************************************************************
ok: [192.168.227.129]

TASK [启动成功,添加版本号] ********************************************************************************
changed: [192.168.227.129]

PLAY RECAP ***************************************************************************************
192.168.227.129            : ok=10   changed=7    unreachable=0    failed=0   
部署成功

但是,"部署状态" 没有更新新的到数据库中,通过这个可以确定 tasks.py 里面代码只执行到了 “task=...”:

@celery.task()
def ansible_deploy(type, project, version, ip, deploy_id, deploy_date):
    script_path = current_app.config['DEPLOY_DIR'] + '/prod_vms-deploy.sh'
    sql = DeployLog.query.get(deploy_id)
    task = subprocess.getstatusoutput('bash {0} {1} {2} {3} {4} {5} > {6}/deploy/{7}.log 2>&1'.format(script_path,project,version,ip,type,deploy_date,current_app.config['DEPLOY_LOGS_DIR'],deploy_id))        
    #只执行到这里
    if task[0] == 0:       #从这开始的代码没被执行,感觉像是队列进去了没有出来。
        sql.dply_stat = '部署成功'
    else:
        sql.dply_stat = '部署失败'
    db.session.commit()
    return task
greyli commented 5 years ago

报错提示 Flask-SQALchemy 没有初始化。看代码是因为你在 Celery 里新创建了程序实例,这个程序实例没有注册扩展。建议尝试用你在构造文件里写的 create_app() 函数创建程序实例,或是采用第一种实现方式。

BobcatsII commented 5 years ago

我明白您的意思了,我一直都想错了,纠结在init_app()上面了,(敲脑袋 ~; 在新添加的celery_factory.py文件里面Celery工厂函数添加了几行, 将 extensions.py 里面的 sqlalchemy 引入进来就可以了,代码如下:

#celery_factory.py
...
from opps.extensions import db   #引入sqlalchemy
...

def create_celery_app(config_name=None):
    ...
    app.config.from_object(config[config_name])
    register_celery_db(app)    #注册扩展
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'], ... ....)
    ....
    return celery

def register_celery_db(app):   #注册函数
    db.init_app(app)   

到页面上重新部署了一下应用,下面是celery的日志:

[tasks]
  . opps.tasks.ansible_deploy

[2018-12-26 16:13:58,751: INFO/MainProcess] Connected to amqp://linan:**@127.0.0.1:5672/linanhost
[2018-12-26 16:13:58,776: INFO/MainProcess] mingle: searching for neighbors
[2018-12-26 16:13:59,796: INFO/MainProcess] mingle: all alone
[2018-12-26 16:13:59,821: WARNING/MainProcess] celery@test ready.
[2018-12-26 16:14:56,303: INFO/MainProcess] Received task: opps.tasks.ansible_deploy[0aa50404-4a0e-453d-b526-098dd3ba7a2b]
[2018-12-26 16:15:00,945: INFO/MainProcess] Task opps.tasks.ansible_deploy[0aa50404-4a0e-453d-b526-098dd3ba7a2b] succeeded in 4.6372282299998915s: (1, '')
[2018-12-26 16:18:30,277: INFO/MainProcess] Received task: opps.tasks.ansible_deploy[15cb9477-da01-4d05-8ee2-51dbea94fae2]
[2018-12-26 16:19:07,895: INFO/MainProcess] Task opps.tasks.ansible_deploy[15cb9477-da01-4d05-8ee2-51dbea94fae2] succeeded in 37.61557996299962s: (0, '')

Surprise!!成功了! (等web都弄完了,我再尝试您的第一种方式,:P) 卡了四天的异步总算通过了,谢谢您!!总算可以继续往下走了,感谢您的指点! You are my savior! Year~

greyli commented 5 years ago

不客气 :)