ScalaConsultants / reactive-rabbit

Reactive Streams driver for AMQP protocol. Powered by RabbitMQ library.
Apache License 2.0
184 stars 40 forks source link

RabbitMQ channel is not closed if Subscriber is never subscribed to #59

Closed breckcs closed 7 years ago

breckcs commented 7 years ago

If the publish method is called but the resulting Subscriber is never subscribed to (e.g., the stream is never run), a RabbitMQ channel will be created and never closed. If this happens repeatedly, the RabbitMQ connection can accumulate the maximum number of channels for a connection (65535). From that point forward, new channels cannot be created (connection.createChannel will return null). Operations on already established channels will continue to work.

Calling the publish method creates a Subscriber:

// create org.reactivestreams.Subscriber
val exchange = connection.publish(exchange = "accounting_department",
  routingKey = "invoices")

The publish method creates a new channel:

override def publish(exchange: String, routingKey: String) =
  new Subscriber[Message] {
    val channel = underlying.createChannel()
    val delegate = new ExchangeSubscriber(channel, exchange)

This channel will be closed in the onError or onCompete method of the Subscriber. However, if the Subscriber is never subscribed to, these methods will not be called and the channel will never be closed.

The channel should be created in the onSubscribe method of the Subscriber. For additional safety, ensure this unmanaged resource is closed in the finalize method.

I ran into this situation declaring a Flow to handle a WebSocket connection, which is passed to the Akka HTTP handleWebSocketMessages method to run. If the WebSocket connection is terminated soon after it is opened, handleWebSocketMessages may not run the Flow and the channel will never be closed. I noticed this after running for an extended period of time and eventually accumulating the maximum number of channels for a connection (65535).

breckcs commented 7 years ago

This issue is resolved now that #60 has been merged to master.