s3rius / FastAPI-template

Feature rich robust FastAPI template.
MIT License
1.95k stars 172 forks source link

taskiq scheduler does not run ... #219

Closed rcholic closed 2 months ago

rcholic commented 2 months ago

I followed the documentation for Taskiq here to set up scheduler in my tkq.py file, like following:

result_backend = RedisAsyncResultBackend(
    redis_url=str(settings.redis_url.with_path("/1")),
)
broker = ListQueueBroker(
    str(settings.redis_url.with_path("/1")),
).with_result_backend(result_backend)

scheduler = TaskiqScheduler(broker=broker, sources=[LabelScheduleSource(broker)])

And I created an example task:

@broker.task(schedule=[{"cron": "*/1 * * * *", "cron_offset": None, "time": None, "args": [10], "kwargs": {}, "labels": {}}])
async def heavy_task(a: int) -> int:
    if broker.is_worker_process:
        logger.info("heavy_task: {} is in worker process!!!", a)
    else:
        logger.info("heavy_task: {} NOT in worker process", a)

    return 100 + a

In the docker-compose.yml file, I start the broker and scheduler like so:


  taskiq-worker:
    <<: *main_app
    labels: []
    command:
    - taskiq
    - worker
    - market_insights.tkq:scheduler && market_insights.tkq:broker

However, the taskiq scheduler does not seem to do anything. I guess I must be missing something. Can some experts help? Thanks

s3rius commented 2 months ago

The command is incorrect. Because scheduler should be started with taskiq scheduler command. Try this config:

  taskiq-scheduler:
    <<: *main_app
    labels: []
    command:
    - taskiq
    - scheduler
    - -fsd
    - market_insights.tkq:scheduler

  taskiq-worker:
    <<: *main_app
    labels: []
    command:
    - taskiq
    - worker
    - -fsd
    -  market_insights.tkq:broker
rcholic commented 2 months ago
taskiq-scheduler:
    <<: *main_app
    labels: []
    command:
    - taskiq
    - scheduler
    - -fsd
    - market_insights.tkq:scheduler

Thanks for your answer, but my scheduler does not run with the updates made to my docker-compose.yaml file. The heavy_task does not get triggered:

@broker.task(schedule=[{"cron": "*/1 * * * *", "cron_offset": None, "time": None, "args": [10], "kwargs": {}, "labels": {}}])
async def heavy_task(a: int) -> int:
    if broker.is_worker_process:
        logger.info("heavy_task: {} is in worker process!!!", a)
    else:
        logger.info("heavy_task: {} NOT in worker process", a)

    return 100 + a
rcholic commented 2 months ago

@s3rius After many tries, my taskiq scheduler seems to be running:

taskiq-scheduler-1 | [2024-07-20 14:02:00,036][INFO ][run:run_scheduler:217] First run skipped. The scheduler is now running.

But the output of the task is not generated, so I was wondering how to verify the task is actually running. any suggestions? Thanks