SpinGo / op-rabbit

The Opinionated RabbitMQ Library for Scala and Akka

Publisher channel was disconnected unexpectedly #87

Open backingwu opened 8 years ago

backingwu commented 8 years ago

I use op-rabbit as a consumer client. A few seconds after the application starts, it throws the following exception:

[ERROR] [08/02/2016 16:46:34.437] [such-system-akka.actor.default-dispatcher-8] [akka://such-system/user/$a/confirmed-publisher] Publisher channel was disconnected unexpectedly
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717)
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565)
    at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534)
    ... 1 more

I am running RabbitMQ behind HAProxy. The connection setup:

op-rabbit {
  topic-exchange-name = "admin"
  connection {
    virtual-host = "test"
    hosts = ["xx.xx.xx.xx"]
    username = "test"
    password = "test"
    port = 5672
    ssl = false
    timeout = 5s
  }
}

and the consumer code:

implicit val actorSystem = ActorSystem("such-system")
val rabbitControl = actorSystem.actorOf(Props[RabbitControl])

implicit def intToString(x: Array[Int]) = x.map(_.toString)
implicit val msgFormat = Json.format[Msg]

Subscription.run(rabbitControl) {
  channel(qos = 10) {
    val res = consume(Queue.passive("syncdata")) {
      (body(as[Msg]) & routingKey) { (msg, key) =>
        println(s"In client, msg: ${msg.ids.mkString("-")}")
        ack
      }
    }
    res
  }
}

backingwu commented 8 years ago

This problem doesn't appear in single-node mode. Maybe op-rabbit isn't well supported when RabbitMQ is behind HAProxy?

timcharper commented 8 years ago

@shizafon thank you for posting your resolution; can you provide more detail about why the problem was happening and what steps you took to resolve it, for the benefit of other users of the library?

SergeATX commented 8 years ago

@timcharper, please ignore my comment; I guess I got confused. I used parseAddress on a list of addresses instead of parseAddresses. It had nothing to do with the ports. I'll go ahead and delete my comment.

timcharper commented 8 years ago

@backingwu I'm afraid I have very little ability to reproduce this on my end. I've never deployed or used rabbitmq with haproxy so I don't know what might be going wrong. Is there any more information you can provide on your end? Is this very easy to reproduce (happens every time)? Does it seem like a race condition?

Maybe it's a question the folks at https://github.com/thenewmotion/akka-rabbitmq can help with.

brijeshkec11 commented 7 years ago

I'm seeing the same issue in my setup quite frequently. I am also running RabbitMQ behind HAProxy. Is there any workaround for this problem?

timcharper commented 7 years ago

Sorry, @brijeshkec11! I've never used rabbitmq-haproxy and I have no idea why it is disconnecting. :(

If you can help isolate it and reproduce it, I'd be happy to help you try and find a fix!

It would be helpful to understand if it happens with the Java RabbitMQ library alone, on which this library depends.

Thanks!

Tim

brijeshkec11 commented 7 years ago

Hi Tim, I hit this issue with a RabbitMQ cluster as well (no HAProxy), so it may not be related to HAProxy. I will write some sample code and try to recreate the issue, and I will share the steps once I can reproduce it.

One more observation: under low load I do not see this issue, but the moment I increase the load (pps in RMQ) it reproduces immediately, so it may be a race condition in Akka or the op-rabbit library.

Here are the different exceptions I saw when this issue arises:

[ERROR] [04/13/2017 09:12:35.981] [feed-receiver-op-rabbit.default-channel-dispatcher-65] [akka://feed-receiver/user/$a/confirmed-publisher] Publisher channel was disconnected unexpectedly
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717)
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:569)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:154)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:117)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534)
    ... 1 more

[ERROR] [04/13/2017 09:12:36.001] [feed-receiver-akka.actor.default-dispatcher-10] [akka://feed-receiver/user/$a/subscription-feed-receiver-queue-1/consumer] Connection reset
java.net.SocketException: Connection reset
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:150)
    at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:514)
    at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:125)
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:316)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:292)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:285)
    at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1012)
    at com.spingo.op_rabbit.impl.AsyncAckingRabbitConsumer.handleAckOrNack(AsyncAckingRabbitConsumer.scala:160)
    at com.spingo.op_rabbit.impl.AsyncAckingRabbitConsumer$$anonfun$connected$1.applyOrElse(AsyncAckingRabbitConsumer.scala:80)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at com.spingo.op_rabbit.impl.AsyncAckingRabbitConsumer.aroundReceive(AsyncAckingRabbitConsumer.scala:19)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[WARN] [04/13/2017 09:12:36.175] [feed-receiver-op-rabbit.default-channel-dispatcher-67] [akka://feed-receiver/user/$a/connection/$a] akka://feed-receiver/user/$a/connection/$a disconnected
[WARN] [04/13/2017 09:12:36.175] [feed-receiver-akka.actor.default-dispatcher-11] [akka://feed-receiver/user/$a/connection/confirmed-publisher-channel] akka://feed-receiver/user/$a/connection/confirmed-publisher-channel disconnected
[WARN] [04/13/2017 09:12:36.175] [feed-receiver-akka.actor.default-dispatcher-8] [akka://feed-receiver/user/$a/connection/$b] akka://feed-receiver/user/$a/connection/$b disconnected
[WARN] [04/13/2017 09:12:36.176] [feed-receiver-akka.actor.default-dispatcher-4] [akka://feed-receiver/user/$a/connection] akka://feed-receiver/user/$a/connection lost connection to amqp://athena@{172.55.101.60:5672,172.55.101.61:5672}:5672//
[INFO] [04/13/2017 09:12:36.483] [feed-receiver-op-rabbit.default-channel-dispatcher-65] [akka://feed-receiver/user/$a/connection/$a] akka://feed-receiver/user/$a/connection/$a connected
[INFO] [04/13/2017 09:12:36.502] [feed-receiver-akka.actor.default-dispatcher-4] [akka://feed-receiver/user/$a/connection] akka://feed-receiver/user/$a/connection connected to amqp://athena@{172.55.101.60:5672,172.55.101.61:5672}:5672//
[INFO] [04/13/2017 09:12:36.504] [feed-receiver-akka.actor.default-dispatcher-3] [akka://feed-receiver/user/$a/connection/$b] Message [com.rabbitmq.client.impl.ChannelN] from Actor[akka://feed-receiver/user/$a/connection#528016613] to Actor[akka://feed-receiver/user/$a/connection/$b#143722208] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/13/2017 09:12:36.505] [feed-receiver-akka.actor.default-dispatcher-8] [akka://feed-receiver/user/$a/connection/confirmed-publisher-channel] akka://feed-receiver/user/$a/connection/confirmed-publisher-channel connected
[INFO] [04/13/2017 09:12:36.571] [feed-receiver-akka.actor.default-dispatcher-10] [akka://feed-receiver/user/$a/connection/$b] Message [com.rabbitmq.client.impl.ChannelN] from Actor[akka://feed-receiver/user/$a/connection#528016613] to Actor[akka://feed-receiver/user/$a/connection/$b#143722208] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

The second time I hit this issue, the exception was:

[ERROR] [04/13/2017 14:11:52.953] [feed-receiver-op-rabbit.default-channel-dispatcher-16] [akka://feed-receiver/user/$a/confirmed-publisher] Publisher channel was disconnected unexpectedly
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717)
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:569)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:209)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534)
    ... 1 more

[ERROR] [04/13/2017 14:11:52.970] [feed-receiver-akka.actor.default-dispatcher-2] [akka://feed-receiver/user/$a/subscription-feed-receiver-queue-1/consumer] Broken pipe
java.net.SocketException: Broken pipe
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:150)
    at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:514)
    at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:125)
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:316)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:292)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:285)
    at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1012)
    at com.spingo.op_rabbit.impl.AsyncAckingRabbitConsumer.handleAckOrNack(AsyncAckingRabbitConsumer.scala:160)
    at com.spingo.op_rabbit.impl.AsyncAckingRabbitConsumer$$anonfun$connected$1.applyOrElse(AsyncAckingRabbitConsumer.scala:80)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at com.spingo.op_rabbit.impl.AsyncAckingRabbitConsumer.aroundReceive(AsyncAckingRabbitConsumer.scala:19)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[WARN] [04/13/2017 14:11:53.138] [feed-receiver-op-rabbit.default-channel-dispatcher-16] [akka://feed-receiver/user/$a/connection/$a] akka://feed-receiver/user/$a/connection/$a disconnected
[WARN] [04/13/2017 14:11:53.137] [feed-receiver-akka.actor.default-dispatcher-2] [akka://feed-receiver/user/$a/connection/$b] akka://feed-receiver/user/$a/connection/$b disconnected
[WARN] [04/13/2017 14:11:53.137] [feed-receiver-akka.actor.default-dispatcher-4] [akka://feed-receiver/user/$a/connection] akka://feed-receiver/user/$a/connection lost connection to amqp://athena@{172.55.101.61:5672}:5672//
[WARN] [04/13/2017 14:11:53.135] [feed-receiver-akka.actor.default-dispatcher-9] [akka://feed-receiver/user/$a/connection/confirmed-publisher-channel] akka://feed-receiver/user/$a/connection/confirmed-publisher-channel disconnected
[INFO] [04/13/2017 14:11:53.266] [feed-receiver-op-rabbit.default-channel-dispatcher-6] [akka://feed-receiver/user/$a/connection/$a] akka://feed-receiver/user/$a/connection/$a connected
[INFO] [04/13/2017 14:11:53.328] [feed-receiver-akka.actor.default-dispatcher-2] [akka://feed-receiver/user/$a/connection/$b] Message [com.rabbitmq.client.impl.ChannelN] from Actor[akka://ca-s e-feed-receiver/user/$a/connection#1150657647] to Actor[akka://feed-receiver/user/$a/connection/$b#-1283180273] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/13/2017 14:11:53.339] [feed-receiver-akka.actor.default-dispatcher-4] [akka://feed-receiver/user/$a/connection] akka://feed-receiver/user/$a/connection connected to amqp://athena@{172.55.101.61:5672}:5672//
[INFO] [04/13/2017 14:11:53.357] [feed-receiver-akka.actor.default-dispatcher-2] [akka://feed-receiver/user/$a/connection/confirmed-publisher-channel] akka://feed-receiver/user/$a/connection/confirmed-publisher-channel connected
[INFO] [04/13/2017 14:11:53.361] [feed-receiver-akka.actor.default-dispatcher-3] [akka://feed-receiver/user/$a/connection/$b] Message [com.rabbitmq.client.impl.ChannelN] from Actor[akka://ca-s e-feed-receiver/user/$a/connection#1150657647] to Actor[akka://feed-receiver/user/$a/connection/$b#-1283180273] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

After this, it does not even try to reconnect. There are no further op-rabbit logs after this point.

asheshambasta commented 7 years ago

@timcharper Same issue, with my setup I have (AWS):

Looks like the clients keep disconnecting from time to time (every 60s) which is the TTL for the DNS record, so it could be the DNS?

asheshambasta commented 7 years ago

Actually, never mind. I've been experimenting with different TTL values for the DNS record and it doesn't seem to make a difference. In my case, the problem seems to be the load balancer through which I'm connecting to RabbitMQ. The IdleTimeout value on the ELB is set to 60s.
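
If an idle timeout on a proxy or load balancer is what closes the connection, one possible mitigation is to request AMQP heartbeats at an interval shorter than that timeout, so the connection never looks idle. The sketch below is only an illustration: `HeartbeatSketch` and `heartbeatFor` are hypothetical names, and halving the timeout is a rule of thumb assumed here, not something taken from op-rabbit or the ELB documentation. Whether heartbeat frames count as activity for a given load balancer is also an assumption to verify in your own setup.

```scala
import scala.concurrent.duration._

// Hypothetical helper, not part of op-rabbit or the RabbitMQ client.
object HeartbeatSketch {
  /** Pick a heartbeat timeout (whole seconds) below a proxy's idle timeout.
    * Assumption: half the idle timeout leaves enough margin; never go below 1s.
    */
  def heartbeatFor(idleTimeout: FiniteDuration): Int =
    math.max(1, (idleTimeout.toSeconds / 2).toInt)
}

// For the 60s ELB IdleTimeout mentioned above:
val heartbeatSeconds = HeartbeatSketch.heartbeatFor(60.seconds)
println(s"requested heartbeat: ${heartbeatSeconds}s")
```

With the plain RabbitMQ Java client, the resulting value could be passed to `ConnectionFactory.setRequestedHeartbeat(heartbeatSeconds)` before opening the connection. How to pass it through op-rabbit's own configuration is not covered in this thread.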