ssteward54 opened this issue 8 years ago (status: open)
@ssteward54 Op-Rabbit actually uses akka-rabbitmq to re-establish connections and re-create topologies. The Java client's own recovery implementation has some problems: for example, if a failure occurs during the initial topology setup, it is unclear how to get everything initialized correctly.
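The failure case mentioned above (an error during initial topology setup) is easiest to reason about if connecting and declaring the topology are retried together as one unit. The following is a minimal, self-contained Java sketch of that idea only; it is not op-rabbit's or akka-rabbitmq's actual code and uses no RabbitMQ client. `FlakyBroker`, `connectWithTopology`, the queue names and the exponential backoff policy are all hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

public class ReconnectSketch {

    /** Hypothetical broker stand-in that fails a fixed number of connects and declares. */
    static final class FlakyBroker {
        private int connectFailuresLeft;
        private int declareFailuresLeft;
        final List<String> declaredQueues = new ArrayList<>();

        FlakyBroker(int connectFailures, int declareFailures) {
            this.connectFailuresLeft = connectFailures;
            this.declareFailuresLeft = declareFailures;
        }

        void connect() throws Exception {
            if (connectFailuresLeft > 0) {
                connectFailuresLeft--;
                throw new Exception("connection refused");
            }
        }

        void declareQueue(String name) throws Exception {
            if (declareFailuresLeft > 0) {
                declareFailuresLeft--;
                throw new Exception("channel closed during queue declare");
            }
            declaredQueues.add(name);
        }
    }

    /**
     * Treats "connect" and "declare the whole topology" as one unit: a failure in either
     * step restarts the unit, so a consumer is never left half-initialised.
     * Returns the attempt number that succeeded, or -1 if every attempt failed.
     */
    static int connectWithTopology(FlakyBroker broker, List<String> queues,
                                   int maxAttempts, long baseDelayMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                broker.connect();
                broker.declaredQueues.clear(); // count only this attempt's declarations
                for (String queue : queues) {
                    broker.declareQueue(queue);
                }
                return attempt;
            } catch (Exception e) {
                if (attempt == maxAttempts) {
                    break; // no point sleeping after the final attempt
                }
                // Assumed policy: exponential backoff (base, 2x, 4x, ..., capped at 1024x).
                long delayMs = baseDelayMs << Math.min(attempt - 1, 10);
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag and give up
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Two refused connects, then one failed declare, then success.
        FlakyBroker broker = new FlakyBroker(2, 1);
        int attempts = connectWithTopology(broker, List.of("orders", "events"), 5, 1);
        System.out.println("attempts=" + attempts + " queues=" + broker.declaredQueues);
    }
}
```

Running `main` prints `attempts=4 queues=[orders, events]`: two refused connections, a declare failure on the third attempt, and a clean connect-plus-declare on the fourth. Restarting the whole unit, rather than resuming from the step that failed, is what avoids the "half-initialized topology" situation described above.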
I'm wondering what might be going wrong. I'm inclined to believe that some exception must be getting thrown. Is it possible to simulate failure in a dev environment? Can you provide logs?
I just tested this in a small demo project and can confirm that it works.
I both crashed the RabbitMQ server and force-closed the connection; in both cases the reconnection/recovery strategy worked. I am insanely curious as to why this isn't working for you. Correctness and recovery are the central point of this library, and if that feature isn't working, then this library is a massive failure.
Lately I've been seeing this pattern: we get three errors back to back. First the "Publisher channel was shutdown" exception, then "An error while trying to bind a consumer" microseconds later, and then, about a minute later, the first one again. At that point the app is dead and no longer connected to Rabbit.
Here are some logs from an app using op-rabbit version 1.3.0.
From com.spingo.op_rabbit.SubscriptionActor
Publisher channel was disconnected unexpectedly
com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717)
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707)
    at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:662)
    at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:617)
    at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:107)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:542)
    at java.lang.Thread.run(Thread.java:745)
From com.spingo.op_rabbit.ConfirmedPublisherActor
An error while trying to bind a consumer to [QUEUE_NAME]
java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:833)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at com.spingo.op_rabbit.QueueConcrete.declare(Queue.scala:27)
    at com.spingo.op_rabbit.Binding$$anon$2.declare(Binding.scala:70)
    at com.spingo.op_rabbit.SubscriptionActor.doSubscribe(SubscriptionActor.scala:232)
    at com.spingo.op_rabbit.SubscriptionActor$$anonfun$8.applyOrElse(SubscriptionActor.scala:159)
    at com.spingo.op_rabbit.SubscriptionActor$$anonfun$8.applyOrElse(SubscriptionActor.scala:146)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at akka.actor.FSM$$anonfun$handleTransition$1.apply(FSM.scala:606)
    at akka.actor.FSM$$anonfun$handleTransition$1.apply(FSM.scala:606)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at akka.actor.FSM$class.handleTransition(FSM.scala:606)
    at akka.actor.FSM$class.makeTransition(FSM.scala:688)
    at com.spingo.op_rabbit.SubscriptionActor.makeTransition(SubscriptionActor.scala:13)
    at akka.actor.FSM$class.applyState(FSM.scala:673)
    at com.spingo.op_rabbit.SubscriptionActor.applyState(SubscriptionActor.scala:13)
    at akka.actor.FSM$class.processEvent(FSM.scala:668)
    at com.spingo.op_rabbit.SubscriptionActor.akka$actor$LoggingFSM$$super$processEvent(SubscriptionActor.scala:13)
    at akka.actor.LoggingFSM$class.processEvent(FSM.scala:799)
    at com.spingo.op_rabbit.SubscriptionActor.processEvent(SubscriptionActor.scala:13)
    at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:657)
    at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:651)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at com.spingo.op_rabbit.SubscriptionActor.aroundReceive(SubscriptionActor.scala:13)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 33 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:546)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.thenewmotion.akka.rabbitmq.RabbitMqActor$class.closeIfOpen(RabbitMqActor.scala:27)
    at com.thenewmotion.akka.rabbitmq.ChannelActor.closeIfOpen(ChannelActor.scala:34)
    at com.thenewmotion.akka.rabbitmq.ChannelActor$$anonfun$2.applyOrElse(ChannelActor.scala:115)
    at com.thenewmotion.akka.rabbitmq.ChannelActor$$anonfun$2.applyOrElse(ChannelActor.scala:112)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at akka.actor.FSM$class.processEvent(FSM.scala:663)
    at com.thenewmotion.akka.rabbitmq.ChannelActor.processEvent(ChannelActor.scala:34)
    at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:657)
    at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:651)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at com.thenewmotion.akka.rabbitmq.ChannelActor.aroundReceive(ChannelActor.scala:34)
    ... 9 common frames omitted
That's the shutdown log; what do you see when it attempts to reconnect?
This is all we see. After this happens, the RMQ admin console shows no consumers connected to the queues, which of course means the messages just sit there.
Looks like the problem that was fixed in #72. @ssteward54 have you tried version 1.6.0?
We're having an issue: whenever the RMQ server disconnects our consumers, they lose their connection and stop working. I've been looking into this and am wondering whether you could enable the automatic recovery option available on the Java client's ConnectionFactory so that the connection is re-established.
https://www.rabbitmq.com/api-guide.html
Or maybe make this a configurable option we can turn on if we'd like? Thanks!