jschaul closed this issue 8 years ago
This looks like a bug in the Java driver. Try using amqp-client 3.6.1 with RabbitMQ and see whether the problem persists; if it does, consider reporting a bug to them.
Thanks, I will try upgrading and see if the issue persists. I can re-open this issue if it's not solved.
TL;DR: If you hit this or a similar problem with disconnects, use:
consume(Queue.passive(topic(queue("wow-maybe-queue"), List("some-topic.#"))))
Full explanation:
In case anyone faces similar issues (they still occur with op-rabbit 1.3.0, intermittently, as a race condition), here is an explanation:
The exact problem is that after a disconnect from RabbitMQ (forced from the RabbitMQ side, e.g. by closing the connection via the RabbitMQ web interface), op-rabbit successfully reconnects to RabbitMQ but fails to bind a queue.
Start a consumer (everything behaves as expected):
[info] INFO c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/confirmed-publisher-channel connected
[info] INFO c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/publisher connected
[info] INFO c.t.akka.rabbitmq.ConnectionActor - akka://default/user/$a/connection connected to amqp://vagrant@{<IP>:5672}:5672//
[info] INFO c.d.d.c.p.DCAwareRoundRobinPolicy - Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(Run,DisconnectedPayload(Paused,10,None)) from Actor[akka://default/user/$a#-246056593]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(ChannelCreated(Actor[akka://default/user/$a/connection/$a#-1186346017]),DisconnectedPayload(Binding,10,None)) from Actor[akka://default/user/$a/connection#1872231705]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Channel created; AMQChannel(amqp://vagrant@<IP>:5672/,3)
[info] INFO c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/$a connected
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(ChannelConnected(AMQChannel(amqp://vagrant@<IP>:5672/,3),Actor[akka://default/user/$a/connection/$a#-1186346017]),DisconnectedPayload(Binding,10,None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Setting up subscription to my-queue-name in akka://default/user/$a/subscription-my-queue-name-1
[info] DEBUG c.s.op_rabbit.SubscriptionActor - transition Disconnected -> Binding
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(BindSuccess(AMQChannel(amqp://vagrant@<IP>:5672/,3)),ConnectedPayload(Actor[akka://default/user/$a/connection/$a#-1186346017],AMQChannel(amqp://vagrant@<IP>:5672/,3),10,Some(Actor[akka://default/user/$a/subscription-my-queue-name-1/consumer#-1397250152]),None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - transition Binding -> Running
Now force a connection close via the RabbitMQ console (this time the race goes our way and the reconnect succeeds):
[info] WARN c.t.akka.rabbitmq.ConnectionActor - akka://default/user/$a/connection lost connection to amqp://vagrant@{<IP>:5672}:5672//
[info] WARN c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/$a disconnected
[info] WARN c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/confirmed-publisher-channel disconnected
[info] WARN c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/publisher disconnected
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Channel created; AMQChannel(amqp://vagrant@<IP>:5672/,1)
[info] INFO c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/$a connected
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(ChannelConnected(AMQChannel(amqp://vagrant@<IP>:5672/,1),Actor[akka://default/user/$a/connection/$a#-1186346017]),ConnectedPayload(Actor[akka://default/user/$a/connection/$a#-1186346017],AMQChannel(amqp://vagrant@<IP>:5672/,3),10,Some(Actor[akka://default/user/$a/subscription-my-queue-name-1/consumer#-1397250152]),None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Setting up subscription to my-queue-name in akka://default/user/$a/subscription-my-queue-name-1
[info] INFO c.t.akka.rabbitmq.ConnectionActor - akka://default/user/$a/connection connected to amqp://vagrant@{<IP>:5672}:5672//
[info] INFO c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/publisher connected
[info] INFO c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/confirmed-publisher-channel connected
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Channel created; AMQChannel(amqp://vagrant@<IP>:5672/,4)
[info] DEBUG c.s.op_rabbit.SubscriptionActor - transition Running -> Binding
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(ChannelConnected(AMQChannel(amqp://vagrant@<IP>:5672/,4),Actor[akka://default/user/$a/connection/$a#-1186346017]),ConnectedPayload(Actor[akka://default/user/$a/connection/$a#-1186346017],AMQChannel(amqp://vagrant@<IP>:5672/,1),10,Some(Actor[akka://default/user/$a/subscription-my-queue-name-1/consumer#-1397250152]),None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Setting up subscription to my-queue-name in akka://default/user/$a/subscription-my-queue-name-1
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(BindFailure(AMQChannel(amqp://vagrant@<IP>:5672/,1),java.io.IOException),ConnectedPayload(Actor[akka://default/user/$a/connection/$a#-1186346017],AMQChannel(amqp://vagrant@<IP>:5672/,4),10,Some(Actor[akka://default/user/$a/subscription-my-queue-name-1/consumer#-1397250152]),None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(BindSuccess(AMQChannel(amqp://vagrant@<IP>:5672/,4)),ConnectedPayload(Actor[akka://default/user/$a/connection/$a#-1186346017],AMQChannel(amqp://vagrant@<IP>:5672/,4),10,Some(Actor[akka://default/user/$a/subscription-my-queue-name-1/consumer#-1397250152]),None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - transition Binding -> Running
Repeat: force a connection close via the RabbitMQ console again. This time we see the error, and the result is a consumer that is stuck and never recovers:
[info] WARN c.t.akka.rabbitmq.ConnectionActor - akka://default/user/$a/connection lost connection to amqp://vagrant@{<IP>:5672}:5672//
[info] WARN c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/$a disconnected
[info] WARN c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/publisher disconnected
[info] WARN c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/confirmed-publisher-channel disconnected
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Channel created; AMQChannel(amqp://vagrant@<IP>:5672/,1)
[info] INFO c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/$a connected
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(ChannelConnected(AMQChannel(amqp://vagrant@<IP>:5672/,1),Actor[akka://default/user/$a/connection/$a#-1186346017]),ConnectedPayload(Actor[akka://default/user/$a/connection/$a#-1186346017],AMQChannel(amqp://vagrant@<IP>:5672/,4),10,Some(Actor[akka://default/user/$a/subscription-my-queue-name-1/consumer#-1397250152]),None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Setting up subscription to my-queue-name in akka://default/user/$a/subscription-my-queue-name-1
[info] INFO c.t.akka.rabbitmq.ConnectionActor - akka://default/user/$a/connection connected to amqp://vagrant@{<IP>:5672}:5672//
[info] INFO c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/publisher connected
[info] INFO c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/confirmed-publisher-channel connected
[info] DEBUG c.s.op_rabbit.SubscriptionActor - transition Running -> Binding
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(BindFailure(AMQChannel(amqp://vagrant@<IP>:5672/,1),java.io.IOException),ConnectedPayload(Actor[akka://default/user/$a/connection/$a#-1186346017],AMQChannel(amqp://vagrant@<IP>:5672/,1),10,Some(Actor[akka://default/user/$a/subscription-my-queue-name-1/consumer#-1397250152]),None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] ERROR c.s.op_rabbit.SubscriptionActor - An error while trying to bind a consumer to my-queue-name
[info] java.io.IOException: null
[info] at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
[info] at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
[info] at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
[info] at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844)
[info] at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
[info] at com.spingo.op_rabbit.QueueConcrete.declare(Queue.scala:27)
[info] at com.spingo.op_rabbit.Binding$$anon$2.declare(Binding.scala:70)
[info] at com.spingo.op_rabbit.SubscriptionActor.doSubscribe(SubscriptionActor.scala:232)
[info] at com.spingo.op_rabbit.SubscriptionActor$$anonfun$8.applyOrElse(SubscriptionActor.scala:159)
[info] at com.spingo.op_rabbit.SubscriptionActor$$anonfun$8.applyOrElse(SubscriptionActor.scala:146)
[info] at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
[info] at akka.actor.FSM$$anonfun$handleTransition$1.apply(FSM.scala:604)
[info] at akka.actor.FSM$$anonfun$handleTransition$1.apply(FSM.scala:604)
[info] at scala.collection.immutable.List.foreach(List.scala:381)
[info] at akka.actor.FSM$class.handleTransition(FSM.scala:604)
[info] at akka.actor.FSM$class.makeTransition(FSM.scala:686)
[info] at com.spingo.op_rabbit.SubscriptionActor.makeTransition(SubscriptionActor.scala:13)
[info] at akka.actor.FSM$class.applyState(FSM.scala:671)
[info] at com.spingo.op_rabbit.SubscriptionActor.applyState(SubscriptionActor.scala:13)
[info] at akka.actor.FSM$class.processEvent(FSM.scala:666)
[info] at com.spingo.op_rabbit.SubscriptionActor.akka$actor$LoggingFSM$$super$processEvent(SubscriptionActor.scala:13)
[info] at akka.actor.LoggingFSM$class.processEvent(FSM.scala:797)
[info] at com.spingo.op_rabbit.SubscriptionActor.processEvent(SubscriptionActor.scala:13)
[info] at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:655)
[info] at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:649)
[info] at akka.actor.Actor$class.aroundReceive(Actor.scala:482)
[info] at com.spingo.op_rabbit.SubscriptionActor.aroundReceive(SubscriptionActor.scala:13)
[info] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
[info] at akka.actor.ActorCell.invoke(ActorCell.scala:495)
[info] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
[info] at akka.dispatch.Mailbox.run(Mailbox.scala:224)
[info] at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
[info] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[info] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[info] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[info] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[info] Caused by: com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
[info] at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
[info] at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
[info] at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
[info] at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
[info] at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
[info] ... 33 common frames omitted
[info] Caused by: com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
[info] at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:554)
[info] at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:509)
[info] at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:503)
[info] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[info] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[info] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[info] at java.lang.reflect.Method.invoke(Method.java:497)
[info] at com.thenewmotion.akka.rabbitmq.RabbitMqActor$class.closeIfOpen(RabbitMqActor.scala:27)
[info] at com.thenewmotion.akka.rabbitmq.ChannelActor.closeIfOpen(ChannelActor.scala:34)
[info] at com.thenewmotion.akka.rabbitmq.ChannelActor$$anonfun$2.applyOrElse(ChannelActor.scala:115)
[info] at com.thenewmotion.akka.rabbitmq.ChannelActor$$anonfun$2.applyOrElse(ChannelActor.scala:112)
[info] at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
[info] at akka.actor.FSM$class.processEvent(FSM.scala:661)
[info] at com.thenewmotion.akka.rabbitmq.ChannelActor.processEvent(ChannelActor.scala:34)
[info] at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:655)
[info] at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:649)
[info] at akka.actor.Actor$class.aroundReceive(Actor.scala:482)
[info] at com.thenewmotion.akka.rabbitmq.ChannelActor.aroundReceive(ChannelActor.scala:34)
[info] ... 9 common frames omitted
[info] DEBUG c.s.op_rabbit.SubscriptionActor - transition Binding -> Stopped
I was able to reproduce this error about 30% of the time. I tried different versions of the Java amqp-client and the akka-rabbitmq libraries. This seems to be a bug with op-rabbit only.
If you require it, I'm able to write a failing test (though it takes a bit of work as it involves a running mq instance to contact & force close a connection in the test); let me know if you need this.
The workaround, which does not exhibit that problem, is to use Queue.passive with a queue definition:
consume(Queue.passive(topic(queue("wow-maybe-queue"), List("some-topic.#"))))
Perhaps this could be the default behaviour?
Hello,
I think the issue is actually caused by a combination of less-than-ideal design in akka-rabbitmq (exposing mutable internal state via the channel) and the fact that op-rabbit holds a reference to the channel itself here: once we are outside of the CreateChannel closure, there is no guarantee about the channel state and this could lead to errors. In this particular case, akka-rabbitmq re-creates the channel twice, so the one referenced in op-rabbit might already be closed. One approach to solve this would be to execute all the channel-related logic inside the above-mentioned closure.
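To make the stale-reference scenario concrete, here is a minimal sketch in plain Scala. It does not use akka-rabbitmq or op-rabbit; `FakeChannel`, `FakeChannelOwner`, and `withChannel` are hypothetical names invented for illustration, and the real classes behave differently in detail. It only shows why a reference captured outside the owner can point at a closed channel after two re-creations, while an operation run through the owner sees the current one.

```scala
// Hypothetical stand-in for an AMQP channel; not the real client class.
final class FakeChannel(val id: Int) {
  @volatile private var open = true
  def isOpen: Boolean = open
  def close(): Unit = { open = false }
  // Mimics a declare call that fails once the channel has been closed.
  def declareQueue(name: String): Either[String, String] =
    if (open) Right(s"declared $name on channel $id")
    else Left(s"channel $id is already closed")
}

// Hypothetical owner that may replace its channel at any time,
// loosely modelling an actor that manages the channel lifecycle.
final class FakeChannelOwner {
  private var current = new FakeChannel(1)
  private var nextId = 2
  // Leaks the mutable reference to callers.
  def channel: FakeChannel = current
  // Closes the current channel and replaces it with a fresh one.
  def recreate(): Unit = {
    current.close()
    current = new FakeChannel(nextId)
    nextId += 1
  }
  // Runs the operation against whichever channel is current at call time.
  def withChannel[A](f: FakeChannel => A): A = f(current)
}

object StaleChannelDemo {
  def main(args: Array[String]): Unit = {
    val owner = new FakeChannelOwner
    val held = owner.channel // reference captured outside the owner
    owner.recreate()         // first re-creation
    owner.recreate()         // second re-creation, as in the report
    // prints "Left(channel 1 is already closed)"
    println(held.declareQueue("my-queue-name"))
    // prints "Right(declared my-queue-name on channel 3)"
    println(owner.withChannel(_.declareQueue("my-queue-name")))
  }
}
```

Note that even the `withChannel` variant offers no protection in this toy model against the channel closing while the operation is running; it only guarantees the operation starts on the current channel.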
Hi @fedragon - Thank you so much for your detailed report. I've read and have considered it. Part of the problem here is that sometimes bindings fail legitimately, and crashing the actor is the appropriate thing to do. However, the case you describe here is clearly an exception.
It is worth noting that the problem isn't exactly the shared mutable state, although that can be a problem. Doing all channel operations in the ChannelActor is sub-optimal for performance reasons. I am careful not to apply operations to the channel concurrently, and it is always possible for a channel to close right under your feet. Eventually, I would like all channel operations for a consumer to happen in the consumer actor.
I pushed up a potential fix for this issue. I look at the channel state after a BindFailure is received. If the channel is marked as closed, which it should be after such an exception, then I ignore the exception and wait for a new channel to be created and sent (depending on the behavior of ChannelActor).
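As a rough illustration of the decision described above, here is a simplified sketch in plain Scala. It is not the actual op-rabbit code: `Chan`, `BindFailure`, `BindFailurePolicy`, and the state objects are hypothetical, cut-down stand-ins, and the real SubscriptionActor carries more state. The sketch only encodes the rule that a failure on a closed channel is ignored while a failure on an open channel is treated as legitimate.

```scala
// Hypothetical, simplified states; the real actor has more of them.
sealed trait State
case object Binding extends State
case object Stopped extends State

// Hypothetical stand-ins for the channel and the failure message.
final case class Chan(id: Int, isOpen: Boolean)
final case class BindFailure(channel: Chan, error: Throwable)

object BindFailurePolicy {
  // Returns the state to move to after a failed bind attempt.
  def onBindFailure(failure: BindFailure): State =
    if (!failure.channel.isOpen) Binding // closed channel: ignore the error, wait for a new channel
    else Stopped                         // open channel: treat as a legitimate bind failure
}

object BindFailureDemo {
  def main(args: Array[String]): Unit = {
    val err = new java.io.IOException("bind failed")
    // prints "Binding"
    println(BindFailurePolicy.onBindFailure(BindFailure(Chan(1, isOpen = false), err)))
    // prints "Stopped"
    println(BindFailurePolicy.onBindFailure(BindFailure(Chan(4, isOpen = true), err)))
  }
}
```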
Can you please test this and see if it helps your case? If it is possible to write a test for it, it would be really great. However, if the test is going to be brittle due to the complexity of testing for this, it may be okay to forego it.
(PS: careful != correct! On review I noticed and fixed a potential issue with setQos.)
Thank you @timcharper we will test it and come back to you as soon as possible!
Great, thank you! I just pushed v1.5.0-RC1, which contains the proposed fix.
This looks promising, thank you! I'll test and let you know.
Hello @timcharper, I've tested this manually and it works, successfully rebinding to the queue after each disconnect.
\o/
My manual tests also do not exhibit the faulty behaviour anymore on 1.5.0-RC1.
UPDATE: see https://github.com/SpinGo/op-rabbit/issues/72#issuecomment-245530033 for explanation/workaround of the problem. Original thread left for reference below.
Either op-rabbit or the underlying libraries seem to cause an unrecoverable crash for op-rabbit based consumers after the RabbitMQ instance has been unavailable (e.g. because it crashed) for a short period of time. Once RabbitMQ is back up, the expected reconnect does not happen for the consumers. Instead, the following errors occurred (see stack traces below).
This happened with version 1.2.1 of op-rabbit.
Is there a way to handle this and try to reconnect?
logger name: akka.actor.OneForOneStrategy
full stacktrace:
followed by a logger name: com.spingo.op_rabbit.SubscriptionActor
message: An error while trying to bind a consumer to <queue-name>
full stacktrace: