Bogdanp / dramatiq

A fast and reliable background task processing library for Python 3.
https://dramatiq.io
GNU Lesser General Public License v3.0

Message duplication under high load of a worker with RabbitMQ broker #461

Closed Ecno92 closed 2 years ago

Ecno92 commented 2 years ago

Issues

When CPU-intensive tasks are launched from the delayed queue with a high number of threads, dramatiq raises the error pika.exceptions.ConnectionWrongStateError: BlockingConnection.add_callback_threadsafe() called on closed or closing connection. It also duplicates tasks across the queues.

What OS are you using?

Ubuntu 20.04.3 LTS

What version of Dramatiq are you using?

1.12.1

What did you do?

I created a file called example.py and loaded the messages into the queue by running python3 example.py:

import dramatiq
import time

@dramatiq.actor
def slow_thing():
    start_t = time.monotonic()
    while time.monotonic() - start_t < 100:
        2 * 2

def example():
    delay = 15000
    for i in range(100):
        delay += 1000
        slow_thing.send_with_options(delay=delay)

if __name__ == '__main__':
    example()

Then I launched the worker with a high number of threads: dramatiq example --threads=40 --processes=1

The notebook is built around an Intel(R) Core(TM) i7-10610U. The issue depends on hitting the right load, so you may need to experiment with the thread count to reproduce it.

What did you expect would happen?

I expected to see the messages being moved from the delayed queue to the normal queue:

image

During the process I expect both totals to be 100 tasks. At the end I expect something like this:

image

(1 item is already processed after 100 seconds)

This is an example made with dramatiq example --threads=1 --processes=1.

What happened?

Initially it all started OK:

image

However, once all threads start taking on more work, the totals go out of balance:

image

Some time later I see this message in my terminal:

[2021-12-28 18:17:55,826] [PID 175469] [Thread-3] [dramatiq.brokers.rabbitmq._RabbitmqConsumer] [ERROR] Failed to wait for all callbacks to complete.  This can happen when the RabbitMQ server is suddenly restarted.
Traceback (most recent call last):
  File "/home/x/.cache/pypoetry/virtualenvs/dramatiq-issue-example-Abb_BWZr-py3.8/lib/python3.8/site-packages/dramatiq/brokers/rabbitmq.py", line 542, in close
    self.connection.add_callback_threadsafe(all_callbacks_handled.set)
  File "/home/x/.cache/pypoetry/virtualenvs/dramatiq-issue-example-Abb_BWZr-py3.8/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 742, in add_callback_threadsafe
    raise exceptions.ConnectionWrongStateError(
pika.exceptions.ConnectionWrongStateError: BlockingConnection.add_callback_threadsafe() called on closed or closing connection.

When I now kill the dramatiq worker I'm left in this state:

image

When I start and stop the worker a few times, the problem becomes even bigger. We now have more than twice the number of messages that we originally created.

image

Related resources

This issue was created after the discussion about the default thread count for django_dramatiq: https://github.com/Bogdanp/django_dramatiq/issues/115

Bogdanp commented 2 years ago

You are starving the consumer threads because you have a high thread count and a task that spends all of its time on the CPU. That means that RMQ can't communicate with those threads and eventually closes those connections, which automatically unacks the messages in flight. Once the consumers reconnect, you end up with more and more messages.
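The starvation described above can be illustrated outside dramatiq. The following is a minimal sketch using only the standard library: a stand-in "heartbeat" thread tries to wake up at a fixed interval while several busy-loop threads compete for the interpreter lock. The function name and its parameters are hypothetical, and the thread is not dramatiq's actual consumer; it only shows how a thread that needs regular CPU time can be delayed by CPU-bound neighbours.

```python
import threading
import time


def measure_heartbeat_delay(cpu_threads=4, duration=0.5, interval=0.01):
    """Return the longest gap (in seconds) between ticks of a stand-in
    heartbeat thread while `cpu_threads` busy-loop threads are running."""
    stop = threading.Event()
    gaps = []

    def spin():
        # CPU-bound loop, like slow_thing in the report: it never blocks on I/O.
        while not stop.is_set():
            2 * 2

    def heartbeat():
        last = time.monotonic()
        while not stop.is_set():
            time.sleep(interval)  # the thread wants to run every `interval` seconds
            now = time.monotonic()
            gaps.append(now - last)  # how long it actually took to get back here
            last = now

    workers = [threading.Thread(target=spin) for _ in range(cpu_threads)]
    ticker = threading.Thread(target=heartbeat)
    for t in workers + [ticker]:
        t.start()
    time.sleep(duration)
    stop.set()
    for t in workers + [ticker]:
        t.join()
    return max(gaps, default=0.0)


if __name__ == "__main__":
    print(f"longest heartbeat gap: {measure_heartbeat_delay():.3f}s")
```

The measured gap depends on the machine, the Python version, and the thread count, so no particular value should be expected. The larger it grows relative to the requested interval, the closer the situation is to the one in the report.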

This behavior isn't limited to high thread counts. You can get the pathological behavior even with a single pinned CPU thread if you're unlucky (depending on how your kernel schedules threads) and the pinned thread is picked repeatedly on switch (see sys.{get,set}switchinterval) for longer than the heartbeat interval. The solution here is to either increase the heartbeat timeout or to ensure no single thread can pin the CPU for longer than the heartbeat timeout (either by refactoring your code to introduce explicit pauses or by lowering the switch interval).
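The three mitigations mentioned above can be sketched as follows. The helper names are hypothetical. The heartbeat part assumes that the broker is configured through an AMQP URL and that the underlying client (pika) reads a heartbeat query parameter from it, so check this against the versions you run. The values 600 and 0.001 are placeholders, not recommendations.

```python
import sys
import time
from urllib.parse import urlencode


def amqp_url_with_heartbeat(base_url, heartbeat_seconds):
    """Append a heartbeat query parameter to an AMQP URL (assumed to be
    honored by the client library's URL parser)."""
    separator = "&" if "?" in base_url else "?"
    return base_url + separator + urlencode({"heartbeat": heartbeat_seconds})


def lower_switch_interval(seconds=0.001):
    """Ask CPython to request thread switches more often.
    Returns the previous interval so it can be restored."""
    previous = sys.getswitchinterval()
    sys.setswitchinterval(seconds)
    return previous


def slow_thing_cooperative(duration, pause_every=10_000, pause=0.001):
    """Variant of the busy loop from the report that sleeps briefly every
    `pause_every` iterations, which releases the GIL to other threads."""
    start = time.monotonic()
    iterations = 0
    while time.monotonic() - start < duration:
        2 * 2
        iterations += 1
        if iterations % pause_every == 0:
            time.sleep(pause)  # explicit pause: other threads get a chance to run
    return iterations


# Hypothetical wiring, assuming dramatiq's RabbitmqBroker accepts `url=`:
#   broker = RabbitmqBroker(url=amqp_url_with_heartbeat(
#       "amqp://guest:guest@localhost:5672/", 600))
#   dramatiq.set_broker(broker)
```

A longer heartbeat timeout delays the detection of connections that really are dead, while a shorter switch interval or more frequent pauses costs some throughput, so each option is a trade-off rather than a free fix.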

Ecno92 commented 2 years ago

Thanks for the in-depth reply @Bogdanp. A week has passed, and I took some time to wrap my head around the issue, specifically the second part of your reply.

I expect a task library to manage tasks reliably, but there always seems to be a chance that we get unlucky, even when running a single worker thread.

I consider this an issue that should remain open as long as we cannot guarantee the reliability of the task management. What's your take on this, given that you have closed the issue?

Bogdanp commented 2 years ago

Dramatiq strives for at-least-once delivery, so I think this behavior is in line with the kinds of guarantees that the library tries to offer. Unfortunately, we can't do better here since the runtime doesn't give us much leeway and since I wouldn't want to give up at-least-once semantics in favor of at-most-once (which would solve this particular issue but cause others).

I think the solutions are the ones I presented in my previous comment, plus writing idempotent tasks (as the documentation recommends) so that a task being re-run isn't a problem. Perhaps this could also be called out more/detailed in the documentation. If you're willing to write that call-out, I'd be happy to merge it. I don't have much time to spend on dramatiq these days.
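As an illustration of what an idempotent task can look like under at-least-once delivery, here is a minimal sketch. Everything in it is hypothetical: the CompletedJobs class is an in-memory stand-in for a shared store such as a database table, and process_job stands for the body of an actor whose sender passes a unique job_id with every message.

```python
import threading


class CompletedJobs:
    """In-memory record of job IDs that have been claimed (stand-in for a
    shared, persistent store in a real deployment)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._seen = set()

    def claim(self, job_id):
        """Return True the first time `job_id` is seen, False on duplicates."""
        with self._lock:
            if job_id in self._seen:
                return False
            self._seen.add(job_id)
            return True


completed = CompletedJobs()
side_effects = []  # stands in for the real work, e.g. sending an email


def process_job(job_id):
    """Task body: a redelivered message with the same job_id is skipped."""
    if not completed.claim(job_id):
        return "skipped"
    side_effects.append(job_id)
    return "done"
```

Calling process_job twice with the same ID performs the side effect only once. Note that claiming the ID before the work runs means a crash in the middle of the task would leave the job marked without its effect. A real implementation would typically record completion in the same transaction as the side effect, or make the side effect itself safe to repeat.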

Ecno92 commented 2 years ago

Thanks for the reply. I'll check what the conclusion will be for our project and what the best follow-up would be. 👍