s3rius / FastAPI-template

Feature-rich, robust FastAPI template.
MIT License

taskiq scheduler issue: same task gets scheduled so many times #221

Closed: rcholic closed this issue 1 month ago

rcholic commented 1 month ago

This is the output of the taskiq scheduler:


taskiq-worker-1     | [2024-07-20 21:13:00,640][taskiq.process-manager][INFO   ][MainProcess] Scheduled workers reload.
taskiq-worker-1     | [2024-07-20 21:13:00,648][taskiq.process-manager][INFO   ][MainProcess] Scheduled workers reload.
taskiq-worker-1     | [2024-07-20 21:13:00,674][taskiq.process-manager][INFO   ][MainProcess] Scheduled workers reload.
taskiq-worker-1     | [2024-07-20 21:13:00,688][taskiq.process-manager][INFO   ][MainProcess] Scheduled workers reload.
taskiq-worker-1     | [2024-07-20 21:13:00,695][taskiq.process-manager][INFO   ][MainProcess] Scheduled workers reload.
taskiq-worker-1     | [2024-07-20 21:13:00,706][taskiq.process-manager][INFO   ][MainProcess] Scheduled workers reload.
taskiq-worker-1     | [2024-07-20 21:13:00,713][taskiq.process-manager][INFO   ][MainProcess] Scheduled workers reload.
taskiq-worker-1     | [2024-07-20 21:13:00,720][taskiq.process-manager][INFO   ][MainProcess] Scheduled workers reload.
taskiq-worker-1     | [2024-07-20 21:13:00,729][taskiq.process-manager][INFO   ][MainProcess] Scheduled workers reload.
taskiq-worker-1     | [2024-07-20 21:13:00,741][taskiq.process-manager][INFO   ][MainProcess] Scheduled workers reload.

My code setup:


from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

from example_app.settings import settings
from example_app.tasks.task_redis import RedisScheduleSource

result_backend = RedisAsyncResultBackend(  # type: ignore
    redis_url=str(settings.redis_url.with_path("/0")),
)
broker = ListQueueBroker(
    str(settings.redis_url.with_path("/0")),
).with_result_backend(result_backend)

label_source = LabelScheduleSource(broker)
redis_source = RedisScheduleSource(str(settings.redis_url.with_path("/0")))
scheduler = TaskiqScheduler(broker=broker, sources=[redis_source])

This causes the same task to run multiple times at almost the same time. Is there a way to fix this?

s3rius commented 1 month ago

Can you enable debug logging on the taskiq worker and post your fastapi-template config from pyproject.toml?
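
Worker log verbosity can be raised when starting the worker; a sketch, assuming taskiq's --log-level option (treat the flag name as an assumption and check taskiq worker --help for the exact spelling in your version):

taskiq worker example_app.tkq:broker --log-level DEBUG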

rcholic commented 1 month ago

> Can you enable debug logging on the taskiq worker and post your fastapi-template config from pyproject.toml?

This is the logging from inside my example task; I'm not sure whether it counts as taskiq worker debug logging:

2024-07-20 21:13:00.654 | DEBUG     | trace_id=0 | span_id=0 | logging:callHandlers:1706 - Executing task example_app.tasks.schwab_tasks:heavy_task with ID: 7db200850187465ea69651178ae36446
2024-07-20 21:13:00.656 | DEBUG     | trace_id=0 | span_id=0 | example_app.tasks.schwab_tasks:heavy_task:52 - heavy_task: 10 is in worker process!!!
2024-07-20 21:13:00.675 | DEBUG     | trace_id=0 | span_id=0 | logging:callHandlers:1706 - Executing task example_app.tasks.schwab_tasks:heavy_task with ID: dd35acf445654ab6becc1c1e70423546
2024-07-20 21:13:00.677 | DEBUG     | trace_id=0 | span_id=0 | example_app.tasks.schwab_tasks:heavy_task:52 - heavy_task: 10 is in worker process!!!
2024-07-20 21:13:00.689 | DEBUG     | trace_id=0 | span_id=0 | logging:callHandlers:1706 - Executing task example_app.tasks.schwab_tasks:heavy_task with ID: e0b279df4ae445cfba48df53666a5256
2024-07-20 21:13:00.690 | DEBUG     | trace_id=0 | span_id=0 | example_app.tasks.schwab_tasks:heavy_task:52 - heavy_task: 10 is in worker process!!!
2024-07-20 21:13:00.696 | DEBUG     | trace_id=0 | span_id=0 | logging:callHandlers:1706 - Executing task example_app.tasks.schwab_tasks:heavy_task with ID: 6f8cbfd78a27490fa2fc60993ac572cb
2024-07-20 21:13:00.697 | DEBUG     | trace_id=0 | span_id=0 | example_app.tasks.schwab_tasks:heavy_task:52 - heavy_task: 10 is in worker process!!!
2024-07-20 21:13:00.705 | DEBUG     | trace_id=0 | span_id=0 | logging:callHandlers:1706 - Executing task example_app.tasks.schwab_tasks:heavy_task with ID: 98b7e34769c949c0b276a0a95bf83ee9
2024-07-20 21:13:00.706 | DEBUG     | trace_id=0 | span_id=0 | example_app.tasks.schwab_tasks:heavy_task:52 - heavy_task: 10 is in worker process!!!
2024-07-20 21:13:00.712 | DEBUG     | trace_id=0 | span_id=0 | logging:callHandlers:1706 - Executing task example_app.tasks.schwab_tasks:heavy_task with ID: 693fdc9fbe8c43a4977e9d30ece1f671
2024-07-20 21:13:00.713 | DEBUG     | trace_id=0 | span_id=0 | example_app.tasks.schwab_tasks:heavy_task:52 - heavy_task: 10 is in worker process!!!
2024-07-20 21:13:00.719 | DEBUG     | trace_id=0 | span_id=0 | logging:callHandlers:1706 - Executing task example_app.tasks.schwab_tasks:heavy_task with ID: 8f2f7818af43417f9d665a4125ef7e81
2024-07-20 21:13:00.720 | DEBUG     | trace_id=0 | span_id=0 | example_app.tasks.schwab_tasks:heavy_task:52 - heavy_task: 10 is in worker process!!!
2024-07-20 21:13:00.729 | DEBUG     | trace_id=0 | span_id=0 | logging:callHandlers:1706 - Executing task example_app.tasks.schwab_tasks:heavy_task with ID: 0f042d5cdd7947bbadd3f86fe28d4c37

My template config is the following:

[fastapi-template.options]
project_name = "example_app"
api_type = "rest"
enable_redis = "True"
enable_rmq = "None"
ci_type = "github"
enable_migrations = "True"
enable_taskiq = "True"
enable_kube = "True"
kube_name = "insights-schwab"
enable_routers = "True"
enable_kafka = "True"
enable_loguru = "True"
traefik_labels = "None"
add_dummy = "None"
orm = "sqlalchemy"
self_hosted_swagger = "None"
prometheus_enabled = "True"
sentry_enabled = "True"
otlp_enabled = "True"
pydanticv1 = "None"
gunicorn = "True"
add_users = "None"
cookie_auth = "None"
jwt_auth = "None"

Thanks for the fast reply!

s3rius commented 1 month ago

Seems like everything is fine. Maybe you have workers reload on code change enabled, so the workers are reloaded whenever you update your code. Try changing something in the heavy task and running it again; if the output changes, this assumption is correct and your setup is working as intended.
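
For reference, the file-watching reload is opt-in on the worker side; the --reload flag (which, as an assumption here, requires the taskiq[reload] extra) seems to be what produces the "Scheduled workers reload" restarts, so starting the worker without it would rule this hypothesis out:

taskiq worker example_app.tkq:broker --reload   # restarts workers on code change
taskiq worker example_app.tkq:broker            # plain start, no file watching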

rcholic commented 1 month ago

@s3rius I was wondering if it's necessary to schedule the task inside the _startup method in the lifetime.py file under the web folder, like so:

await heavy_task.schedule_by_cron(redis_source, "* * * * *", 10)

The task method is already decorated with a schedule:

@broker.task(
    task_name="heavy_task",
    schedule=[
        {
            "cron": "*/1 * * * *",
            "cron_offset": None,
            "time": None,
            "args": [10],
            "kwargs": {},
            "labels": {},
        },
    ],
)
async def heavy_task(a: int) -> None:
    ...

s3rius commented 1 month ago

No, it's not. If you specify schedules in labels, then add a LabelScheduleSource to your scheduler. It should work right out of the box.

If you're going to schedule this task on startup instead, don't forget to delete it. LabelScheduleSource, on the other hand, checks all imported tasks and their labels, so no external storage is used.
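
For reference, a minimal sketch of that cleanup, assuming taskiq's dynamic scheduling API (schedule_by_cron returns a CreatedSchedule handle with an unschedule() method) and reusing the names from the snippets above:

# lifetime.py (sketch): pair every startup-time schedule with shutdown cleanup.
from example_app.tasks.schwab_tasks import heavy_task
from example_app.tkq import redis_source

_schedule = None  # module-level handle for the demo; app.state works too


async def _startup() -> None:
    global _schedule
    # Each call writes a NEW schedule entry into Redis; restarting the app
    # ten times without cleanup leaves ten copies, each firing every minute.
    _schedule = await heavy_task.schedule_by_cron(redis_source, "* * * * *", 10)


async def _shutdown() -> None:
    if _schedule is not None:
        # Drop the entry so the next startup does not stack a duplicate.
        await _schedule.unschedule()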

rcholic commented 1 month ago

> No, it's not. If you specify schedules in labels, then add a LabelScheduleSource to your scheduler. It should work right out of the box.
>
> If you're going to schedule this task on startup instead, don't forget to delete it. LabelScheduleSource, on the other hand, checks all imported tasks and their labels, so no external storage is used.

Well, this does not work for me: the task never gets scheduled or executed.

My tkq.py file:


import taskiq_fastapi
from taskiq import InMemoryBroker, TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

from example_app.settings import settings
from example_app.tasks.task_redis import RedisScheduleSource

result_backend = RedisAsyncResultBackend(  # type: ignore
    redis_url=str(settings.redis_url.with_path("/0")),
)
broker = ListQueueBroker(
    str(settings.redis_url.with_path("/0")),
).with_result_backend(result_backend)

label_source = LabelScheduleSource(broker)
redis_source = RedisScheduleSource(str(settings.redis_url.with_path("/0")))
scheduler = TaskiqScheduler(broker=broker, sources=[label_source])

if settings.environment.lower() == "pytest":
    broker = InMemoryBroker()  # type: ignore

taskiq_fastapi.init(
    broker,
    "example_app.web.application:get_app",
)

In my tasks file, named schwab_tasks.py, I have the following task decorated with a schedule:


import random
from pathlib import Path

from loguru import logger

from example_app.tkq import broker


@broker.task(
    task_name="heavy_task",
    schedule=[
        {
            "cron": "*/1 * * * *",
            "cron_offset": None,
            "time": None,
            "args": [10],
            "kwargs": {},
            "labels": {},
        },
    ],
)
async def heavy_task(a: int) -> None:
    output_file = Path(Path(__file__).parent, "output.txt")
    if broker.is_worker_process:
        logger.info("heavy_task: {} is in worker process!!!", a)
    else:
        logger.info("heavy_task: {} not in worker process", a)

    with output_file.open("a") as file:
        rand_int = random.randint(1, 100)
        file.write(f"{a + rand_int}, {rand_int}\n")

The heavy_task never runs. Did you test the taskiq scheduler in the FastAPI template? Thanks @s3rius

s3rius commented 1 month ago

How do you start the taskiq scheduler?

rcholic commented 1 month ago

> How do you start the taskiq scheduler?

I start the scheduler in my docker-compose like this. Maybe my task does not run because the labels array is empty? @s3rius

taskiq-scheduler:
    <<: *main_app
    labels: []
    command:
    - taskiq
    - scheduler
    - -fsd
    - --skip-first-run
    - example_app.tkq:scheduler
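
(Side note: labels: in docker-compose sets Docker container metadata and has nothing to do with taskiq's task labels, so the empty list cannot be what blocks scheduling. For completeness, a sketch of the worker service next to the scheduler, mirroring the snippet above; the worker's command line is an assumption based on taskiq's CLI:)

taskiq-worker:
    <<: *main_app
    command:
    - taskiq
    - worker
    - -fsd
    - example_app.tkq:broker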