sstone / amqp-client

[THIS PROJECT HAS BEEN ARCHIVED AND IS NO LONGER MAINTAINED] Simple fault-tolerant AMQP client written in Scala and based on Akka and the RabbitMQ java client
MIT License

Consumers are not redefined after channel reconnect, resulting in ack errors: reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag #30

Closed by tomrogers3 10 years ago

tomrogers3 commented 10 years ago

We experienced an error scenario where a channel died while our application was connected. amqp-client created another channel but didn't redefine the consumers, so as the consumers continued to ack messages, it seems amqp-client was acking them via the old, broken consumer tags. As a result this error occurred and the messages remained in RabbitMQ in an unacked state. Unfortunately I don't have the logs available anymore to show.

Seemingly relevant discussion on the problem is here:

http://rabbitmq.1065348.n5.nabble.com/reply-code-406-reply-text-PRECONDITION-FAILED-unknown-delivery-tag-tp21310p21312.html

Paraphrasing from this webpage, perhaps the following could be the issue?

"- ack'ing on a channel other than the one the message was received on (*)

... (*) this is a common occurrence in applications with naive error recovery logic that re-establishes connections&channels transparently behind the scenes. "

I'm no RabbitMQ expert, so am interested in your thoughts. Also a potential solution might be to use channel.basicCancel(consumerTag)?

Some example code that might be of use to implement this is here (credit to momania/akka-amqp for this):

https://github.com/momania/akka-amqp/blob/master/src/main/scala/akka/amqp/DurableConsumer.scala#L90

sstone commented 10 years ago

Hi Tom, which version are you using (and which version of Akka)? I can see how you would get these errors if a channel is lost and replaced, but I don't understand why the consumer has not been recreated and the messages resent by the broker. I also don't understand why the messages remain stuck in an "unacked" state in the broker. I remember this discussion thread; I was pretty sure that it was caused by threading errors and that I was immune to it... I will start working on this tomorrow and will keep you informed. Thanks.

sstone commented 10 years ago

OK, I see what's wrong. I have a fix that is about 80% ready, and I hope I'll be able to push it tomorrow. I've worked on the latest scala2.10 codebase and against akka 2.2.3; please tell me if you need the fix for akka 2.1 as well. Thanks.

tomrogers3 commented 10 years ago

Hi sstone, that's great, thanks. We're using akka 2.2.3 and scala 2.10.3. If you need any other info just let me know.

sstone commented 10 years ago

I believe that this issue is fixed in branch wip-become, for which I've published a snapshot to the sonatype snapshot repo (version 1.3-WIP-BECOME-SNAPSHOT). I've also added a specific test (Bug30Spec) for this problem. The branch is called wip-become because I've replaced the FSM with calls to context.become. It is something we have done at work for all FSMs with few states over the last 6 months, and I believe it makes the code simpler and its intent clearer.

The issue, as you rightly guessed, was that new channels were transparently handling Acks for the dead ones they had replaced. Since delivery tags are incremental and restart from 1 when a channel is created, this was *really* bad.
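To make that failure mode concrete, here is a minimal sketch in plain Java. It is a toy model, not amqp-client or RabbitMQ code: `ToyChannel` and its methods are hypothetical names, and the only property it assumes is that every channel keeps its own delivery-tag counter, which restarts when a new channel is created.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of a channel: delivery tags are only meaningful on the channel that issued them. */
class ToyChannel {
    private long nextTag = 1;                           // assumption: numbering restarts for each new channel
    private final Set<Long> unacked = new HashSet<>();  // tags delivered on this channel and not yet acked

    /** Simulates the broker delivering one message on this channel; returns its delivery tag. */
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    /** Returns true if the tag was outstanding on this channel; false stands in for the 406 error. */
    boolean ack(long tag) {
        return unacked.remove(tag);
    }

    int unackedCount() {
        return unacked.size();
    }
}

class DeliveryTagDemo {
    public static void main(String[] args) {
        ToyChannel oldChannel = new ToyChannel();
        long first = oldChannel.deliver();    // tag 1 on the old channel
        oldChannel.deliver();                 // tag 2
        long pending = oldChannel.deliver();  // tag 3, still unacked when the old channel dies

        ToyChannel newChannel = new ToyChannel();
        newChannel.deliver();                 // tag 1 again, but for a different message

        // Acking old tag 3 on the new channel: the tag is unknown there.
        System.out.println(newChannel.ack(pending)); // false
        // Acking old tag 1 on the new channel "succeeds", but acknowledges the wrong message.
        System.out.println(newChannel.ack(first));   // true
    }
}
```

The second case is the dangerous one: a stale Ack whose tag happens to exist on the replacement channel is accepted silently.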

The fix is to have each ChannelOwner forward requests to a "forwarder" child actor (yes, it's a bad name, it is really not a forwarder...) which holds an actual channel; it is this "forwarder" that will send AMQP deliveries to your listener actor. When a channel crashes, or the connection that holds it is closed, the forwarder actor is stopped, and the ChannelOwner will create a new one when it receives its new channel. This means that the actor your listener would send the pending Acks to does not exist anymore, and these Acks end up in the dead-letter mailbox. You might see warnings such as:

Message [com.github.sstone.amqp.Amqp$Ack] from Actor[XXX] to Actor[YYY] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
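The idea behind that fix can be sketched without Akka. The following plain-Java model is an illustration only, not the actual amqp-client implementation: `ToyForwarder`, `ToyChannelOwner` and the `deadLetters` list are hypothetical stand-ins for the child actor, the ChannelOwner and Akka's dead-letter mailbox. What it shows is that an Ack is addressed to the handle that made the delivery, so once that handle is gone the Ack can no longer reach a replacement channel.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for the per-channel child actor: it lives and dies with one channel. */
class ToyForwarder {
    final int channelId;
    final List<Long> acked = new ArrayList<>();
    private boolean alive = true;

    ToyForwarder(int channelId) {
        this.channelId = channelId;
    }

    void stop() {
        alive = false;
    }

    boolean isAlive() {
        return alive;
    }
}

/** Stand-in for ChannelOwner: it replaces its forwarder whenever it receives a new channel. */
class ToyChannelOwner {
    final List<String> deadLetters = new ArrayList<>();
    private ToyForwarder current;
    private int nextChannelId = 1;

    /** Called when a (new) channel is available; the previous forwarder, if any, is stopped. */
    ToyForwarder onNewChannel() {
        if (current != null) {
            current.stop();
        }
        current = new ToyForwarder(nextChannelId++);
        return current;
    }

    /** An Ack goes to the forwarder that delivered the message, never to "whatever is current". */
    void ack(ToyForwarder target, long deliveryTag) {
        if (target.isAlive()) {
            target.acked.add(deliveryTag);
        } else {
            deadLetters.add("Ack(" + deliveryTag + ") for dead channel " + target.channelId);
        }
    }
}

class ForwarderDemo {
    public static void main(String[] args) {
        ToyChannelOwner owner = new ToyChannelOwner();
        ToyForwarder first = owner.onNewChannel();   // channel 1
        ToyForwarder second = owner.onNewChannel();  // channel 1 is lost, channel 2 replaces it

        owner.ack(first, 3);   // stale Ack: dropped instead of hitting channel 2
        owner.ack(second, 1);  // valid Ack on the live channel

        System.out.println(owner.deadLetters.size()); // 1
        System.out.println(second.acked);             // [1]
    }
}
```

Dropping the stale Ack is safe only because the broker is expected to redeliver whatever was unacked on the dead channel.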

I'll do more tests this week, and should merge this branch wip-become with scala2.10 very soon and publish a new RC for 1.3. If you have time and are willing to test 1.3-WIP-BECOME-SNAPSHOT, I would welcome your feedback.

Thanks!

tomrogers3 commented 10 years ago

Hi @sstone, thanks very much for the quick turnaround, much appreciated.

A question / potential issue though - how can we ack and consequently dequeue those unacked messages after the channel comes up again? From what I've seen when a msg is read from MQ, it'll stay in the unacked state in MQ until it's acked or nacked, or our application shuts down, releasing the consumer on MQ which releases the messages back to the 'ready' state on the broker.

In the fix you've described, you're saying that when the channel comes up again, the actor our listener was using doesn't exist anymore, so the acks go to the dead-letter mailbox. Wouldn't this mean the partially processed messages will stay in an unacked state on the broker, with no way to ack or nack them since the delivery tags are no longer valid? I'm no RabbitMQ expert, so let me know if you feel otherwise, but wouldn't you also want to do something such as a basicCancel and/or basicRecover in amqp-client, to redefine the consumer or perform some other appropriate action at the same time as it recreates the channel, so those unacked messages could be reconsumed, reprocessed and then acked via the new channel? I'm interested in your thoughts. Thanks again.

tomrogers3 commented 10 years ago

For discussion and reference, here are the relevant API docs for basicCancel and basicRecover: http://www.rabbitmq.com/javadoc/com/rabbitmq/client/Channel.html#basicCancel(java.lang.String)

sstone commented 10 years ago

Mmm... I'll double check tomorrow, but when a channel crashes, messages that were delivered to consumers on this channel but not acknowledged should be delivered again (this time with the redelivered flag set to 'true'): they should not remain unacked on the broker. You're supposed to see unacked messages only when they have been delivered, the channel they were delivered on is still there, and Acks have not been sent back. Calling basicRecover would have the broker send them again (possibly to different consumers), but in our case, recovering from a channel failure, it should not be necessary.
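The broker-side behaviour described above can be sketched as a toy model in plain Java. This is not RabbitMQ code: `ToyBrokerQueue`, `ToyMessage` and the integer channel ids are hypothetical, and putting requeued messages back at the front of the queue in their original order is a simplifying assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ToyMessage {
    final String body;
    boolean redelivered = false;

    ToyMessage(String body) {
        this.body = body;
    }
}

/** Toy queue that tracks unacked messages per channel, as the broker does. */
class ToyBrokerQueue {
    private final Deque<ToyMessage> ready = new ArrayDeque<>();
    private final Map<Integer, List<ToyMessage>> unackedByChannel = new HashMap<>();

    void publish(String body) {
        ready.addLast(new ToyMessage(body));
    }

    /** Delivers the next ready message on the given channel, or returns null if none is ready. */
    ToyMessage deliverTo(int channelId) {
        ToyMessage message = ready.pollFirst();
        if (message != null) {
            unackedByChannel.computeIfAbsent(channelId, id -> new ArrayList<>()).add(message);
        }
        return message;
    }

    /** The channel is gone: everything it had not acked becomes ready again, flagged as redelivered. */
    void channelClosed(int channelId) {
        List<ToyMessage> pending = unackedByChannel.remove(channelId);
        if (pending == null) {
            return;
        }
        for (int i = pending.size() - 1; i >= 0; i--) {
            ToyMessage message = pending.get(i);
            message.redelivered = true;
            ready.addFirst(message); // assumption: requeued ahead of newer messages, original order kept
        }
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount(int channelId) {
        List<ToyMessage> pending = unackedByChannel.get(channelId);
        return pending == null ? 0 : pending.size();
    }
}

class RedeliveryDemo {
    public static void main(String[] args) {
        ToyBrokerQueue queue = new ToyBrokerQueue();
        queue.publish("a");
        queue.publish("b");
        queue.deliverTo(1);      // "a" is now unacked on channel 1
        queue.deliverTo(1);      // "b" is now unacked on channel 1
        queue.channelClosed(1);  // channel 1 crashes before any Ack

        ToyMessage again = queue.deliverTo(2); // a consumer on a new channel picks it up
        System.out.println(again.body + " redelivered=" + again.redelivered); // a redelivered=true
    }
}
```

In this model messages stay unacked only while the channel they were delivered on is still registered, which matches the "zombie channel" situation where nothing ever triggers the requeue.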

sstone commented 10 years ago

I've added a specific test (https://github.com/sstone/amqp-client/blob/wip-become/src/test/scala/com/github.sstone/amqp/PendingAcksSpec.scala) which illustrates what I meant: when a consumer disappears, unacked messages are redelivered.

tomrogers3 commented 10 years ago

Thanks. I took a look at the test you wrote - agreed, when a consumer disappears, then it's deregistered on the broker and the unacked messages are redelivered to the new consumer. However, correct me if I'm wrong here but it doesn't seem that amqp-client is creating a new consumer, only a new channel. When a new channel is created behind the scenes by amqp-client in the case of a failure, the existing consumer/listener is still sitting there trying to listen to the new channel - it can receive and ack newly arriving messages fine, but it can't get those previously unacked messages acked unless the consumer somehow is killed and a new consumer is created to listen to the new channel, which would cause the unacked messages to be redelivered as per what your test shows.

Hence I'm suggesting you may want to use basicCancel, or a PoisonPill (like you used in your test), or whatever makes sense to kill the consumer and create a new one at the same time the channel is recreated. Does this make sense? What are your thoughts?

Thanks again, I really appreciate your responsiveness and help on this.

sstone commented 10 years ago

On the broker side, consumer tags and delivery tags are local to a specific channel; when the channel dies, the consumer dies too and unacked messages will be delivered again.

On the client side, when a Consumer actor's channel crashes, it asks its parent for a new one, creates a new "ChannelForwarder" with it, and creates a new consumer which it binds to the registered queue and/or bindings. This consumer should now receive pending messages.

I've added a new specific test for this: https://github.com/sstone/amqp-client/blob/wip-become/src/test/scala/com/github.sstone/amqp/PendingAcks1Spec.scala

Does it match your use case? Maybe I've missed something...

Thanks!

tomrogers3 commented 10 years ago

Hmm, OK, thanks for adding that test, that's helpful. I'm not sure if it's exactly our use case, since we think the channel died because of an event triggered on the broker side rather than the client, but it may be as close as we can get in a unit test; let me know if you feel otherwise. Regression testing of your changes looks good in our dev environment. I'm going to try this version in our integration testing environment and see how we go.

For more background: we think the error we saw was due to a blip on the broker side rather than the client side. We have a 2-node cluster, and our general theory is that at least one of the nodes was redeployed/updated and may somehow have caused the channel to die from the broker side, so that the channel/consumer was not closed/released properly, which led to these error messages. Our service normally has 1 connection and 2 channels, each with a consumer (i.e. one for publish and the other for subscribe). After the blip we could see in the RMQ mgmt UI 1 connection and 3 channels/consumers: 1 channel/consumer seemed to be the old one and contained the unacked messages, and then there was the newly created channel. It seemed acks were being sent along the new channel with invalid delivery tags, and those unacked messages were stuck on the old zombie channel that was still open. The only way we could fix the issue was by restarting our service, which caused the unacked messages to be returned to the ready state, then reconsumed, processed and acked with valid delivery tags. Our service is designed to be idempotent, so processing the same msg twice is OK for us, but our business can't tolerate stuck or unprocessed messages.

If you have any further thoughts on whether these new updates you've implemented would address such a situation where the channel dies on the broker side then please let me know, otherwise I'll close this issue down for now and let you know if we have any further issues.

Thanks again, much appreciated for all your help!

sstone commented 10 years ago

You're welcome! I don't have much experience with clustered brokers, though I remember from the RMQ mailing list that sometimes cluster updates can go wrong... I wonder if enabling heartbeats would help? I've experienced "stuck messages" problems before, but each time we just restarted the service as you did. If I think of something I'll update this issue. Thanks!

yonitouitou commented 6 years ago

Hi All,

Is this issue solved? I have exactly the same error when I stop the consumer.