jhalterman / lyra

High availability RabbitMQ client
Apache License 2.0

Rapid growth of connections when network conditions are unreliable #86

Closed Dirk-c-Walter closed 7 years ago

Dirk-c-Walter commented 7 years ago

When the connection between the RabbitMQ host and the client is cut, there can be exponential growth in connections. This can be simulated by using iptables to cut the connection. For brief interruptions the recovery works, but if the connection is down for around a minute and then re-enabled, the client will sometimes reconnect twice for each queue it was listening on. So after the first cycle it reconnects with 2 queues; if it is disconnected again it reconnects with 4, and so on.

This rapidly overwhelms the server if network conditions are unreliable, which is pretty disastrous. I have seen other kinds of errors too that I am not able to replicate in the dev environment, but the effect is the same: the connections and queues grow rapidly and uncontrollably instead of staying at one per client.
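As a rough illustration of how fast this grows, here is a small sketch that assumes every outage exactly doubles the number of consumers. That is a simplification, since the doubling described above only happens sometimes. The names `ConnectionGrowth` and `consumersAfter` are made up for this example and are not part of Lyra.

```scala
object ConnectionGrowth {
  // Assumed model: each outage doubles the consumer count,
  // so after n outages there are initial * 2^n consumers.
  def consumersAfter(initial: Int, outages: Int): BigInt =
    BigInt(initial) * BigInt(2).pow(outages)

  def main(args: Array[String]): Unit =
    (0 to 10).foreach { n =>
      println(s"after $n outages: ${consumersAfter(1, n)} consumers")
    }
}
```

Under that assumption, a single client that goes through ten outages ends up with 1024 consumers instead of one.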

Below is the Scala code used to connect, in case there are errors in it. The setup is simple: it creates a connection, then creates an ephemeral queue and listens on it for messages. That's it; this code is never revisited in a loop or anything like that, and it lives from client startup until the client dies.

` import net.jodah.lyra.util.Duration
import net.jodah.lyra.{ConnectionOptions, Connections}
import net.jodah.lyra.config.{Config, RecoveryPolicy, RetryPolicy}

...

val conn = sys.env.get("RABBIT_MQ_CONNECTION")
val exchangeName = sys.env.get("RABBIT_MQ_EXCHANGE")

// Recover and retry with a backoff from 1 second up to 60 seconds
val config = new Config()
  .withRecoveryPolicy(new RecoveryPolicy()
    .withBackoff(Duration.seconds(1), Duration.seconds(60)))
  .withRetryPolicy(new RetryPolicy()
    .withBackoff(Duration.seconds(1), Duration.seconds(60)))

//"amqp://userName:password@hostName:portNumber/virtualHost"
val options = new ConnectionOptions()
  .withUri(conn.get)
  .withName("MessageStream")

val confac = options.getConnectionFactory
val connection = Connections.create(options, config)
val channel = connection.createChannel()

// Server-named queue: non-durable, exclusive, auto-delete
val q = channel.queueDeclare("", false, true, true, null)
channel.queueBind(q.getQueue, exchangeName.get, "")
// Consume with auto-ack
channel.basicConsume(q.getQueue, true, msgListner)`

michaelklishin commented 7 years ago

We cannot suggest much with the amount of information provided. I'm quite certain this is #75, #79, …, all over again.

michaelklishin commented 7 years ago

#75, #79 were addressed in #82 and released in Lyra 0.5.5. Please give it a try and/or disable RabbitMQ Java client recovery in your own code.

Dirk-c-Walter commented 7 years ago

A full test project: https://github.com/Dirk-c-Walter/LyraTest

So far I have not been able to recreate it with Lyra 0.5.5, so maybe it was the automatic connection recovery.
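For anyone who cannot upgrade right away, the other suggestion in this thread (disabling the RabbitMQ Java client's own recovery so that only Lyra recovers) might look roughly like the sketch below. It assumes the `ConnectionFactory` returned by `options.getConnectionFactory` is the one Lyra uses to open the connection. The wrapper object `DisableClientRecovery` and the helper `optionsFor` are hypothetical names for this example. It has not been tested against the reproduction project.

```scala
import com.rabbitmq.client.ConnectionFactory
import net.jodah.lyra.ConnectionOptions

object DisableClientRecovery {
  // Builds Lyra connection options with the Java client's built-in
  // recovery switched off, leaving recovery to Lyra alone.
  def optionsFor(uri: String): ConnectionOptions = {
    val options = new ConnectionOptions()
      .withUri(uri)
      .withName("MessageStream")

    // Assumption: this is the factory Lyra uses for its connections.
    val factory: ConnectionFactory = options.getConnectionFactory
    factory.setAutomaticRecoveryEnabled(false) // no client-side reconnects
    factory.setTopologyRecoveryEnabled(false)  // no client-side re-declaring of queues/consumers
    options
  }
}
```

The returned options would then be passed to `Connections.create(options, config)` as in the original snippet. The idea is that only one recovery mechanism is active at a time, so queues and consumers are not re-created twice after an outage.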