Bogdanp / dramatiq

A fast and reliable background task processing library for Python 3.
https://dramatiq.io
GNU Lesser General Public License v3.0
4.37k stars, 313 forks

Exception when worker gets Pandas DataFrame as argument #631

Open · brakhane opened this issue 5 months ago

brakhane commented 5 months ago

What OS are you using?

Windows 10

What version of Dramatiq are you using?

Version: 1.17.0

What did you do?

I ran into an issue where a Dramatiq actor takes a DataFrame as an argument to do some processing. When more than one message is in the queue, the worker's consumer thread raises an exception because MessageProxy's __eq__ compares two messages containing the DataFrame argument using ==, which pandas doesn't support: comparing two DataFrames with == returns another DataFrame, and its truth value is ambiguous.

Reproducible example:

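# bug.py
import pandas as pd
import dramatiq

# Use pickle so that DataFrames can be sent as actor arguments.
dramatiq.set_encoder(dramatiq.PickleEncoder())

@dramatiq.actor
def buggy(df: pd.DataFrame):
    pass  # the body is irrelevant; the error occurs while the worker queues the message

if __name__ == "__main__":
    for _ in range(10):  # the bug does not appear if only one message is sent
        # Two rows (index labels 1 and 2), three columns
        buggy.send(pd.DataFrame([[1, 2, 3], [4, 5, 6]], index=[1, 2]))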

Starting the workers with dramatiq -t 1 -p 2 bug (one thread per process and two processes, to avoid out-of-memory issues) and then running the main program with python bug.py results in the following exception:

[2024-06-22 23:50:51,477] [PID 132100] [Thread-2] [dramatiq.worker.ConsumerThread(default)] [CRITICAL] Consumer encountered an unexpected error.
Traceback (most recent call last):
  File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\dramatiq\worker.py", line 267, in run
    self.handle_message(message)
  File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\dramatiq\worker.py", line 328, in handle_message
    self.work_queue.put((actor.priority, message))
  File "C:\Users\Dennis\AppData\Local\Programs\Python\Python311\Lib\queue.py", line 150, in put
    self._put(item)
  File "C:\Users\Dennis\AppData\Local\Programs\Python\Python311\Lib\queue.py", line 236, in _put
    heappush(self.queue, item)
  File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\dramatiq\broker.py", line 394, in __eq__
    return self._message == other._message
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 4, in __eq__
  File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\pandas\core\generic.py", line 1527, in __nonzero__
    raise ValueError(
ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
[2024-06-22 23:50:51,480] [PID 132100] [Thread-2] [dramatiq.worker.ConsumerThread(default)] [INFO] Restarting consumer in 3.00 seconds.
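The traceback suggests why the bug only shows up with two or more messages: the worker puts (priority, message) tuples on a queue.PriorityQueue, which uses heapq. When two tuples have the same priority, Python's tuple comparison checks the second elements with == before it can order them, so the two messages get compared, and comparing their DataFrame arguments yields an object whose truth value raises. The sketch below reproduces that chain with the standard library only. FakeFrame and FakeMessage are hypothetical stand-ins for a pandas DataFrame and Dramatiq's message classes; they are not real APIs, and the exact shape of Dramatiq's message objects is an assumption here.

```python
import heapq


class FakeFrame:
    """Hypothetical stand-in for a pandas DataFrame: == is elementwise, bool() is ambiguous."""

    def __eq__(self, other):
        # Like DataFrame.__eq__, return another frame instead of a bool.
        return FakeFrame()

    def __bool__(self):
        raise ValueError("The truth value of a DataFrame is ambiguous.")


class FakeMessage:
    """Hypothetical stand-in for a message whose args contain a frame."""

    def __init__(self, *args):
        self.args = args

    def __eq__(self, other):
        # Comparing the args tuples calls bool() on FakeFrame == FakeFrame.
        return self.args == other.args


def push_two(priority=0):
    """Push two messages with the same priority onto a heap, like the worker's queue does."""
    heap = []
    # The first push has nothing to compare against.
    heapq.heappush(heap, (priority, FakeMessage(FakeFrame())))
    # The second push compares (0, msg2) < (0, msg1): equal priorities force msg2 == msg1.
    heapq.heappush(heap, (priority, FakeMessage(FakeFrame())))
    return heap


try:
    push_two()
except ValueError as exc:
    print("heappush failed:", exc)
```

With different priorities the tuples are ordered by their first element alone, so the messages are never compared, which matches the observation that a single message (or messages that never tie) does not trigger the error.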

This is immediately followed by:

Exception in thread Thread-4:
Traceback (most recent call last):
  File "C:\Users\Dennis\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\dramatiq\worker.py", line 462, in run
    self.process_message(message)
  File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\dramatiq\worker.py", line 525, in process_message
    self.work_queue.task_done()
  File "C:\Users\Dennis\AppData\Local\Programs\Python\Python311\Lib\queue.py", line 75, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
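The second traceback looks like a knock-on effect of the first. In CPython, heappush appends the new item to the list before sifting it into place, and Queue.put only increments unfinished_tasks after _put returns. So when the comparison raises, the message is already in the queue but was never counted, and the task_done() call made after processing it drives the counter below zero. The sketch below replays that sequence with queue.PriorityQueue; Ambiguous and replay are hypothetical names, and the reasoning is about CPython's queue/heapq behaviour rather than Dramatiq's own code.

```python
import queue


class Ambiguous:
    """Hypothetical stand-in for a message whose == cannot be reduced to a bool."""

    def __eq__(self, other):
        raise ValueError("The truth value of a DataFrame is ambiguous.")


def replay():
    """Replay the worker's put/get/task_done sequence and record what happens."""
    events = []
    q = queue.PriorityQueue()
    q.put((0, Ambiguous()))  # first item: nothing to compare against, counted normally
    try:
        q.put((0, Ambiguous()))  # equal priority -> Ambiguous.__eq__ raises mid-heappush
    except ValueError:
        events.append("put failed")
    # The failed item was appended to the heap but never counted as unfinished.
    events.append(f"qsize={q.qsize()} unfinished={q.unfinished_tasks}")
    while not q.empty():
        q.get()  # a worker thread picks the item up...
        try:
            q.task_done()  # ...and acknowledges it
        except ValueError as exc:
            events.append(f"task_done failed: {exc}")
    return events


for event in replay():
    print(event)
```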

What did you expect would happen?

I expected Dramatiq not to raise an exception.

wsturgiss commented 5 months ago

This doesn't really solve your issue, but here is a workaround that works for us: we don't pass DataFrames directly to Dramatiq (partly because our DataFrames are massive). Instead, we persist what we need to the database, pass only the ID of the row, and fetch the data inside the worker.
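A minimal sketch of that workaround, assuming a store that both the producer and the workers can reach: the message carries only an integer ID, and integers compare cleanly, so the priority queue never has to compare DataFrames. To keep it runnable without Dramatiq or pandas, an in-memory SQLite table and plain nested lists stand in for the real database and DataFrame. save_rows, load_rows and process are hypothetical names; in a real project process would be decorated with @dramatiq.actor, and with pandas, DataFrame.to_sql and pandas.read_sql could play the role of the two helpers.

```python
import json
import sqlite3

# Stand-in for the shared database. A real setup needs a store that every
# worker process can reach; ":memory:" only lives inside this one process.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE frames (id INTEGER PRIMARY KEY, payload TEXT)")


def save_rows(rows):
    """Producer side (hypothetical helper): persist the data and return its row id."""
    cur = conn.execute("INSERT INTO frames (payload) VALUES (?)", (json.dumps(rows),))
    conn.commit()
    return cur.lastrowid


def load_rows(frame_id):
    """Worker side (hypothetical helper): fetch the data for one message."""
    row = conn.execute("SELECT payload FROM frames WHERE id = ?", (frame_id,)).fetchone()
    if row is None:
        raise KeyError(frame_id)
    return json.loads(row[0])


def process(frame_id):
    """Would be an @dramatiq.actor in practice; it receives only an int."""
    rows = load_rows(frame_id)
    return sum(sum(r) for r in rows)


if __name__ == "__main__":
    # Enqueue ten "messages" whose only argument is an integer id.
    ids = [save_rows([[1, 2, 3], [4, 5, 6]]) for _ in range(10)]
    print([process(i) for i in ids])
```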

jenstroeger commented 2 months ago

You may want to consider changing the message encoder from the default JSON encoder to one better suited to your data (see the docs).

brakhane commented 2 months ago

In this example, I’m already using PickleEncoder. IMHO, the problem lies with the broker, which doesn’t handle exceptions when comparing two messages for equality.

Even when someone does something stupid like in this example, Dramatiq should either handle it gracefully or fail right away. As currently implemented, it seems to work at first, but breaks as soon as more than one message is sent.
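One way the crash could be avoided on the worker side, sketched here as a suggestion and not as Dramatiq's actual fix: store a monotonically increasing sequence number between the priority and the message in each work-queue entry, the tie-breaker pattern described in the heapq documentation's priority-queue notes. Entries with equal priority then differ at the counter, so the message objects are never compared, and messages of equal priority come out in FIFO order. TieBreakingQueue and FakeMessage are hypothetical names; the sketch keeps the (priority, message) interface seen in the traceback's work_queue.put((actor.priority, message)) call.

```python
import itertools
import queue


class FakeMessage:
    """Hypothetical stand-in for a message whose == raises (like one holding a DataFrame)."""

    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        raise ValueError("The truth value of a DataFrame is ambiguous.")


class TieBreakingQueue(queue.PriorityQueue):
    """PriorityQueue that stores (priority, seq, item) so items themselves are never compared."""

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self._seq = itertools.count()

    def _put(self, entry):
        priority, item = entry
        # Queue.put holds the queue's lock while calling _put, so each seq is unique.
        super()._put((priority, next(self._seq), item))

    def _get(self):
        priority, _seq, item = super()._get()
        return priority, item


q = TieBreakingQueue()
for name in ("a", "b", "c"):
    q.put((0, FakeMessage(name)))  # would raise on the second put with a plain PriorityQueue
print([q.get()[1].name for _ in range(3)])  # FIFO order within one priority
```

Overriding _put and _get is the extension point the queue module itself uses for PriorityQueue and LifoQueue, so callers keep putting and getting plain (priority, message) pairs.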