ydf0509 / celery_demo

演示复杂深层路径,完全不按照一般套路的目录格式的celery使用
60 stars 11 forks source link

celery复杂层级结构引用 #1

Open pengqiaojun opened 7 months ago

pengqiaojun commented 7 months ago

你好,参考你的示例,我也有两个复杂层级的项目,需要同时使用celery注册任务,但是根据你的demo,我并没有启动成功,两个项目结构如下: /var/www/ |—— pro1 |———|pro1 |————— celery.py |————— tasks.py |———pro2 |————|project |——————|async_task |————————tasks.py 在我使用命令行启动时 cd /var/www/ && celery worker -A pro1.pro1.celery -c 128 -l info -P gevent ,无法启动。

ydf0509 commented 7 months ago

需要配置的,任何无规则文件夹100层级不规则名字都可以。你都不发你的配置内容,我咋知道

ydf0509 commented 7 months ago

如果你不会配置celery,可以使用funboost自动配置和操作celery框架

ydf0509 commented 7 months ago

你完全没看 这吧,是任意文件夹都可以,但是也要配置的. image

pengqiaojun commented 7 months ago

@ydf0509 嗨,非常感谢你的回复。昨晚写的太匆忙,我详细介绍一下。 我这有两个独立的django项目,pro1和pro2,在pro1项目目录中已经配置并成功启动了celery。现在是希望在pro2的项目中构建一个新的tasks文件,让pro1中的celery可以跨项目include。目的是希望在pro2中使用func.delay()的方式向celery中发送执行任务。两个项目中的task都使用app.task装饰器方式。下面是具体的项目目录结构和配置信息。 image

/var/www/pro1/pro1/celery.py `

!/usr/bin/python

from future import absolute_import, unicode_literals import sys

from celery import Celery, platforms from celery.schedules import crontab from datetime import timedelta

platforms.C_FORCE_ROOT = True

app = Celery(main='kedong', include=['pro1.pro1.tasks', 'pro2.project.async_task.tasks']) celerybeat_schedule = { 'freemem': { 'task': 'kdpa.tasks.crond_freemem', 'schedule': crontab(minute=0, hour='/6') } } app.conf.update( CELERYBEAT_SCHEDULE=celerybeat_schedule, # 计划任务 CELERY_ACCEPT_CONTENT=['application/json', ], # 消息格式 CELERY_ENABLE_UTC=True, CELERY_TIMEZONE='Asia/Shanghai', # 时区设置使 BROKER_URL='redis://{}:{}/{}'.format('127.0.0.1', 6379, 5), # 消息队列 BROKER_POOL_LIMIT=1200, # 针对消息队列的连接处最大打开数,需要大于worker_concurrency数量 BROKER_TRANSPORT_OPTIONS={ 'max_connections': 600, 'visibility_timeout': 3600, }, CELERY_RESULT_BACKEND='redis://{}:{}/{}'.format('127.0.0.1', 6379, 5), # 执行结果 CELERY_TASK_RESULT_EXPIRES=60 15, # 执行结果有效期 CELERY_RESULT_SERIALIZER='json', CELERY_TASK_IGNORE_RESULT=True, CELERY_TASK_SERIALIZER='json', CELERY_TRACK_STARTED=True, # 任务增加started状态

CELERYD_CONCURRENCY=1000, # 并发worker数

CELERYD_HIJACK_ROOT_LOGGER=False,   # 支持日志格式重定义
CELERYD_MAX_TASKS_PER_CHILD=1,      # 每个worker最多执行完1个任务就会被销毁,可防止内存泄露
CELERYD_PREFETCH_MULTIPLIER=1,
CELERYD_FORCE_EXECV=True,           # 预防死锁

) /var/www/pro1/pro1/tasks.py

!/usr/bin/python

from future import absolute_import

import os import subprocess import sys import time

@app.task def crond_get_controller_resource(): try: resource = Resource() resource.set_resource_redis() except Exception as e: cLogger.ERROR(e) /var/www/pro2/project/async_task/tasks.py

!/usr/bin/python3

from celery import Celery

app = Celery(broker="redis://127.0.0.1:6379/10",)

@app.task def pro2_task_test(): print('This is pro2 task.') `

使用celery官方推荐的命令行启动 cd /var/www/ celery -A pro1.pro1.celery worker -c 20 -l info -P gevent 以上的配置和启动方式结合起来,celery是无法启动的,我感觉还是我的项目层级关系和引用没有搞对。

ydf0509 commented 7 months ago

没启动起来是什么意思,截个图吧报错

pengqiaojun commented 7 months ago

@ydf0509 image

ydf0509 commented 7 months ago

加个pythonpath实时,我没代码,要么你用funboost,