Closed vvanglro closed 1 year ago
I didn't see any pip.watch call in redis, nor any Lua scripts. So with the pubsub broker, this might happen when using multiple workers.
BTW, this is how arq handles this problem.
@vvanglro, thanks for raising this issue. What do you mean by competition? It works the way Redis pub/sub is designed to work: every worker process receives the same task, and we are aware of that. If you want each message to be processed only once, please use ListQueueBroker instead.
Maybe I didn't understand the issue correctly. If I haven't answered your question, please clarify what you would like us to fix.
Yes. By competition I mean this: if I have a worker on server A and start it with 8 worker processes, the message will be consumed 8 times. If there is also a worker with 8 worker processes on server B, the message will be consumed 16 times. The number of times it is consumed depends entirely on the number of processes the user specifies. In this case, I would have thought there are only two subscribers, A and B?
In the case you presented there will be 16 consumers, because each process starts its own listen loop.
It's stated in the README section "PubSubBroker and ListQueueBroker configuration". This behavior is known and intentional. We found this functionality useful, but, as was said previously, if that's not the behavior you'd expect from taskiq, you can easily switch to another broker from the taskiq-redis package.
The ListQueueBroker uses Redis lists to store tasks. Every time a worker wants to receive a task, it pops one from the list, and since it uses the BRPOP operation, each task is delivered to only one worker process. If even that implementation doesn't fit your needs, please tell us which kind of broker you would like to see implemented.
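To make the difference concrete, here is a minimal in-memory sketch of the two delivery models discussed above. It does not use Redis or taskiq at all: `PubSubChannel`, `ListQueue`, and the two counting functions are hypothetical stand-ins written only for illustration, assuming pub/sub fans a message out to every subscriber while a list pop hands it to a single consumer.

```python
from collections import deque


class PubSubChannel:
    """Hypothetical stand-in for a pub/sub channel: every subscriber gets a copy."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self):
        # Each worker process gets its own inbox, like its own listen loop.
        inbox = deque()
        self.subscribers.append(inbox)
        return inbox

    def publish(self, message):
        # Fan-out: the same message is appended to every inbox.
        for inbox in self.subscribers:
            inbox.append(message)


class ListQueue:
    """Hypothetical stand-in for a list-backed queue: a pop removes the item for everyone."""

    def __init__(self):
        self.items = deque()

    def push(self, message):
        self.items.appendleft(message)  # pushes on the left, similar to LPUSH

    def pop(self):
        # Non-blocking analogue of popping from the right end of the list.
        return self.items.pop() if self.items else None


def count_pubsub_deliveries(servers, processes_per_server):
    """Count how many worker processes receive one published message."""
    channel = PubSubChannel()
    inboxes = [channel.subscribe() for _ in range(servers * processes_per_server)]
    channel.publish("task-1")
    return sum(len(inbox) for inbox in inboxes)


def count_list_deliveries(servers, processes_per_server):
    """Count how many worker processes obtain one queued message."""
    queue = ListQueue()
    queue.push("task-1")
    received = [queue.pop() for _ in range(servers * processes_per_server)]
    return sum(1 for message in received if message is not None)


if __name__ == "__main__":
    print(count_pubsub_deliveries(servers=2, processes_per_server=8))
    print(count_list_deliveries(servers=2, processes_per_server=8))
```

With two servers running 8 processes each, the pub/sub model counts one delivery per process, while the list model counts a single delivery no matter how many processes are polling.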
Yes, you are right. At first I assumed that each subscriber would have different processing logic, but after reading the project code I found that the same task is consumed multiple times. I may not have understood this scenario. Sorry about that.
Can you add some examples or documentation to taskiq? For example, the use of labels in the decorator: if I hadn't read the code today, I wouldn't have known that it accepts labels={ "retry_on_error": True, }. I'd like to see an example of which parameters the decorator accepts and what each one does. Thank you, this is a great project. It looks a little young, but I like it and want to use it in a production environment.
@vvanglro, you have nothing to be sorry about. We appreciate your interest in our project. Every issue helps us evolve into something greater.
https://github.com/taskiq-python/taskiq-redis/blob/main/taskiq_redis/redis_broker.py#L66 https://github.com/taskiq-python/taskiq/blob/master/taskiq/cli/worker/process_manager.py#L163
When I looked at the code, I found that there may be competition in multi-process operation, resulting in repeated consumption. Is there such a possibility? 🤔