taskiq-python / taskiq

Distributed task queue with full async support
MIT License

Cannot use `schedule_by_time` with FastAPI app #347

Closed · mmzeynalli closed this issue 1 week ago

mmzeynalli commented 3 weeks ago

Basically, I need the equivalent of Celery's apply_async with the eta argument: I know in advance when the task needs to run. However, I have struggled to make it work. Here is my example:

# broker.py

import taskiq_fastapi
from taskiq import TaskiqScheduler
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend, RedisScheduleSource

from settings import settings

broker = ListQueueBroker(url=settings.REDIS_URL).with_result_backend(
    RedisAsyncResultBackend(redis_url=settings.REDIS_URL)
)

redis_source = RedisScheduleSource(settings.REDIS_URL)
scheduler = TaskiqScheduler(broker, sources=[redis_source])

taskiq_fastapi.init(broker, 'main:app')
# lifetime.py

import asyncio

from fastapi import FastAPI
from redis import asyncio as aioredis

import redisdb
from app.ocpp.handler.events import process_event
from broker import broker, redis_source
from core.queue.consumer import consume
from settings import rabbitmq_settings, settings

tasks = set()

async def startup_taskiq() -> None:
    if not broker.is_worker_process:
        await broker.startup()

    await redis_source.startup()

async def shutdown_taskiq() -> None:
    if not broker.is_worker_process:
        await broker.shutdown()

    await redis_source.shutdown()

async def setup_redis() -> None:
    # Redis
    pool = aioredis.ConnectionPool.from_url(
        settings.REDIS_URL,
        max_connections=10,
        decode_responses=True,
    )
    redisdb.redis_client = aioredis.Redis(connection_pool=pool)

async def shutdown_redis() -> None:
    await redisdb.redis_client.close()

async def setup_asyncio() -> None:
    task = asyncio.create_task(
        consume(queue_name=rabbitmq_settings.EVENT_EXCHANGE_NAME, on_message=process_event)
    )
    tasks.add(task)

    def _on_completion(f):
        tasks.remove(f)

    task.add_done_callback(_on_completion)

def startup(app: FastAPI):
    async def _startup():
        await setup_asyncio()
        await startup_taskiq()
        await setup_redis()

    return _startup

def shutdown(app: FastAPI):
    async def _shutdown():
        await shutdown_taskiq()
        await shutdown_redis()

    return _shutdown
# tasks.py

from app.ocpp.handler.v16.commands import send_remote_stop_transaction
from app.ocpp.models.transaction import CPTransaction
from broker import broker
from core.enums.status import CPTransactionStatus
from database import get_contextual_session

@broker.task(task_name='stop-cptransaction-task')
async def stop_cptransaction_task(id: int):
    print('HEREEEEE!!!!')
    async with get_contextual_session() as session:
        cptransaction = await CPTransaction.get_or_none(
            session,
            id=id,
            status=CPTransactionStatus.in_progress,
        )

        if cptransaction:
            await send_remote_stop_transaction(cptransaction)
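The scheduling call itself is not shown in the issue; a minimal sketch of what it presumably looks like (the wrapper function and its arguments are illustrative):

# scheduling sketch (illustrative, not part of the reported code)

from datetime import datetime

from broker import redis_source
from tasks import stop_cptransaction_task

async def schedule_stop(cptransaction_id: int, stop_at: datetime) -> None:
    # Register the task in the Redis schedule source so it fires at `stop_at`,
    # analogous to Celery's apply_async(eta=...).
    await stop_cptransaction_task.schedule_by_time(
        redis_source,
        stop_at,
        cptransaction_id,
    )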

When I call schedule_by_time, the task is registered in Redis and sits in the queue with the correct data. However, when the time comes, it never executes. From the documentation, I understood that I need to run taskiq scheduler module:scheduler, but I am not using taskiq standalone; it is part of a FastAPI app. Do I need to create a new Docker container for taskiq and run that command there? Or could it be integrated with FastAPI?

Sobes76rus commented 3 weeks ago

If you want to run tasks on the FastAPI server, you can start a receiver with run_receiver_task from taskiq/api/receiver; there is also one for the scheduler.
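A minimal sketch of that approach, assuming taskiq.api exposes run_receiver_task and run_scheduler_task and that they accept the broker and scheduler objects directly:

# in-process receiver/scheduler sketch (assumptions noted above)

import asyncio

from taskiq.api import run_receiver_task, run_scheduler_task

from broker import broker, scheduler

background_tasks = set()

async def startup_taskiq_inprocess() -> None:
    # Start the receiver (worker loop) and the scheduler inside the
    # FastAPI process instead of launching separate CLI processes.
    for coro in (run_receiver_task(broker), run_scheduler_task(scheduler)):
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

async def shutdown_taskiq_inprocess() -> None:
    # Cancel the background loops on application shutdown.
    for task in list(background_tasks):
        task.cancel()

Keep in mind the caveat about multiple uvicorn workers raised below.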

mmzeynalli commented 3 weeks ago

I see. Is it recommended to use either the worker or the scheduler through this API? At least in the worker's API file, programmatic use is somewhat discouraged; however, nothing is mentioned in the scheduler's API file.

s3rius commented 3 weeks ago

I wouldn't recommend running it from code either, because with FastAPI you can run multiple uvicorn workers, which will result in multiple schedulers running at the same time.

francipvb commented 1 week ago

Hello,

You have to run worker and scheduler processes separately.

Anyway, since you are using Redis to enqueue your tasks, you can run a separate container/process for each part. This also lets you scale these processes individually.
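For the layout in this issue (assuming broker.py and tasks.py are importable from the working directory), the two extra processes would be started roughly like:

taskiq worker broker:broker tasks
taskiq scheduler broker:scheduler

Both containers can be built from the same image as the FastAPI app; only the command differs.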

mmzeynalli commented 1 week ago

Yep, got it. I was just wondering whether it would be possible to run taskiq in the same Docker container as FastAPI.