taskiq-python / taskiq

Distributed task queue with full async support
MIT License

Ability to `kiq` a task by path #259

Open pahrohfit opened 9 months ago

pahrohfit commented 9 months ago

Is it possible to pass a task to the queue by its path, rather than via import?

Instead of:

file: my/package/module/tasks.py

@broker.task
async def add_one(value: int) -> int:
    return value + 1

file: my/package/foo/module/api.py

from my.package.module.tasks import add_one
task = await add_one.kiq(1)

It would look like this:

file: my/package/module/tasks.py

@broker.task
async def add_one(value: int) -> int:
    return value + 1

file: my/package/foo/module/api.py

await broker.kiq('my.package.module.tasks.add_one', *args, **kwargs)

s3rius commented 9 months ago

Yes, it's possible.

import asyncio

from taskiq.kicker import AsyncKicker
from taskiq_redis import ListQueueBroker

broker = ListQueueBroker("redis://localhost:6379")


# Register the task under an explicit name instead of the default one.
@broker.task(task_name="task_name")
def task():
    print("Hello World!")


async def main():
    await broker.startup()
    # Kick the task by its registered name; the empty dict is the labels argument.
    await AsyncKicker("task_name", broker, {}).kiq()
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Please note, though, that for the task to actually run, the module that defines it must be imported on the worker side at startup, so that the task is registered with the broker. To import it automatically, use filesystem discovery, as described here: https://taskiq-python.github.io/guide/cli.html#auto-importing
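
If filesystem discovery is not an option, a similar effect can be approximated by importing the task modules explicitly from whatever module the worker loads. The sketch below is only an illustration: `import_task_modules` is a hypothetical helper (not part of taskiq), and it relies only on the fact that `@broker.task` registers a task when the module containing it is imported.

```python
import importlib
from types import ModuleType
from typing import Iterable, List


def import_task_modules(module_paths: Iterable[str]) -> List[ModuleType]:
    """Import each dotted module path so that its task decorators run.

    Hypothetical helper for illustration; not part of taskiq.
    """
    modules = []
    for path in module_paths:
        # import_module raises ModuleNotFoundError for a bad path,
        # so a typo surfaces at startup rather than when a task is kicked.
        modules.append(importlib.import_module(path))
    return modules
```

With the layout from the question, the call would be `import_task_modules(["my.package.module.tasks"])`.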

pahrohfit commented 9 months ago

I dug through the code a little, and came up with this (feels a little cleaner and easier to read):

    await taskiq_broker().find_task('my.package.module:my_task').kiq()

Where taskiq_broker is my initialized broker, and my tasks are decorated with async_shared_broker.task in a Sanic application.
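
One caveat with chaining `.kiq()` directly: `find_task` appears to return `None` when nothing is registered under the given name, so a typo in the name would surface as an `AttributeError` on `None`. Below is a minimal sketch of a wrapper that fails with a clearer message. `kiq_by_name` is a hypothetical helper, not a taskiq API; it assumes only that the broker exposes `find_task(name)` and that the returned task has an awaitable `.kiq(...)`, as in the line above.

```python
from typing import Any


async def kiq_by_name(broker: Any, task_name: str, *args: Any, **kwargs: Any) -> Any:
    """Send a task by its registered name, e.g. 'my.package.module:my_task'.

    Hypothetical helper, not part of taskiq. Assumes broker.find_task(name)
    returns the decorated task, or None if nothing is registered under name.
    """
    task = broker.find_task(task_name)
    if task is None:
        raise LookupError(f"No task registered under name {task_name!r}")
    # Forward positional and keyword arguments to the task unchanged.
    return await task.kiq(*args, **kwargs)
```

Usage would then be `await kiq_by_name(taskiq_broker(), 'my.package.module:my_task')`.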

The problem I was working around was some lazy import looping, and also calling tasks that don't need to be included in the Sanic code base at all (fully external job runners). So the imports for the worker weren't the problem.

Thank you very much for your quick response!