eandersson / amqpstorm

Thread-safe Python RabbitMQ Client & Management library
https://www.amqpstorm.io/
MIT License

qos is not working as intended #117

Closed: sulsj closed this issue 2 years ago

sulsj commented 2 years ago

Hi, I am creating multiple consumers that listen on the same queue. The initialization code is:

class TaskRunner(JtmAmqpstormBase):
    """
    Process user task

    """

    def start(self, req_q):
        """Start consuming tasks from req_q.

        :param req_q: name of the request queue to consume from
        :return: None
        """
        if not self.connection:
            self.create_connection()
        while True:
            try:
                channel = self.connection.channel()
                channel.basic.qos(prefetch_count=1)
                channel.exchange.declare(
                    exchange=self.jtm_inner_main_exch,
                    exchange_type="direct",
                    durable=True,
                    auto_delete=False,
                )
                channel.queue.declare(
                    queue=req_q, durable=True, exclusive=False, auto_delete=True
                )
                channel.queue.bind(
                    exchange=self.jtm_inner_main_exch, queue=req_q, routing_key=req_q
                )
                channel.basic.consume(self.process_task, req_q, no_ack=False)
                #channel.basic.qos(prefetch_count=1)
                channel.start_consuming()
                if not channel.consumer_tags:
                    channel.close()
            except amqpstorm.AMQPError as why:
                logger.exception(why)
                self.create_connection()
            except KeyboardInterrupt:
                self.connection.close()
                break
            except Exception as e:
                logger.exception(e)
                if self.connection:
                    self.connection.close()
                raise

The issue I've been having is that all the messages sent to the queue (req_q) are taken by a single consumer. The other consumers listening on the same queue never receive any messages.
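To illustrate why the prefetch limit matters for this symptom, here is a toy model of a broker dispatching a backlog to two consumers. It is only a sketch: `ToyQueue`, `Consumer`, and `run` are hypothetical names, and the model is a simplification, not RabbitMQ's actual implementation. It assumes round-robin dispatch that skips any consumer already holding `prefetch_count` unacknowledged messages, with 0 meaning no limit.

```python
from collections import deque


class Consumer:
    """Hypothetical consumer that tracks unacknowledged deliveries."""

    def __init__(self, name, prefetch_count=0):
        self.name = name
        self.prefetch_count = prefetch_count  # 0 means "no limit"
        self.unacked = []   # delivered but not yet acknowledged
        self.received = []  # every message ever delivered to this consumer

    def can_receive(self):
        return self.prefetch_count == 0 or len(self.unacked) < self.prefetch_count


class ToyQueue:
    """Toy queue: round-robin dispatch that respects each prefetch limit."""

    def __init__(self, backlog):
        self.ready = deque(backlog)
        self.consumers = []
        self._next = 0  # index of the consumer to try first

    def subscribe(self, consumer):
        self.consumers.append(consumer)
        self.dispatch()

    def ack(self, consumer):
        consumer.unacked.pop(0)
        self.dispatch()

    def dispatch(self):
        while self.ready:
            n = len(self.consumers)
            for offset in range(n):
                idx = (self._next + offset) % n
                candidate = self.consumers[idx]
                if candidate.can_receive():
                    break
            else:
                return  # no consumer can take another message right now
            msg = self.ready.popleft()
            candidate.unacked.append(msg)
            candidate.received.append(msg)
            self._next = (idx + 1) % n


def run(prefetch_count, messages=6):
    """Subscribe two consumers to a backlog and return how many each received."""
    queue = ToyQueue(range(messages))
    a = Consumer("a", prefetch_count)
    b = Consumer("b", prefetch_count)
    queue.subscribe(a)  # a arrives first, while the backlog is waiting
    queue.subscribe(b)
    while a.unacked or b.unacked:
        for consumer in (a, b):
            if consumer.unacked:
                queue.ack(consumer)
    return len(a.received), len(b.received)


print(run(prefetch_count=0))  # first consumer drains the whole backlog
print(run(prefetch_count=1))  # work is shared between the two consumers
```

In this model, an unlimited prefetch lets the first subscriber take the entire backlog before the second one attaches, which matches the behaviour described above when the limit does not take effect.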

Here is a screenshot of the channels in my RabbitMQ dashboard. Even though prefetch_count=1 is set, RabbitMQ shows that all the channels have prefetch_count=0.

[Screenshot: RabbitMQ dashboard, channels view]

I tried

channel.basic.qos(prefetch_count=1, prefetch_size=1)

but found that only prefetch_size=0 is allowed.

Also tried

channel.basic.qos(prefetch_count=1, global_=True)

but it had no effect.

Any ideas?

Thanks, Seung

sulsj commented 2 years ago

I found that another user's process was resetting the channel property. It's resolved now. Thanks!
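For anyone chasing a similar problem, one way to see which channels are running without a prefetch limit, and which user owns them, is to filter the channel list returned by the RabbitMQ management API. The sketch below is not taken from this thread: `unlimited_prefetch_channels` is a hypothetical helper, the sample data is invented for illustration, and it assumes each channel entry carries the `user`, `name`, `prefetch_count`, and `global_prefetch_count` fields.

```python
def unlimited_prefetch_channels(channels):
    """Return (user, channel name) pairs that have no prefetch limit at all.

    `channels` is assumed to be a list of dicts shaped like the entries of
    the management API's channel listing.
    """
    return [
        (ch.get("user"), ch.get("name"))
        for ch in channels
        if ch.get("prefetch_count", 0) == 0
        and ch.get("global_prefetch_count", 0) == 0
    ]


# Invented sample data, for illustration only.
sample = [
    {"user": "alice", "name": "10.0.0.1:5001 (1)",
     "prefetch_count": 1, "global_prefetch_count": 0},
    {"user": "bob", "name": "10.0.0.2:5002 (1)",
     "prefetch_count": 0, "global_prefetch_count": 0},
    {"user": "carol", "name": "10.0.0.3:5003 (1)",
     "prefetch_count": 0, "global_prefetch_count": 1},
]
print(unlimited_prefetch_channels(sample))

# Against a live broker (URL and credentials are assumptions):
# from amqpstorm.management import ManagementApi
# api = ManagementApi("http://localhost:15672", "guest", "guest")
# print(unlimited_prefetch_channels(api.channel.list()))
```

A channel set up with global_=True would report its limit under `global_prefetch_count` rather than `prefetch_count`, which is why the helper checks both fields.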

eandersson commented 2 years ago

Glad it got sorted! Feel free to reach out again if you have any questions.