somnam opened 2 years ago
Does anyone have an idea how these duplicated messages get created? I tried debugging the dramatiq worker.py and redis.py modules and can't find the reason why (as I presume) the handle_delayed_messages() method gets called with the same message multiple times.
My only hunch is that it's either an issue with the PriorityQueue when using gevent on the Python side, or with the do_maintenance code in the Lua script for Redis, but I can't create a reproducible example, even though the issue happens daily on production.
The issue can be (somewhat) mitigated by using a mutex in the dramatiq actor keyed on the unique account_uuid value I pass to the actor method, but it's still a big problem: a delayed task that got enqueued multiple times is still processed repeatedly, just with some delay between runs due to the raised mutex exception.
@somnam
One potential cause of the duplication: the worker first copies the message over to the default queue https://github.com/Bogdanp/dramatiq/blob/75579439172ea126efb4f83c753b0b513afb03ad/dramatiq/worker.py#L309 and only afterwards starts to manage (ack) the delayed message. https://github.com/Bogdanp/dramatiq/blob/75579439172ea126efb4f83c753b0b513afb03ad/dramatiq/worker.py#L352 So a window in which the message sits in both queues is expected.
For RabbitMQ my guess is that it might never reach the callback which eventually does the ack (related issue). However, I'm not familiar enough with the Redis implementation and the exact gevent-based scheduling to really pinpoint what might be going wrong here.
Hope this helps to debug the situation.
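The window described above can be illustrated with a toy model. This is not dramatiq's actual code, just a simulation of the ordering: the delayed message is copied to the default queue first, and if the worker yields (as a gevent green thread may) before acking the delayed copy, a second pass over the delay queue re-enqueues it:

```python
# Toy model of the copy-then-ack window; not dramatiq's real implementation.
delay_queue = [{"message_id": "abc", "acked": False}]
default_queue = []


def process_delayed_once():
    """One pass over the delay queue: enqueue any due, un-acked message."""
    for msg in delay_queue:
        if not msg["acked"]:
            default_queue.append(msg["message_id"])  # step 1: copy to default queue
            # step 2 (the ack) would happen here, but a green thread may
            # yield between the two steps, so a concurrent pass can still
            # observe acked == False and enqueue the message again.


def ack_delayed():
    for msg in delay_queue:
        msg["acked"] = True


process_delayed_once()   # worker A copies the message, preempted before ack
process_delayed_once()   # worker B sees the message still un-acked
ack_delayed()            # worker A finally acks

print(default_queue)     # the same message_id appears twice
```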
I am encountering this issue
Also getting this duplicate task issue
OS
CentOS Linux release 7.5.1804 (Core)
Python version
3.7.3
Dramatiq version
v1.11.0
Gevent version
gevent==21.1.2 greenlet==1.0.0
Broker
Redis server v=3.2.12
What did you do?
I run the latest dramatiq version using the dramatiq-gevent command. On the host machine I'm running two separate dramatiq instances that consume messages from separate queues.
I've enqueued multiple messages for an actor on the default queue, each with a delay set so that it runs in the future. This is done using the send_with_options method.
What did you expect would happen?
Each delayed message is consumed and processed only once by a single WorkerThread.
What happened?
Some of the messages get duplicated when being enqueued by the Broker on the default queue.
This results in a single message being processed concurrently, or at short intervals, by multiple WorkerThreads. In the case of the mentioned actor this leads to duplicated entries in the database.
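To make the mechanics concrete: in dramatiq a delayed send is done with send_with_options(..., delay=milliseconds), which stamps an eta on the message and parks it in the delay queue until it is due. The sketch below mimics that flow with a stdlib heap; it is a deliberate simplification, not the broker's actual code, and all names are illustrative:

```python
import heapq
import itertools
import time

# Simplified stand-in for the broker's delay queue (default.DQ):
# messages are ordered by eta and moved to the live queue once due.
_counter = itertools.count()
delay_queue = []   # heap of (eta, seq, message)
default_msgs = []  # stand-in for the live dramatiq:default.msgs store


def send_with_delay(message_id, delay_ms, now_ms):
    eta = now_ms + delay_ms
    message = {"message_id": message_id, "eta": eta}
    heapq.heappush(delay_queue, (eta, next(_counter), message))


def handle_delayed_messages(now_ms):
    # Move every message whose eta has passed onto the live queue.
    while delay_queue and delay_queue[0][0] <= now_ms:
        _, _, message = heapq.heappop(delay_queue)
        default_msgs.append(message)


now = int(time.time() * 1000)
send_with_delay("msg-1", delay_ms=5_000, now_ms=now)
handle_delayed_messages(now)          # too early: nothing moves
handle_delayed_messages(now + 6_000)  # eta has passed: message moves
```

The bug report below shows exactly this DQ-to-msgs hand-off, except that on production the hand-off sometimes happens more than once for the same message_id.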
Example delayed message enqueued in the default.DQ queue:
The messages get pushed to the dramatiq:default.msgs queue after the time set in the eta field.
When inspecting dramatiq:default.msgs I can see that a single delayed message has been duplicated. Each copy has the same message_id but a different redis_message_id.
After inspecting the workers I can see that they have consumed these duplicates and will process them:
The number of messages with the same ID in dramatiq:default.msgs looks random: sometimes (correctly) only one, sometimes two or more.
Example debug logs when one message gets picked up by two Workers:
Same example for a different message:
Reproducible example:
Unfortunately I wasn't able to reproduce the issue in the development environment, but it happens on a regular basis in production.
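Until the root cause is found, duplicates of the kind shown above (same message_id, different redis_message_id) can at least be detected by grouping the decoded entries. A minimal sketch, assuming dramatiq:default.msgs is read as a mapping from redis_message_id to JSON payload, as the inspection above suggests; the entry values here are made up:

```python
import json
from collections import defaultdict


def find_duplicates(raw_entries):
    """Group decoded messages by message_id and keep only duplicated ones.

    raw_entries maps redis_message_id -> JSON-encoded message payload.
    Returns {message_id: [redis_message_id, ...]} for ids seen more than once.
    """
    by_message_id = defaultdict(list)
    for redis_message_id, payload in raw_entries.items():
        message = json.loads(payload)
        by_message_id[message["message_id"]].append(redis_message_id)
    return {mid: ids for mid, ids in by_message_id.items() if len(ids) > 1}


# Illustrative data: two entries share a message_id, one is unique.
entries = {
    "r-1": json.dumps({"message_id": "abc", "actor_name": "sync_account"}),
    "r-2": json.dumps({"message_id": "abc", "actor_name": "sync_account"}),
    "r-3": json.dumps({"message_id": "def", "actor_name": "sync_account"}),
}
dupes = find_duplicates(entries)
```

Running this periodically against the live queue would at least quantify how often the duplication happens, even without a local reproduction.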