noxdafox / rabbitmq-message-deduplication

RabbitMQ Plugin for filtering message duplicates
Mozilla Public License 2.0
271 stars 33 forks source link

Celery lost connection to broker #107

Open AlessandroSalvetti opened 3 months ago

AlessandroSalvetti commented 3 months ago

Hi, I'm using rabbitmq 3.13.0 Erlang 26.2.2 rabbitmq-message-deduplication 0.6.2

celery 5.3.6 django-celery-beat 2.6.0

When the the argument x-message-deduplication is set to true for the celery queue, then all the messages sent to the queue causes celery to lost the connection:

consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 340, in start
    blueprint.start(self)
  File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 742, in start
    c.loop(*c.loop_args())
  File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/kombu/asynchronous/hub.py", line 373, in create_loop
    cb(*cbargs)
  File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/kombu/transport/base.py", line 248, in on_readable
    reader(loop)
  File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/kombu/transport/base.py", line 230, in _read
    drain_events(timeout=0)
  File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/amqp/connection.py", line 526, in drain_events
    while not self.blocking_read(timeout):
  File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/amqp/connection.py", line 532, in blocking_read
    return self.on_inbound_frame(frame)
  File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/amqp/method_framing.py", line 53, in on_frame
    callback(channel, method_sig, buf, None)
  File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/amqp/connection.py", line 538, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
  File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/amqp/abstract_channel.py", line 156, in dispatch_method
    listener(*args)
  File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/amqp/channel.py", line 1467, in _on_basic_cancel
    raise ConsumerCancelled(consumer_tag, spec.Basic.Cancel)
amqp.exceptions.ConsumerCancelled: Basic.cancel: (0) None8

What it could be?

noxdafox commented 3 months ago

EDIT: Just missed the beginning, my apologies.

I need to see the error logs on the broker side, do you have them available?