majek / puka

Puka - the opinionated RabbitMQ client
https://github.com/majek/puka

Unacknowledged messages building up #48

Closed gbenmartin closed 10 years ago

gbenmartin commented 11 years ago

I have a program that runs multiple python processes consuming from the same durable rabbit queue like this:

    message_wait_timeout = 300

    _future = rabbit._client.basic_consume(
        queue=self.binding, prefetch_count=1, no_ack=False
        )
    while True:
        try:
            message = rabbit._client.wait(_future, timeout=message_wait_timeout)

            if message is not None:
                rabbit._client.basic_ack(message)
                task = json.loads(message['body'])
                #
                # Do some work here using the data in task
                #

        except Exception as e:
            Log(e)

Here rabbit._client is my connection to a remote RabbitMQ server whose queue receives over a million messages a day. The number of Python processes consuming scales up and down with more or fewer AWS nodes, depending on the number of messages in the queue, so I have anywhere from 40 to 400 Python processes consuming from the queue concurrently depending on the time of day.

My problem is that I'm slowly accumulating unacknowledged messages every day (I'm not sure of the exact number, but it's around 10-100). I can kill every Python process, severing all connections to the RabbitMQ server, but the messages remain unacknowledged and do not jump back into the ready queue.

My theory is that the RabbitMQ server thinks there is still an open channel associated with an old process, but I'm not sure, and I wouldn't even know how to flush them if that were the case. It's my understanding that even though I call basic_ack() immediately after wait(), the ack isn't actually sent until the next wait(), so it's possible that my processes get killed while doing the work on the message data (which takes between 1 and 4 seconds) before the next wait(). However, I would expect those unacknowledged messages to become ready again.
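For reference, one variation I've been considering is to ack only after the work is done, so a process killed mid-task never even queues an ack and the message should go back to ready once its connection drops. This is just a sketch of that loop (same context as the code above, and assuming puka buffers the ack until the next wait() as described):

    _future = rabbit._client.basic_consume(
        queue=self.binding, prefetch_count=1, no_ack=False
        )
    while True:
        try:
            message = rabbit._client.wait(_future, timeout=300)
            if message is not None:
                task = json.loads(message['body'])
                #
                # Do the 1-4 seconds of work here first...
                #
                # ...and only then ack, so a process killed mid-task leaves
                # the message unacked and it is redelivered when the
                # connection closes.
                rabbit._client.basic_ack(message)
        except Exception as e:
            Log(e)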

Do you have any insights? Hopefully I stated the problem clearly.

majek commented 11 years ago

If you kill a worker that has some unacknowledged messages remaining, RabbitMQ will requeue them and they will become available to another worker as soon as the underlying TCP/IP connection dies.

In other words: if you kill all the workers then, by definition, there will be no unacknowledged messages from RabbitMQ's point of view.

Yes, there is a slight issue in puka with flushing basic_acks; see https://github.com/majek/puka/issues/3. It should be mostly fixed/mitigated, but please do read that bug report.

Frankly? I'd say that either a) there is a bug in your code, or b) there is a lingering TCP/IP connection to RabbitMQ somewhere, from a dead client or one that is simply sitting idle.

Please do investigate and let me know where the problem is.
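Something along these lines might help narrow it down (just a rough sketch, assuming rabbitmqctl is available on the broker host; it shells out from Python, but the equivalent commands run directly on the server work just as well):

    import subprocess

    # Show each channel, the connection it belongs to, and how many messages
    # it is currently holding unacknowledged.
    print(subprocess.check_output(
        ["rabbitmqctl", "list_channels",
         "connection", "consumer_count", "messages_unacknowledged"]))

    # Show the open connections, to spot clients that are dead or just idle.
    print(subprocess.check_output(
        ["rabbitmqctl", "list_connections", "name", "peer_host", "state"]))

A channel holding unacked messages on a connection that no live worker owns would point at problem b).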

majek commented 10 years ago

Please reopen this issue if you find any new information.

gbenmartin commented 10 years ago

This turned out to be a server-side issue that resulted from consumer processes not shutting down properly.

My program wasn't shutting down gracefully, leaving connections open if the process was killed while the message data was being processed (meaning the process never made it to the next wait() and the ack was never sent). I added better signal handling, and then looped through the old open connections with a shell script using rabbitmqctl on my RabbitMQ server to close them. No more unacked messages.
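In case it's useful to anyone else, the signal handling was roughly along these lines (a sketch of the idea rather than my exact code): set a flag on SIGTERM/SIGINT, let the loop exit between messages, and close the connection cleanly so RabbitMQ returns anything still unacked to ready.

    import signal

    # Flag flipped by SIGTERM/SIGINT so the consume loop can exit between
    # messages instead of dying in the middle of processing a task.
    shutdown = {'requested': False}

    def _request_shutdown(signum, frame):
        shutdown['requested'] = True

    signal.signal(signal.SIGTERM, _request_shutdown)
    signal.signal(signal.SIGINT, _request_shutdown)

    _future = rabbit._client.basic_consume(
        queue=self.binding, prefetch_count=1, no_ack=False
        )
    while not shutdown['requested']:
        try:
            message = rabbit._client.wait(_future, timeout=300)
            if message is not None:
                rabbit._client.basic_ack(message)
                task = json.loads(message['body'])
                # ... do the work using task ...
        except Exception as e:
            Log(e)

    # Closing the connection cleanly lets RabbitMQ requeue anything still unacked.
    rabbit._client.wait(rabbit._client.close())

The cleanup of the connections that were already stale was just a loop over the output of rabbitmqctl list_connections, feeding each connection pid to rabbitmqctl close_connection.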