tortoise / tortoise-orm

Familiar asyncio ORM for python, built with relations in mind
https://tortoise.github.io
Apache License 2.0
4.39k stars 357 forks source link

tortoise中使用celery #1524

Open 202141903513 opened 6 months ago

202141903513 commented 6 months ago

tortoise支持celery吗?

douyahu commented 6 months ago

我用了芹菜,本身是不支持,我通过修改代码,使其支持,例如:

from celery import Celery, platforms

platforms.C_FORCE_ROOT = True
celery_app = Celery("celery_worker")
celery_app.config_from_object('backend.celery.celery_config', namespace='celery')

async def register_mysql_fastapi(app: FastAPI):
    # 注册数据库
    register_tortoise(
        app,
        config=DB_ORM_CONFIG,
        generate_schemas=os.getenv('INIT_DB', False),
        add_exception_handlers=False,
    )

async def register_mysql_to_script():
    # 注册数据库供脚本调用
    await Tortoise.init(config=DB_ORM_CONFIG)

@celery_app.task(name='scan_code_task', serializer='json')
def scan_code_task(uuid):
    async def run_task():
        await register_mysql_to_script()
        await scan_code(uuid)
    loop = asyncio.new_event_loop()  # 创建新的事件循环
    asyncio.set_event_loop(loop)  # 设置事件循环为当前线程的事件循环
    loop.run_until_complete(run_task())

希望这能帮到你
DongdongWu001 commented 5 months ago

我用了芹菜,本身是不支持,我通过修改代码,使其支持,例如:

from celery import Celery, platforms

platforms.C_FORCE_ROOT = True
celery_app = Celery("celery_worker")
celery_app.config_from_object('backend.celery.celery_config', namespace='celery')

async def register_mysql_fastapi(app: FastAPI):
    # 注册数据库
    register_tortoise(
        app,
        config=DB_ORM_CONFIG,
        generate_schemas=os.getenv('INIT_DB', False),
        add_exception_handlers=False,
    )

async def register_mysql_to_script():
    # 注册数据库供脚本调用
    await Tortoise.init(config=DB_ORM_CONFIG)

@celery_app.task(name='scan_code_task', serializer='json')
def scan_code_task(uuid):
    async def run_task():
        await register_mysql_to_script()
        await scan_code(uuid)
    loop = asyncio.new_event_loop()  # 创建新的事件循环
    asyncio.set_event_loop(loop)  # 设置事件循环为当前线程的事件循环
    loop.run_until_complete(run_task())

希望这能帮到你

app: FastAPI这个是怎么传进去的?

douyahu commented 5 months ago

我用了芹菜,本身是不支持,我通过修改代码,使其支持,例如:

from celery import Celery, platforms

platforms.C_FORCE_ROOT = True
celery_app = Celery("celery_worker")
celery_app.config_from_object('backend.celery.celery_config', namespace='celery')

async def register_mysql_fastapi(app: FastAPI):
    # 注册数据库
    register_tortoise(
        app,
        config=DB_ORM_CONFIG,
        generate_schemas=os.getenv('INIT_DB', False),
        add_exception_handlers=False,
    )

async def register_mysql_to_script():
    # 注册数据库供脚本调用
    await Tortoise.init(config=DB_ORM_CONFIG)

@celery_app.task(name='scan_code_task', serializer='json')
def scan_code_task(uuid):
    async def run_task():
        await register_mysql_to_script()
        await scan_code(uuid)
    loop = asyncio.new_event_loop()  # 创建新的事件循环
    asyncio.set_event_loop(loop)  # 设置事件循环为当前线程的事件循环
    loop.run_until_complete(run_task())

希望这能帮到你

app: FastAPI这个是怎么传进去的?

你说的传进去指的是使用orm嘛?

DongdongWu001 commented 5 months ago

我用了芹菜,本身是不支持,我通过修改代码,使其支持,例如:

from celery import Celery, platforms

platforms.C_FORCE_ROOT = True
celery_app = Celery("celery_worker")
celery_app.config_from_object('backend.celery.celery_config', namespace='celery')

async def register_mysql_fastapi(app: FastAPI):
    # 注册数据库
    register_tortoise(
        app,
        config=DB_ORM_CONFIG,
        generate_schemas=os.getenv('INIT_DB', False),
        add_exception_handlers=False,
    )

async def register_mysql_to_script():
    # 注册数据库供脚本调用
    await Tortoise.init(config=DB_ORM_CONFIG)

@celery_app.task(name='scan_code_task', serializer='json')
def scan_code_task(uuid):
    async def run_task():
        await register_mysql_to_script()
        await scan_code(uuid)
    loop = asyncio.new_event_loop()  # 创建新的事件循环
    asyncio.set_event_loop(loop)  # 设置事件循环为当前线程的事件循环
    loop.run_until_complete(run_task())

希望这能帮到你

app: FastAPI这个是怎么传进去的?

你说的传进去指的是使用orm嘛?

async def register_mysql_to_script():

注册数据库供脚本调用

await Tortoise.init(config=database_config)

@celery_app.task(name='task', serializer='json') def process_messaging_queue(result_mysql): try: async def run_async_code(): try:

初始化数据库连接

await register_mysql_to_script()

在这里数据库插入操作

await mysql_create(result_mysql) except Exception as e: logger.error(e) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(run_async_code()) except Exception as e: logger.error(e)

你好,我按照上述方法修改,提示我数据库连接失败,我确定我的数据库配置文件没问题,就很奇怪。 Can't connect to MySQL server: {'host': 'localhost', 'port': 3306...} celery的任务函数中,可以这样初始化数据库的连接吗,我用的是tortoise-orm

douyahu commented 5 months ago

我用了芹菜,本身是不支持,我通过修改代码,使其支持,例如:

from celery import Celery, platforms

platforms.C_FORCE_ROOT = True
celery_app = Celery("celery_worker")
celery_app.config_from_object('backend.celery.celery_config', namespace='celery')

async def register_mysql_fastapi(app: FastAPI):
    # 注册数据库
    register_tortoise(
        app,
        config=DB_ORM_CONFIG,
        generate_schemas=os.getenv('INIT_DB', False),
        add_exception_handlers=False,
    )

async def register_mysql_to_script():
    # 注册数据库供脚本调用
    await Tortoise.init(config=DB_ORM_CONFIG)

@celery_app.task(name='scan_code_task', serializer='json')
def scan_code_task(uuid):
    async def run_task():
        await register_mysql_to_script()
        await scan_code(uuid)
    loop = asyncio.new_event_loop()  # 创建新的事件循环
    asyncio.set_event_loop(loop)  # 设置事件循环为当前线程的事件循环
    loop.run_until_complete(run_task())

希望这能帮到你

app: FastAPI这个是怎么传进去的?

你说的传进去指的是使用orm嘛?

async def register_mysql_to_script(): ###注册数据库供脚本调用 await Tortoise.init(config=database_config)

@celery_app.task(name='task', serializer='json') def process_messaging_queue(result_mysql): try: async def run_async_code(): try: ###初始化数据库连接 await register_mysql_to_script() ###在这里数据库插入操作 await mysql_create(result_mysql) except Exception as e: logger.error(e) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(run_async_code()) except Exception as e: logger.error(e)

你好,我按照上述方法修改,提示我数据库连接失败,我确定我的数据库配置文件没问题,就很奇怪。 Can't connect to MySQL server: {'host': 'localhost', 'port': 3306...} celery的任务函数中,可以这样初始化数据库的连接吗,我用的是tortoise-orm

image

image image image

DongdongWu001 commented 5 months ago

我用了芹菜,本身是不支持,我通过修改代码,使其支持,例如:

from celery import Celery, platforms

platforms.C_FORCE_ROOT = True
celery_app = Celery("celery_worker")
celery_app.config_from_object('backend.celery.celery_config', namespace='celery')

async def register_mysql_fastapi(app: FastAPI):
    # 注册数据库
    register_tortoise(
        app,
        config=DB_ORM_CONFIG,
        generate_schemas=os.getenv('INIT_DB', False),
        add_exception_handlers=False,
    )

async def register_mysql_to_script():
    # 注册数据库供脚本调用
    await Tortoise.init(config=DB_ORM_CONFIG)

@celery_app.task(name='scan_code_task', serializer='json')
def scan_code_task(uuid):
    async def run_task():
        await register_mysql_to_script()
        await scan_code(uuid)
    loop = asyncio.new_event_loop()  # 创建新的事件循环
    asyncio.set_event_loop(loop)  # 设置事件循环为当前线程的事件循环
    loop.run_until_complete(run_task())

希望这能帮到你

app: FastAPI这个是怎么传进去的?

你说的传进去指的是使用orm嘛?

async def register_mysql_to_script(): ###注册数据库供脚本调用 await Tortoise.init(config=database_config) @celery_app.task(name='task', serializer='json') def process_messaging_queue(result_mysql): try: async def run_async_code(): try: ###初始化数据库连接 await register_mysql_to_script() ###在这里数据库插入操作 await mysql_create(result_mysql) except Exception as e: logger.error(e) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(run_async_code()) except Exception as e: logger.error(e) 你好,我按照上述方法修改,提示我数据库连接失败,我确定我的数据库配置文件没问题,就很奇怪。 Can't connect to MySQL server: {'host': 'localhost', 'port': 3306...} celery的任务函数中,可以这样初始化数据库的连接吗,我用的是tortoise-orm

image

image image image

已解决,谢谢。服务请求频繁的时候,是否需要把数据库断开写入celery的任务函数

waketzheng commented 3 months ago

写了个简单的Demo,有需要的可以参考一下:

#!/usr/bin/env python
import shlex
import subprocess
from pathlib import Path

import anyio
from celery import Celery
from celery.signals import worker_process_init, worker_process_shutdown
from tortoise import Tortoise

from config import DB_CONFIG
from models import Users

REDIS_URL = "redis://localhost:6379"
app = Celery(__name__, broker=REDIS_URL, backend=REDIS_URL)

async def init_db() -> None:
    """初始化数据库连接"""
    await Tortoise.init(db_url=DB_CONFIG["db_url"], modules=DB_CONFIG["modules"])

@worker_process_init.connect
def init_worker(**kwargs):
    anyio.run(init_db)

@worker_process_shutdown.connect
def close_worker(**kwargs):
    anyio.run(Tortoise.close_connections)

async def _create_user(data) -> int:
    user_obj = await Users.create(**data)
    return user_obj.id

@app.task
def save_user_to_db(data) -> int:
    return anyio.run(_create_user, data)

def main():
    subprocess.run(shlex.split(f"celery -A {Path(__file__).stem} worker"))

if __name__ == "__main__":
    main()
zyxbcde commented 2 months ago

这样应该可以,不过用这个orm最好别用celery,我们公司现在用的是darq,跑了两年还算稳定。

async def do_something():
    await Tortoise.init(config=DB_ORM_CONFIG)
    print(await MyModel.all())

@celery_app.task(name='scan_code_task', serializer='json')
def scan_code_task():
    run_async(do_something())