taskiq-python / taskiq-aio-pika

AMQP broker for taskiq

multiple queues #16

Status: Open · vvanglro opened 1 year ago

vvanglro commented 1 year ago

Something like this; I'm just giving a simple example, and there may be other, more elegant ways to create multiple queues.

```python
from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker(queue_name="q1")
broker.add_queue(queue_name="q2")  # proposed API, does not exist yet

# The first way to specify the queue name
@broker.task(queue="q1")
async def test() -> None:
    print("nothing")


@broker.task(queue="q2")
async def test2() -> None:
    print("nothing")


# The second way to specify the queue name
@broker.task()
async def test3() -> None:
    print("nothing")


await test3.kicker().with_queue(queue="q1").kiq()  # with_queue is also proposed
```
s3rius commented 1 year ago

Hi, and thanks for creating an issue and a PR. Unfortunately, I'm currently not sure about both of them. So, here's how you can achieve the desired behavior right now:

  1. You define different brokers for different queues, and explicitly set a queue name for each of them.
  2. You define tasks with default brokers for them. For your example it goes like this:

```python
from taskiq import async_shared_broker
from taskiq_aio_pika import AioPikaBroker

broker1 = AioPikaBroker(queue_name="q1")
broker2 = AioPikaBroker(queue_name="q2")


@broker1.task
async def test() -> None:
    print("nothing")


@broker2.task
async def test2() -> None:
    print("nothing")


@async_shared_broker.task
async def test3() -> None:
    print("nothing")
```

But in this example `test3` doesn't know which broker it should be called on, and you must specify it explicitly using a kicker when calling it.
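For instance, picking a broker at call time with a kicker (reusing the brokers defined above):

```python
# test3 is bound to the shared broker and has no broker of its own,
# so choose one explicitly when kicking it:
await test3.kicker().with_broker(broker1).kiq()
```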

  3. When you want to send a task that was created for a different broker, you use a kicker. For example, if you have a task that was created for `broker1`:
```python
@broker1.task
async def target_task():
    ...
```

You may call it with another broker like this:

```python
task = await target_task.kicker().with_broker(broker2).kiq()
```

Adding queue names slows down message processing, and I'm wondering how it could be useful in different scenarios. Can you please provide examples where it would be useful?

vvanglro commented 1 year ago

For example, if I have background tasks of different durations and types, I can put tasks of different durations into different queues, and likewise put tasks of different categories into different queues.

> 1. You define different brokers for different queues, and explicitly set a queue name for each of them.
> 2. You define tasks with default brokers for them. For your example it goes like this:

```python
from taskiq import async_shared_broker
from taskiq_aio_pika import AioPikaBroker

broker1 = AioPikaBroker(queue_name="q1")
broker2 = AioPikaBroker(queue_name="q2")


@broker1.task
async def test() -> None:
    print("nothing")


@broker2.task
async def test2() -> None:
    print("nothing")


@async_shared_broker.task
async def test3() -> None:
    print("nothing")
```

This can be done by defining different brokers, but managing and using them may be confusing. For example, if I need 5 queues, then I have to define 5 brokers, and when starting from the command line I need to run the worker command 5 times, as sketched below.
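Something like this, one worker process per broker (the module path `myapp.brokers` is hypothetical):

```bash
taskiq worker myapp.brokers:broker1
taskiq worker myapp.brokers:broker2
taskiq worker myapp.brokers:broker3
taskiq worker myapp.brokers:broker4
taskiq worker myapp.brokers:broker5
```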
s3rius commented 1 year ago

Different durations for tasks should not be a problem, since we execute our tasks in an async way. Even if the function is sync, we use a thread pool executor to make it async.

If you really want this functionality, I would suggest inheriting from `AioPikaBroker` and getting the queue name from task labels. Then you'll be able to define tasks exactly as in your example.
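A minimal sketch of that idea might look like this; the `write_channel`, `_exchange_name`, and `_queue_name` attributes are assumptions about the broker's internals and may differ between taskiq-aio-pika versions:

```python
from aio_pika import Message
from taskiq import BrokerMessage
from taskiq_aio_pika import AioPikaBroker


class LabelRoutedBroker(AioPikaBroker):
    """Route each task to the queue named in its "queue" label."""

    async def kick(self, message: BrokerMessage) -> None:
        # NOTE: sketch only; ignores the delay/priority handling of the base class.
        if self.write_channel is None:
            raise ValueError("Broker is not started, call startup() first.")
        # Fall back to the broker's default queue when no label is set.
        queue_name = message.labels.get("queue", self._queue_name)
        # Make sure the target queue exists and is bound to the exchange.
        queue = await self.write_channel.declare_queue(queue_name, durable=True)
        await queue.bind(self._exchange_name, routing_key=queue_name)
        exchange = await self.write_channel.get_exchange(self._exchange_name)
        await exchange.publish(
            Message(body=message.message, headers=dict(message.labels)),
            routing_key=queue_name,
        )
```

Extra keyword arguments to `@broker.task()` become task labels, so the decorators from the original example should then work as written:

```python
broker = LabelRoutedBroker(queue_name="q1")


@broker.task(queue="q2")  # "queue" ends up in the task's labels
async def test2() -> None:
    print("nothing")
```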

I have to ask @taskiq-python/core-team about this feature, because currently I'm not sure about implementing it.

vvanglro commented 1 year ago

This is an optional feature. If the user calls `add_queue`, multiple queues can be used; otherwise there is almost no intrusion.

osttra-o-rotel commented 1 month ago

Hey :)

I'm trying this approach with two brokers... and with FastAPI involved, so I have these broker declarations:


```python
repo_process_broker = AioPikaBroker(env.message_broker_url, queue_name="repo_updates_queue").with_result_backend(
    RedisAsyncResultBackend(env.result_backend_url)
)

admin_broker = AioPikaBroker(env.message_broker_url, queue_name="admin_queue").with_result_backend(
    RedisAsyncResultBackend(env.result_backend_url)
)

brokers = [repo_process_broker, admin_broker]

for broker in brokers:
    taskiq_fastapi.init(broker, "sparrow.distributed_app.fastapi_app:get_app")
```

These are in `tasks.py`:

```python
@repo_process_broker.task()
async def process_repo_target(
    # some args
    db: Session = TaskiqDepends(get_db),
    deps: Dependencies = TaskiqDepends(get_deps),
):
    ...


@admin_broker.task()
async def fetch_sparrow_repo_config(
    # other args
    db: Session = TaskiqDepends(get_db),
    deps: Dependencies = TaskiqDepends(get_deps),
) -> dict[RepoTargetId, RepoTargetSettings]:
    ...
```

And when I trigger either of them, for example:

```python
# in a FastAPI route:
await fetch_sparrow_repo_config.kiq(project_key=project_key, repo_name=repo_name)
```

I can see in the RabbitMQ management UI that the message is placed on both `repo_updates_queue` and `admin_queue`.

And when I start two worker processes (one for each broker), both of them get the message; one (the admin worker) processes the task correctly, but the other one prints:

```
WARNING:    2024-07-24 23:11:51,380::taskiq.receiver.receiver: task "app.tasks:fetch_sparrow_repo_config" is not found. Maybe you forgot to import it?
```

This is how RabbitMQ looks when I start the FastAPI app:

*(screenshot: RabbitMQ management UI, 2024-07-24 23:17)*

This one is after `kiq()`-ing the task from FastAPI:

*(screenshot: RabbitMQ management UI, 2024-07-24 23:19)*

This one is after I start the admin worker, which executes the task properly:

*(screenshot: RabbitMQ management UI, 2024-07-24 23:21)*

And when I start the repo_process one, which is when I get the warning about the task not being found, the message ends up unacked (I guess because I have the default ack-on-save behavior, and the save never happens here):

*(screenshot: RabbitMQ management UI, 2024-07-24 23:21)*

So every time I restart the repo_process_broker worker, it picks up those unacked messages again, and the warnings accumulate with each new message.

Side note: I really like the project! It's very nice to use!

Thanks for any info... let me know if you need more details to reproduce...

Regards!

osttra-o-rotel commented 1 month ago

After playing a bit more and looking at taskiq_aio_pika/broker.py, I saw that declaring the brokers with their own exchange names did the trick:

```python
repo_process_broker = AioPikaBroker(
    env.message_broker_url,
    queue_name="repo_updates_queue",
    exchange_name="repo_updates_exchange",
).with_result_backend(
    RedisAsyncResultBackend(env.result_backend_url)
)
```
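Presumably the admin broker needs its own exchange in the same way; as far as I can tell from broker.py, brokers sharing one exchange bind their queues so that each queue receives every published message, which would explain the duplication above. A sketch of the counterpart declaration (imports added for completeness; `env` is the settings object from the snippets above):

```python
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend

# Hypothetical counterpart: the admin broker gets its own exchange too,
# so its queue no longer shares bindings with repo_updates_queue.
admin_broker = AioPikaBroker(
    env.message_broker_url,  # `env` is the app settings object from above
    queue_name="admin_queue",
    exchange_name="admin_exchange",
).with_result_backend(
    RedisAsyncResultBackend(env.result_backend_url)
)
```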

:)