eandersson / amqpstorm

Thread-safe Python RabbitMQ Client & Management library
https://www.amqpstorm.io/
MIT License
186 stars, 37 forks

BUG: consumer_callbacks KeyError #132

Closed mic1on closed 4 weeks ago

mic1on commented 1 month ago

version: 2.10.7

I'm using the library from multiple threads and occasionally get a KeyError.

After some troubleshooting, I reproduced the problem with this minimal example:

import threading

import amqpstorm

connection = amqpstorm.Connection(hostname="localhost",
                                  port=5672,
                                  username="admin",
                                  password="admin")

channel = connection.channel()

queues = ["demo1", "demo2", "demo3"]

for queue in queues:
    channel.queue.declare(queue, durable=True)

    for i in range(100):
        channel.basic.publish(f"{i}", queue)

# Every thread registers its consumer on the same shared channel.
def start_consuming(queue):
    channel.basic.qos(prefetch_count=1)
    channel.basic.consume(
        queue=queue, callback=lambda x: x, no_ack=False
    )
    channel.start_consuming(to_tuple=False)

for queue in queues:
    t = threading.Thread(target=start_consuming, args=(queue,))
    t.start()

I forked the project and tried to fix it (see commit), but I think there may be a more appropriate solution.

mic1on commented 1 month ago

  File "/Users/miclon/myproject/use-py/use-rabbitmq/example/demo2.py", line 26, in start_consuming
    channel.start_consuming(to_tuple=False)
  File "/Users/miclon/myproject/use-py/use-rabbitmq/.venv/lib/python3.11/site-packages/amqpstorm/channel.py", line 358, in start_consuming
    self.process_data_events(
  File "/Users/miclon/myproject/use-py/use-rabbitmq/.venv/lib/python3.11/site-packages/amqpstorm/channel.py", line 329, in process_data_events
    self._consumer_callbacks[consumer_tag](message)
KeyError: 'amq.ctag-MfRBvol6HqTpDreT-TNRPg'

eandersson commented 1 month ago

Interesting. I'll take a look, but a quick fix would be to have a dedicated channel for each consumer instead of sharing the same channel across multiple threads.
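
A minimal sketch of that workaround, assuming the connection is shared and each thread opens its own channel. The helper names consume_on_own_channel and start_consumers are hypothetical; the channel calls are the same ones used in the reproduction above.

```python
import threading


def consume_on_own_channel(connection, queue, on_message):
    """Consume `queue` on a channel that only the calling thread uses."""
    # The connection is shared, but the channel is created inside the thread,
    # so no other consumer registers callbacks on it.
    channel = connection.channel()
    channel.basic.qos(prefetch_count=1)
    channel.basic.consume(callback=on_message, queue=queue, no_ack=False)
    channel.start_consuming(to_tuple=False)


def start_consumers(connection, queues, on_message):
    """Start one consumer thread (and therefore one channel) per queue."""
    threads = []
    for queue in queues:
        thread = threading.Thread(
            target=consume_on_own_channel,
            args=(connection, queue, on_message),
        )
        thread.start()
        threads.append(thread)
    return threads


# Possible usage against a local broker (credentials as in the reproduction):
#
#   import amqpstorm
#   connection = amqpstorm.Connection("localhost", "admin", "admin", port=5672)
#   start_consumers(connection, ["demo1", "demo2", "demo3"],
#                   lambda message: message.ack())
```

Because no_ack=False, the callback in the usage comment acknowledges each message; otherwise prefetch_count=1 would stop deliveries after the first one.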

eandersson commented 1 month ago

When you have a chance can you take a look at this PR https://github.com/eandersson/amqpstorm/pull/134

jhogg commented 1 month ago

Erik, a concern: as I read this, if prefetch=1 and acknowledgments are required, the continue taken when there is no consumer tag will hang that process. Likewise, if acknowledgements are required, it will hang at the point where prefetch is exhausted with un-acked messages. Jay
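
The concern can be illustrated with a toy model of a prefetch window. This is only a sketch: ToyBroker and drain are hypothetical names, and nothing here is RabbitMQ or amqpstorm code. A delivery that is skipped without an ack keeps its prefetch slot, so with prefetch_count=1 nothing further is delivered.

```python
from collections import deque


class ToyBroker:
    """Toy prefetch window for illustration; not RabbitMQ or amqpstorm."""

    def __init__(self, messages, prefetch_count):
        self.pending = deque(messages)  # messages still on the queue
        self.prefetch_count = prefetch_count
        self.unacked = []               # delivered but not yet acknowledged

    def deliver(self):
        """Return the next message, or None if the window is full or empty."""
        if len(self.unacked) >= self.prefetch_count or not self.pending:
            return None
        message = self.pending.popleft()
        self.unacked.append(message)
        return message

    def ack(self, message):
        self.unacked.remove(message)


def drain(broker, callbacks):
    """Dispatch deliveries, skipping (without acking) unknown consumer tags."""
    handled = []
    while (message := broker.deliver()) is not None:
        consumer_tag, body = message
        callback = callbacks.get(consumer_tag)
        if callback is None:
            continue  # the skipped message stays un-acked and holds its slot
        callback(body)
        broker.ack(message)
        handled.append(body)
    return handled


# One delivery carries a tag with no registered callback: the window fills up.
stalled = ToyBroker([("ctag-unknown", "m0"), ("ctag-known", "m1")], prefetch_count=1)
stalled_handled = drain(stalled, {"ctag-known": lambda body: None})

# Every delivery has a callback: both messages are handled and acknowledged.
healthy = ToyBroker([("ctag-known", "m0"), ("ctag-known", "m1")], prefetch_count=1)
healthy_handled = drain(healthy, {"ctag-known": lambda body: None})
```

In the toy model the loop simply ends with a message still pending; a real consumer would instead keep waiting for a delivery that never arrives.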

eandersson commented 1 month ago

The issue was a race condition: one thread started consuming, and the other threads started pumping messages before that thread had finished setting up its consumer. All of the IO happens asynchronously in the background, and we were missing a lock protecting the setup phase.
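
As a rough illustration of what a lock around the setup phase buys, here is a toy channel. It is a sketch under assumptions, not the actual amqpstorm fix: ToyChannel, its deliveries argument, and run_consumers are hypothetical. Deliveries are queued and the callback registered under one lock, and dispatch takes its snapshot under the same lock, so it never sees a consumer tag without a callback.

```python
import threading


class ToyChannel:
    """Hypothetical shared channel; a sketch of the idea, not amqpstorm's code."""

    def __init__(self):
        self._lock = threading.Lock()
        self._consumer_callbacks = {}
        self._inbound = []  # (consumer_tag, body) pairs waiting to be dispatched

    def consume(self, consumer_tag, callback, deliveries):
        # Setup phase: deliveries may arrive as soon as the consume request is
        # sent, so queueing them and registering the callback share one lock.
        with self._lock:
            self._inbound.extend((consumer_tag, body) for body in deliveries)
            self._consumer_callbacks[consumer_tag] = callback

    def process_data_events(self):
        # Snapshot under the same lock, so a half-finished setup is never seen.
        with self._lock:
            inbound, self._inbound = self._inbound, []
            callbacks = dict(self._consumer_callbacks)
        for consumer_tag, body in inbound:
            callbacks[consumer_tag](body)


def run_consumers(tags, messages_per_tag=100):
    """Run one consumer thread per tag on a single shared ToyChannel."""
    channel = ToyChannel()
    received = []

    def worker(tag):
        bodies = [f"{tag}:{i}" for i in range(messages_per_tag)]
        channel.consume(tag, received.append, bodies)
        # Any thread may end up dispatching another thread's messages.
        channel.process_data_events()

    threads = [threading.Thread(target=worker, args=(tag,)) for tag in tags]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return received
```

Calling run_consumers(["demo1", "demo2", "demo3"]) mirrors the three-thread reproduction: every message is dispatched exactly once and no lookup fails, whichever thread happens to pump the events.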

mic1on commented 1 month ago

@eandersson Thanks! That solved the problem I was having very quickly. Hopefully a new version will be released soon!