amqp-node / amqplib

AMQP 0-9-1 library and client for Node.JS
https://amqp-node.github.io/amqplib/

unbind consumer from the queue? potential memory leak on reconnect #317

Closed srkimir closed 2 years ago

srkimir commented 7 years ago

This is more of a question than an issue, but I don't know where I should ask questions related to this package. If you take a look at the connect function, on every successful connect/reconnect it calls the _setupRabbitTopology function, which asserts exchanges and queues and subscribes a consumer to a queue.

So what I'm doing is creating reconnection logic that goes like this (please check the _setupRabbitTopology function for the question):

const amqp = require('amqplib')

module.exports = {
  connect: function () {
    return amqp.connect(CONNECTION_STRING)
      .then(connection => connection.createChannel())
      .then(channel => {
        this.channel = channel
        this._setupListeners()
        return this._setupRabbitTopology()
      })
      .then(() => this.channel)
      .catch(err => this._reconnect())
  },
  _setupListeners: function () {
    this.channel.on('error', (err) => { this._reconnect() })
    this.channel.on('close', () => { this._reconnect() })
  },
  _reconnect: function () {    
    if (this.channel) { this.channel.removeAllListeners() }

    return this._delayReconnect().then(() => {
      return this.connect()
    })
  },
  _delayReconnect: function () {
    return new Promise(fulfill => {
      setTimeout(fulfill, RECONNECTING_INTERVAL)
    })
  },
  _setupRabbitTopology: function () {
    return Promise.all([ /* assert all exchanges */ ])
    .then(() => Promise.all([ /* assert all queues */ ]))
    .then(() => Promise.all([ /* bind all queues */ ]))
    .then(() => Promise.all([
        /**
          * Here is the question: on every reconnect a new channel is created,
          * so all consumers need to be registered on the queues again, right? With
          * `this.channel.consume(queue, consumerLogic)`. But what happens to the
          * previous subscriptions? Should I omit channel.consume entirely on reconnect,
          * or does the library provide some way to clean up previous subscriptions?
          * As I see it, this code could potentially create memory leaks.
          */
    ]))
  }
}

squaremo commented 7 years ago

If you know that _setupRabbitTopology will only be called with a fresh connection, I think it is safe. My reasoning is that a consumer's lifetime is scoped to the channel on which it's declared; so, if the channel goes away, which it will when the connection is closed, so does the consumer.
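
To illustrate the scoping described above, here is a minimal sketch of the last step of _setupRabbitTopology. It assumes the promise API of amqplib, where channel.consume resolves to an object carrying a consumerTag. The helper name registerConsumers and the shape of the `consumers` map are hypothetical and not part of the original code.

```javascript
// Hypothetical helper: registers every consumer on the given (fresh) channel.
// `consumers` maps queue names to message handlers (illustrative shape only).
function registerConsumers(channel, consumers) {
  return Promise.all(
    Object.entries(consumers).map(([queue, handler]) =>
      // Each subscription belongs to `channel`; once that channel is gone,
      // the server drops the subscription along with it.
      channel.consume(queue, handler).then(({ consumerTag }) => consumerTag)
    )
  )
}
```

Because each reconnect creates a new channel, calling `registerConsumers(this.channel, consumers)` on every reconnect should not stack subscriptions on top of the ones that belonged to the closed channel.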

All the other declarations are idempotent; i.e., declaring things again won't create a new one.
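
As a sketch of what re-declaring on each reconnect can look like, the following assumes amqplib's promise API (assertExchange, assertQueue, bindQueue). The `topology` object, its entity names, and the assertTopology helper are hypothetical and exist only for illustration.

```javascript
// Hypothetical topology description; all names are illustrative only.
const topology = {
  exchanges: [{ name: 'events', type: 'topic' }],
  queues: [{ name: 'events.audit' }],
  bindings: [{ queue: 'events.audit', exchange: 'events', pattern: '#' }]
}

// Declares exchanges, then queues, then bindings on the given channel.
// Running it again after a reconnect does not create duplicates, provided
// the arguments match the entities that already exist on the server.
function assertTopology(channel, { exchanges, queues, bindings }) {
  return Promise.all(
    exchanges.map(e => channel.assertExchange(e.name, e.type, { durable: true }))
  )
    .then(() => Promise.all(
      queues.map(q => channel.assertQueue(q.name, { durable: true }))
    ))
    .then(() => Promise.all(
      bindings.map(b => channel.bindQueue(b.queue, b.exchange, b.pattern))
    ))
}
```

Note that idempotence only holds when the options are the same each time; re-declaring an existing queue or exchange with different options makes the server close the channel with an error.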

A caveat: you can leak queues if you are declaring them without a name (which tells the server to make up a name for you). If you are doing this, also make them exclusive, which will tie their lifetime to the connection as well.
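
A minimal sketch of the caveat above, assuming amqplib's promise API, where assertQueue resolves to an object whose `queue` field holds the (possibly server-generated) name. The helper name assertTemporaryQueue is hypothetical.

```javascript
// Hypothetical helper: declares a server-named queue tied to the connection.
function assertTemporaryQueue(channel) {
  // An empty name asks the server to generate one. `exclusive: true` scopes
  // the queue to this connection, so it is deleted when the connection closes
  // instead of accumulating across reconnects.
  return channel.assertQueue('', { exclusive: true }).then(({ queue }) => queue)
}
```

The resolved name can then be passed to bindQueue and consume on the same channel.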