veegee / amqpy

Pure-Python 2 & 3 AMQP client library
http://amqpy.readthedocs.org/

missed messages on a channel.. #27

Closed: gst closed this issue 9 years ago

gst commented 9 years ago

I'm not sure how this is supposed to behave, but I'm fairly sure something is wrong somewhere. Basically, I have a (quite simple) single-threaded script containing only the following:

import amqpy
from amqpy.consumer import AbstractConsumer
from amqpy.exceptions import Timeout
import time

# Placeholder names, overridden just below
exchange = 'test.x'
queue = 'test.q'

# Names actually used by this script
exchange = 'file.topic.x'
queue = 'log.file_upload.q'

routing_key = 'log.eventlog'

count = 0

class Consumer(AbstractConsumer):

    def run(self, msg):
        print(msg)
        global count
        count += 1
        msg.ack()

def main():

    conn = amqpy.Connection()
    channel = conn.channel()

    channel.basic_recover(requeue=True)

    channel.exchange_declare(exchange, 'topic',
                             auto_delete=False)

    channel.queue_declare(queue, auto_delete=False)
    channel.queue_bind(queue, exchange, routing_key)

    consumer = Consumer(channel, queue)
    consumer.declare()

    while True:

        # comment out these 2 lines and the problem goes away
        ch = conn.channel()
        ch.close()

        try:
            conn.drain_events(0)
        except Timeout:
            time.sleep(1)

if __name__ == '__main__':
    try:
        main()
    finally:
        print("Tot count: %s" % count)

Launch this script in a terminal.

Launch another script to publish, say, 100 messages in a row on the given exchange/queue.

You will see that the consumer receives some of the published messages, but not all of them.

To check it:

Once the publish script has finished, wait a while to give the consumer time to process the 100 messages, then stop/interrupt the consumer script. It will report "Tot count: .." with something != 100. Second verification: open your RabbitMQ web admin page in a browser and check the queue's "unacked" number. It's != 0 (at least in my situation).

That is, as far as RabbitMQ is concerned, our consumer has received all the messages, but some of them were never acked. The problem is that on the consumer side we don't get all of them, as if some were lost between the server and our client.

If we comment out the channel creation just after the while True, then the consumer receives all the messages.

Any idea?

veegee commented 9 years ago

This is interesting, good catch. It looks like the conn.channel()/ch.close() operations are waiting for a response message from the server, but receiving messages not meant for them and thus enqueuing them internally. The AbstractChannel class has the logic for this. I believe if you check the queued messages, they will be in there. I'll take a look at this as soon as I can.
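
To make the suspected mechanism concrete, here is a toy model in plain Python. It is only a sketch under stated assumptions and does not use or reproduce amqpy's actual code: ToyConnection, wait_for, the stashed queue and the check_stash flag are all hypothetical names invented for illustration. It shows how a synchronous wait for a reply (such as the one behind conn.channel()) can swallow deliveries that arrived earlier on the same connection, and how those deliveries stay invisible if the drain step never looks at the internal queue.

```python
from collections import deque


class ToyConnection:
    """Toy model of one connection carrying frames for several channels.

    Hypothetical illustration only; this is not amqpy's implementation.
    """

    def __init__(self, incoming):
        # Frames as (channel_id, method_name) tuples, in arrival order.
        self.incoming = deque(incoming)
        # Frames read by a synchronous wait that was looking for something else.
        self.stashed = deque()
        # Frames that actually reached the consumer callback.
        self.delivered = []

    def wait_for(self, channel_id, method):
        """Read frames until the expected reply arrives, stashing everything else."""
        while self.incoming:
            frame = self.incoming.popleft()
            if frame == (channel_id, method):
                return frame
            self.stashed.append(frame)
        raise RuntimeError("expected reply never arrived")

    def drain_events(self, check_stash):
        """Dispatch pending frames; optionally dispatch stashed frames first."""
        if check_stash:
            while self.stashed:
                self.delivered.append(self.stashed.popleft())
        while self.incoming:
            self.delivered.append(self.incoming.popleft())


def run(check_stash):
    """Return (delivered_count, still_stashed_count) for one loop iteration."""
    conn = ToyConnection([
        (1, "basic.deliver #1"),   # arrives before the open-ok reply
        (1, "basic.deliver #2"),
        (2, "channel.open-ok"),    # reply the synchronous call is waiting for
        (1, "basic.deliver #3"),   # arrives after the reply
    ])
    conn.wait_for(2, "channel.open-ok")   # models conn.channel()
    conn.drain_events(check_stash)        # models conn.drain_events(0)
    return len(conn.delivered), len(conn.stashed)


if __name__ == "__main__":
    print("drain ignores stash:", run(check_stash=False))
    print("drain checks stash: ", run(check_stash=True))
```

In this model, the two deliveries that arrived before the reply end up in the stash. They count as delivered from the broker's point of view, but the consumer never sees them unless the drain step also dispatches stashed frames, which matches the "unacked" symptom described above.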

gst commented 9 years ago

Yes, that's exactly what I guessed could be happening. I haven't had time to check the code on my side yet.

gst commented 9 years ago

Thx @veegee :-)

veegee commented 9 years ago

There were a couple of small bugs caused by this fix but I fixed those as well. Everything is fine in the latest version.