taskiq-python / taskiq-redis

Broker and result backend for taskiq
MIT License
36 stars, 17 forks

The same task is executed as many times as there are workers (worker_number) #11

Closed: wwarne closed this issue 1 year ago.

wwarne commented 1 year ago

It seems like the broker runs each task in every available worker process.

Here is a simple example based on the docs.

broker.py

```python
import asyncio
import os

from taskiq_redis import RedisAsyncResultBackend, RedisBroker

redis_async_result = RedisAsyncResultBackend(redis_url="redis://localhost:6379/")

broker = RedisBroker(
    url="redis://localhost:6379/",
    result_backend=redis_async_result,
)

@broker.task
async def add_one(value: int) -> int:
    # Print the PID so the worker logs show which process ran the task.
    current_pid = os.getpid()
    print(f'Executing in the process with pid: {current_pid}')
    return value + 1

async def main() -> None:
    await broker.startup()
    # Send the task to the broker.
    task = await add_one.kiq(1)
    # Wait for the result.
    result = await task.wait_result(timeout=2)
    print(f"Task execution took: {result.execution_time} seconds.")
    if not result.is_err:
        print(f"Returned value: {result.return_value}")
    else:
        print("Error found while executing task.")
    await broker.shutdown()

if __name__ == "__main__":
    asyncio.run(main())
```

Start the workers (the default is 2 worker processes):

```
taskiq worker broker:broker --workers=5
```

```
$ python broker.py
Task execution took: 3.504753112792969e-05 seconds.
Returned value: 2
```

Log from workers:

```
[2023-02-23 13:27:46,104][INFO   ][MainProcess] Starting 5 worker processes.
Executing in the process with pid: 57724.
Executing in the process with pid: 57721.
Executing in the process with pid: 57720.
Executing in the process with pid: 57722.
Executing in the process with pid: 57723.
```

So the task was executed 5 times, once in each worker process.

This is probably undesirable, especially if the task uses a lot of resources or sends emails.

Am I doing something wrong?

s3rius commented 1 year ago

Hi and thanks for raising this issue.

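This is normal behavior, because that's how Redis Pub/Sub works: every message published to a channel is delivered to all of its subscribers, and here each worker process is a subscriber.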

You can read more here: https://redis.io/docs/manual/pubsub/
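To make the fan-out concrete, here is a minimal in-memory sketch in plain asyncio, with no Redis involved. `pubsub_demo` and `queue_demo` are hypothetical helpers: the first gives every worker its own copy of each message, as Pub/Sub does; the second has workers compete for messages on one shared queue, as consumers of a single RabbitMQ queue do. It models delivery semantics only and is not how taskiq or Redis are implemented.

```python
import asyncio
from collections import Counter


async def pubsub_demo(num_workers: int, num_messages: int) -> Counter:
    """Model Pub/Sub fan-out: every subscriber gets its own copy of each message."""
    executions: Counter = Counter()
    # One inbox per worker, like one Pub/Sub subscription per worker process.
    inboxes = [asyncio.Queue() for _ in range(num_workers)]

    async def worker(inbox: asyncio.Queue) -> None:
        while True:
            msg = await inbox.get()
            if msg is None:  # shutdown sentinel
                return
            executions[msg] += 1  # "run" the task

    tasks = [asyncio.create_task(worker(q)) for q in inboxes]
    for i in range(num_messages):
        # Publishing copies the message into every subscriber's inbox.
        for q in inboxes:
            q.put_nowait(f"task-{i}")
    for q in inboxes:
        q.put_nowait(None)
    await asyncio.gather(*tasks)
    return executions


async def queue_demo(num_workers: int, num_messages: int) -> Counter:
    """Model a work queue: workers compete, and each message is taken once."""
    executions: Counter = Counter()
    shared: asyncio.Queue = asyncio.Queue()

    async def worker() -> None:
        while True:
            msg = await shared.get()
            if msg is None:  # shutdown sentinel
                return
            executions[msg] += 1  # "run" the task

    tasks = [asyncio.create_task(worker()) for _ in range(num_workers)]
    for i in range(num_messages):
        shared.put_nowait(f"task-{i}")
    # FIFO order: every real message is consumed before any sentinel,
    # and each worker stops after taking exactly one sentinel.
    for _ in range(num_workers):
        shared.put_nowait(None)
    await asyncio.gather(*tasks)
    return executions


if __name__ == "__main__":
    print(asyncio.run(pubsub_demo(num_workers=5, num_messages=1)))
    print(asyncio.run(queue_demo(num_workers=5, num_messages=1)))
```

With 5 workers and one message, the Pub/Sub model counts 5 executions, matching the 5 log lines in the report, while the shared-queue model counts 1.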

If you want each task to be executed only once, consider using the RabbitMQ broker instead.

wwarne commented 1 year ago

Thank you!

booqoffsky commented 1 year ago

What about using Redis Streams?

UPD: BRPOP may be a better solution. The disadvantage of Streams is that they keep processed messages, and this application does not need that.
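The BRPOP idea can be sketched the same way. `BlockingList` and `run_workers` below are hypothetical: a thread-based stand-in for a Redis list used as a queue, where producers push with an LPUSH-like method and consumers take items with a BRPOP-like method that blocks up to a timeout. Because each pop removes the element under a single lock, every message reaches exactly one worker. It models the semantics only and does not talk to Redis. For comparison, an entry in a Redis Stream stays in the stream after it is acknowledged with XACK until it is trimmed or deleted, which is the storage overhead mentioned above.

```python
import threading
from collections import deque
from typing import Dict, List, Optional


class BlockingList:
    """Hypothetical in-memory model of a Redis list used as a queue."""

    def __init__(self) -> None:
        self._items: deque = deque()
        self._cond = threading.Condition()

    def lpush(self, value: str) -> None:
        # Like LPUSH: add to the head, then wake one blocked consumer.
        with self._cond:
            self._items.appendleft(value)
            self._cond.notify()

    def brpop(self, timeout: float) -> Optional[str]:
        # Like BRPOP: wait until the list is non-empty, then remove from the tail
        # (so LPUSH + BRPOP gives FIFO order). Check-and-pop happens under one
        # lock, so two consumers can never receive the same element.
        with self._cond:
            if not self._cond.wait_for(lambda: len(self._items) > 0, timeout=timeout):
                return None  # timed out (BRPOP returns nil in this case)
            return self._items.pop()


def run_workers(num_workers: int, num_messages: int) -> Dict[str, List[int]]:
    """Start competing workers and record which worker received each message."""
    queue = BlockingList()
    received: Dict[str, List[int]] = {}
    lock = threading.Lock()

    def worker(worker_id: int) -> None:
        while True:
            msg = queue.brpop(timeout=0.5)
            if msg is None:  # queue stayed empty: stop this worker
                return
            with lock:
                received.setdefault(msg, []).append(worker_id)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    for i in range(num_messages):
        queue.lpush(f"task-{i}")
    for t in threads:
        t.join()
    return received


if __name__ == "__main__":
    result = run_workers(num_workers=5, num_messages=20)
    # True if every message was delivered to exactly one worker.
    print(all(len(ids) == 1 for ids in result.values()), len(result))
```

Unlike the Pub/Sub model, adding workers here spreads the messages across them instead of multiplying executions.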

s3rius commented 1 year ago

@booqoffsky, hi. It might be a good solution, indeed. I guess we need to transfer this issue to taskiq-redis and add RedisStreamBroker.

s3rius commented 1 year ago

Hi, @wwarne! Please check out our new release 0.2.0. Thanks to @booqoffsky, we now have a new broker that works as you expect, without broadcasting messages. Sorry it took so long, but we made huge improvements to taskiq internals.

If it works for you, please mark the issue as resolved.

wwarne commented 1 year ago

I checked it and everything works great! Thank you a lot guys, you are amazing!