taskiq-python / taskiq

Distributed task queue with full async support
MIT License

kiq jobs with unique names, so they won't be created twice #271

Open gegnew opened 6 months ago

gegnew commented 6 months ago

I can't seem to find anything about this in the docs. I'd like to assign a job (not a task) a unique name, so that if I were to try to kiq a job with the same parameters twice, the broker would ignore the duplicate job. Something like:

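import asyncio

@broker.task  # assumes a configured taskiq broker named `broker`
async def foo(param):
    await asyncio.sleep(1)

async def main():
    params = ['a', 'a', 'b', 'c']  # the second "a" should be skipped
    for p in params:
        # job_id is the hypothetical parameter being asked for
        await foo.kiq(p, job_id=f"job_{p}")

if __name__ == "__main__":
    asyncio.run(main())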

>>> "a"
>>> "b"
>>> "c"

Is this possible?

gegnew commented 6 months ago

Clearly the AsyncBroker can take an alternative task_id_generator, but it doesn't seem possible to pass a custom task_id anywhere, nor is there uniqueness handling, I think?
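
For the "same parameters" part of the question, one option is to derive the ID from the task name and its arguments, so that two identical calls end up with the same ID. The sketch below is only an illustration: `deterministic_task_id` is a hypothetical helper, not part of taskiq, and it assumes the arguments can be serialized to JSON (anything else falls back to `str`). The resulting string could then be attached to a call with `kicker().with_task_id(...)`, which appears later in this thread.

```python
import hashlib
import json
from typing import Any


def deterministic_task_id(task_name: str, *args: Any, **kwargs: Any) -> str:
    """Return the same ID for the same task name and arguments.

    Hypothetical helper, not a taskiq API.
    """
    payload = json.dumps(
        {"task": task_name, "args": args, "kwargs": kwargs},
        sort_keys=True,  # keyword order must not change the ID
        default=str,     # fall back to str() for non-JSON values
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    # Identical calls map to the same ID, different calls do not.
    print(deterministic_task_id("foo", "a") == deterministic_task_id("foo", "a"))
    print(deterministic_task_id("foo", "a") == deterministic_task_id("foo", "b"))
```

A deterministic ID alone does not prevent duplicates; something still has to check whether that ID was already seen.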

s3rius commented 6 months ago

Hi @gegnew, and thanks for your interest in the project. You're right: we don't handle uniqueness of IDs. IDs are only meant to be unique during task execution, so that a result is not saved twice.

But here's what you can do: create a middleware that tracks IDs so that they are unique among tasks with a given name.

from taskiq.abc.broker import AsyncBroker
from taskiq.message import TaskiqMessage
from taskiq_redis import ListQueueBroker
from taskiq import TaskiqMiddleware
from redis.asyncio import Redis

class UniqueIdMiddleware(TaskiqMiddleware):
    """Rejects a task whose (task_name, task_id) pair was seen recently."""

    def __init__(self, redis_url: str) -> None:
        super().__init__()
        self.pool = Redis.from_url(redis_url)

    def set_broker(self, broker: AsyncBroker) -> None:
        return super().set_broker(broker)

    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        # Runs on the worker, right before the task is executed.
        key = f"unique:{message.task_name}:{message.task_id}"
        if await self.pool.get(key):
            raise ValueError("Task is already running")

        # Remember the ID for 20 seconds; after that it may be reused.
        await self.pool.set(key, 1, ex=20)
        return message

broker = ListQueueBroker("redis://localhost:6379").with_middlewares(
    UniqueIdMiddleware("redis://localhost:6379"),
)

@broker.task
async def my_task():
    print("I'm a task")

async def main():
    await my_task.kicker().with_task_id("2").kiq()
    await my_task.kicker().with_task_id("2").kiq()

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

I won't implement this functionality in the broker itself, because it isn't meant to work that way. If you need it, you can easily implement it on your own.
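
One caveat with the middleware above: it does a GET followed by a separate SET, so two workers that receive the same ID at almost the same moment could both pass the check. With Redis, a single `SET` with the `NX` and `EX` options (in redis-py, `set(key, 1, nx=True, ex=20)`) checks and writes in one step and returns `None` when the key already exists. The sketch below models that "claim with expiry" behaviour in memory so it can run without a Redis server; `InMemoryClaims` and `unique_key` are hypothetical names used only for illustration.

```python
import time
from typing import Dict, Optional


class InMemoryClaims:
    """Hypothetical in-memory stand-in for Redis `SET key value NX EX ttl`."""

    def __init__(self) -> None:
        self._expires_at: Dict[str, float] = {}

    def claim(self, key: str, ttl: float, now: Optional[float] = None) -> bool:
        """Claim `key` for `ttl` seconds; return False if it is still held."""
        current = time.monotonic() if now is None else now
        expiry = self._expires_at.get(key)
        if expiry is not None and expiry > current:
            return False  # an unexpired claim already exists
        self._expires_at[key] = current + ttl
        return True


def unique_key(task_name: str, task_id: str) -> str:
    # Same key layout as the middleware in this thread.
    return f"unique:{task_name}:{task_id}"


if __name__ == "__main__":
    claims = InMemoryClaims()
    key = unique_key("my_task", "2")
    print(claims.claim(key, ttl=20, now=0.0))   # first claim succeeds
    print(claims.claim(key, ttl=20, now=5.0))   # duplicate within the TTL is rejected
    print(claims.claim(key, ttl=20, now=25.0))  # the claim has expired by now
```

In the middleware, the equivalent change would be to replace the `get`/`set` pair with one `set(..., nx=True, ex=20)` call and raise when it returns a falsy value. The TTL still bounds how long an ID counts as taken.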