taskiq-python / taskiq

Distributed task queue with full async support
MIT License
694 stars 44 forks source link

Question about synchronous tasks #242

Closed noctuid closed 7 months ago

noctuid commented 7 months ago

I'm currently investigating existing task queues for our project and had a couple of questions about how taskiq works.

Also, how do you pronounce taskiq?

s3rius commented 7 months ago

Hi!

  1. Yes. Taskiq leverages asyncio at it's maximum and tries to run as many tasks as possible (We have a parameter for maximum number of concurrent tasks, though to not overload the loop). You can see how we execute tasks here: https://github.com/taskiq-python/taskiq/blob/9aae39758686fd6fbdeb3ea90559b64503d3ae76/taskiq/receiver/receiver.py#L339.

  2. Taksiq takes care of that. You can find how it does it here: https://github.com/taskiq-python/taskiq/blob/9aae39758686fd6fbdeb3ea90559b64503d3ae76/taskiq/receiver/receiver.py#L229 If the function is sync, taskiq runs it in executor. The max-threadpool-threads parameter configures how many threads are going to be assigned to the thread pool executor.

  3. I pronounce it as taskick with the stress on a. But you can pronounce it as you wish.

noctuid commented 7 months ago

Awesome, thanks! One more question: We will have some short-running tasks (e.g. 10 seconds) and some long-running tasks that are CPU bound (e.g. an hour). This might be variable for the same task, but we could predict beforehand if it would take a very long time, so we could always split the into two tasks. I'd like to go with a library that can support some way to handle this without the long-running tasks completely preventing the short-running tasks from being performed.

Ideally there would be some way to have some workers never accept longer tasks. I was thinking maybe it would be nice if you could denote that a task shouldn't be run on more than x number of worker child process at the same time, though I don't know if that's feasible. What I think maybe could be done currently would be to run taskiq worker multiple times. Can you run taskiq worker multiple times passing different brokers so each would handle different tasks and still use the same e.g. Redis url/instance for the broker/backend?

Let me know if you have any better ideas.

s3rius commented 7 months ago

Since taskiq is very flexible, you can do lots of different setups. For this specific case, I would suggest to create two brokers and use one for short tasks and another for long tasks.

You can swap brokers which sends the task, by using kicker instead of calling kiq.

Imagine following setup:


broker = Broker(...)
long_running_broker  = Broker(...)

@broker.task
async def my_task():
    pass

Then when you expect this task to run for very long time, you can call it this way, to not interfere into small tasks execution:

if is_long_running:
    await my_task.kicker().with_broker(long_running_broker).kiq()

This way this task is going to run within long_running_broker, you will have multiple queues and worker groups, but I guess that's fine, since your idea was to keep long-running tasks and short-running tasks separate.

To run multiple worker processes, just run them with taskiq.

taskiq worker project.tkq:broker
taskiq worker project.tkq:long_running_broker

Of course, replace project.tkq with real path to the broker definition module.

Amitg1 commented 1 day ago

Hi @s3rius! I have a similar question, does Taskiq support multiple queues? I have at least 5 different task types. i am looking for something like https://stackoverflow.com/questions/19853378/how-to-keep-multiple-independent-celery-queues.

Thanks!