ScalaConsultants / reactive-rabbit

Reactive Streams driver for AMQP protocol. Powered by RabbitMQ library.
Apache License 2.0

Connection is closed when under 'flow control' #32

Closed LGLO closed 8 years ago

LGLO commented 8 years ago

I run this test app:

package io.scalac.amqp

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

object CheckOrdering extends App {

  val max = 100000
  val connection = Connection()
  // Exchange E, queues Q1 and Q2, and their bindings must be created before running this test.
  val q1 = connection.consume(queue = "Q1")
  val q2 = connection.consume(queue = "Q2")
  val e1 = connection.publish(exchange = "E", routingKey = "q1")
  val e2 = connection.publish(exchange = "E", routingKey = "q2")

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()

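  var expected = BigInt(1)
  // Messages arriving on Q2 are checked for ordering.
  Source.fromPublisher(q2).map(_.message).runForeach(checkExpected)
  // Everything consumed from Q1 is republished to E with routing key "q2".
  Source.fromPublisher(q1).map(_.message).runWith(Sink.fromSubscriber(e2))
  // Messages numbered 1 to max are published to E with routing key "q1".
  Source.fromIterator(() => (1 to max).map(makeMsg).iterator).runWith(Sink.fromSubscriber(e1))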

  def checkExpected(m: Message): Unit = {
    val actual = BigInt(m.body.toArray)
    if (actual % 100 == 0) println(actual)
    if(expected != actual){
      println(s"Expected: $expected, got: $actual")
      //stop()
    }
    if(expected == BigInt(max)){
      println(s"Finished!")
      stop()
    }
    expected = expected + 1
  }

  def stop(): Unit = {
    println("Shutting down connection")
    connection.shutdown()
    println("Shutting down system")
    system.shutdown()
    println("System exit")
    System.exit(0)
  }

  def makeMsg(i: Int): Message = Message(body = BigInt(i).toByteArray)
}

The size of Q1 increases quickly and the broker puts this connection under flow control. The consumption rate drops, then stalls, and finally the connection is closed. RabbitMQ logs:

=INFO REPORT==== 13-Jan-2016::10:00:53 ===
accepting AMQP connection <0.24452.8> (127.0.0.1:33086 -> 127.0.0.1:5672)

=WARNING REPORT==== 13-Jan-2016::10:02:41 ===
closing AMQP connection <0.24452.8> (127.0.0.1:33086 -> 127.0.0.1:5672):
client unexpectedly closed TCP connection

I think this is related to #6, because it never happened when the test was run with the fix for synchronous publish.

LGLO commented 8 years ago

I've debugged this issue further, and it turned out that the RabbitMQ amqp-client gets a TimeoutException when closing a channel. To make a long story short: because of that exception, the Connection gets closed. This causes an exception when publishing, so all that needs to be done is to cancel the subscription.
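
A minimal sketch of that idea, not the library's actual code: a subscriber that cancels its upstream subscription as soon as publishing throws. The `Subscription` trait here is a dependency-free stand-in for `org.reactivestreams.Subscription`, and `PublishingSubscriber` and its `publish` parameter are hypothetical names standing for whatever writes to the AMQP channel.

```scala
import scala.util.control.NonFatal

// Stand-in for org.reactivestreams.Subscription (assumption: same two methods).
trait Subscription {
  def request(n: Long): Unit
  def cancel(): Unit
}

// Hypothetical subscriber; `publish` represents the call that writes to the channel.
// Reactive Streams signals arrive serially, so plain vars are enough for this sketch.
final class PublishingSubscriber[A](publish: A => Unit) {
  private var subscription: Option[Subscription] = None
  private var cancelled = false

  def onSubscribe(s: Subscription): Unit = {
    subscription = Some(s)
    s.request(1) // ask for the first element
  }

  def onNext(elem: A): Unit =
    if (!cancelled) {
      try {
        publish(elem)
        subscription.foreach(_.request(1)) // publish succeeded, ask for the next element
      } catch {
        case NonFatal(_) =>
          // Publishing failed (for example because the connection was closed):
          // stop requesting and tell upstream to stop sending.
          cancelled = true
          subscription.foreach(_.cancel())
      }
    }
}
```

With a subscriber shaped like this, a closed connection shows up as a single cancellation upstream rather than as repeated publish failures.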