postwait / node-amqp

[UNMAINTAINED] node-amqp is an AMQP client for nodejs
MIT License
1.69k stars 357 forks source link

Channels do not properly close on reconnect #399

Open canann13 opened 9 years ago

canann13 commented 9 years ago

We host the RabbitMQ broker on a diff server.

The node-amqp client runs on a seperate machine. A corner case we recently discovered happens when the client machine goes to sleep, or standby for any reason.

Under such circumstances, the usage of a heartbeat setting appropriately detects that the connection is no longer active after 2 heartbeat misses. The broker than emits the ECONNRESET.

When the client machine resumes operation, the node-amqp client correctly picks up on the ECONNRESET and begins the reconnection procedure.

Past this point, on the broker side, things do not go as we expect. The number of connections maintained and displayed on the server side remains correct, i.e. 1 connection coming from the node-amqp client. However, the number of channels have actually doubled. The old set of channels was not released, and a new set of channels is created. This effectively results in the server now hosting twice the number channels. Furthermore, of these channels, only half of them - the new set - are actually being used.

Based on the source code of the library:

var backoffTime = null;
  self.addListener('error', function backoff(e) {
    if (self._inboundHeartbeatTimer !== null) {
      clearTimeout(self._inboundHeartbeatTimer);
      self._inboundHeartbeatTimer = null;
    }
    if (self._outboundHeartbeatTimer !== null) {
      clearTimeout(self._outboundHeartbeatTimer);
      self._outboundHeartbeatTimer = null;
    }

    if (!self.connectionAttemptScheduled) {
      // Set to true, as we are presently in the process of scheduling one.
      self.connectionAttemptScheduled = true;

      // Kill the socket, if it hasn't been killed already.
      self.socket.end();

      // Reset parser state
      self.parser = null;

      // In order for our reconnection to be seamless, we have to notify the
      // channels that they are no longer connected so that nobody attempts
      // to send messages which would be doomed to fail.
      for (var channel in self.channels) {
        if (channel !== '0') {
          self.channels[channel].state = 'closed';
        }
      }
      // Queues are channels (so we have already marked them as closed), but
      // queues have special needs, since the subscriptions will no longer
      // be known to the server when we reconnect.  Mark the subscriptions as
      // closed so that we can resubscribe them once we are reconnected.
      for (var queue in self.queues) {
        for (var index in self.queues[queue].consumerTagOptions) {
          self.queues[queue].consumerTagOptions[index]['state'] = 'closed';
        }
      }

      // Begin reconnection attempts
      if (self.implOptions.reconnect) {
        // Don't thrash, use a backoff strategy.
        if (backoffTime === null) {
          // This is the first time we've failed since a successful connection,
          // so use the configured backoff time without any modification.
          backoffTime = self.implOptions.reconnectBackoffTime;
        } else if (self.implOptions.reconnectBackoffStrategy === 'exponential') {
          // If you've configured exponential backoff, we'll double the
          // backoff time each subsequent attempt until success.
          backoffTime *= 2;
          // limit the maxium timeout, to avoid potentially unlimited stalls
          if(backoffTime > self.implOptions.reconnectExponentialLimit){
            backoffTime = self.implOptions.reconnectExponentialLimit;
          }

        } else if (self.implOptions.reconnectBackoffStrategy === 'linear') {
          // Linear strategy is the default.  In this case, we will retry at a
          // constant interval, so there's no need to change the backoff time
          // between attempts.
        } else {
          // TODO should we warn people if they picked a nonexistent strategy?
        }

        setTimeout(function () {
          // Set to false, so that if we fail in the reconnect attempt, we can
          // schedule another one.
          self.connectionAttemptScheduled = false;
          self.reconnect();
        }, backoffTime);
      } else {
        self.removeListener('error', backoff);
      }
    }
  });

are we to understand that this is an intended behaviour of node-amqp?

ryanbmilbourne commented 8 years ago

I'm also seeing this issue and can replicate it consistently. Would a maintainer be willing to comment? It seems like the better behavior would be to either re-bind the channels or release them.