taskiq-python / taskiq

Distributed task queue with full async support
MIT License

How to check all the tasks are completed #331

Open emelpolaris opened 3 weeks ago

emelpolaris commented 3 weeks ago

I'm using taskiq_redis to distribute tasks to multiple GPUs. I want to know whether all the tasks are completed or not. Please let me know how I can get this info. Thanks.

s3rius commented 3 weeks ago

The easiest way would be to start using pipelines.

https://github.com/taskiq-python/taskiq-pipelines

import asyncio
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
from taskiq_pipelines import Pipeline, PipelineMiddleware

broker = (
    ListQueueBroker("redis://localhost:6379/0")
    # Here we add the PipelineMiddleware to the broker,
    # so we can use the pipelines in the broker
    .with_middlewares(PipelineMiddleware())
    # Here's the result backend for the broker,
    # It's required for pipelines to work, because
    # intermediate results should be stored somewhere
    .with_result_backend(
        RedisAsyncResultBackend(
            "redis://localhost:6379/1",
            keep_results=False,
        )
    )
)

@broker.task
async def my_task(a: int) -> int:
    await asyncio.sleep(1)
    return a * 2

@broker.task
async def generate_tasks(num: int) -> list[int]:
    return list(range(num))

# Here's the pipeline to run your set of tasks.
# It will generate the required values and then
# map my_task over each element in parallel.
#
# Also, we set check_interval for my_task to 1 second
# to minimize the number of requests to the result backend.
# You can set check_interval granularly for each task
# in the pipeline.
pipe = Pipeline(broker, generate_tasks).map(my_task, check_interval=1)

async def main():
    await broker.startup()
    # Pipeline itself is a task and therefore the interface is the same.
    # You can wait for the result as if it was a regular task.
    task = await pipe.kiq(10)
    res = await task.wait_result(check_interval=1)
    print(res.return_value)
    await broker.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Or, you can use the utility function taskiq.gather to gather all tasks, similar to asyncio.gather:

import asyncio
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
from taskiq import gather as taskiq_gather

broker = ListQueueBroker("redis://localhost:6379/0").with_result_backend(
    RedisAsyncResultBackend(
        "redis://localhost:6379/1",
        keep_results=False,
    )
)

@broker.task
async def my_task(a: int) -> int:
    await asyncio.sleep(1)
    return a * 2

async def main():
    await broker.startup()
    tasks = []
    for i in range(10):
        tasks.append(await my_task.kiq(i))
    results = await taskiq_gather(*tasks, periodicity=1)
    for result in results:
        print(result.return_value)
    await broker.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

To minimize the number of requests to the result backend, don't forget to specify delays between checks. Hope this helps.