amqp / rhea

A reactive messaging library based on the AMQP protocol
Apache License 2.0

Container still receiving events after closing connection and opening new connection #327

Open ghettosamson opened 3 years ago

ghettosamson commented 3 years ago

Greetings!

Version: 1.0.24

Use case: I open a connection to my broker with the following code:

function openNewConnection(connectionOptions, brokerConfig) {
  logger.info(`Opening connection to broker ${connectionOptions.host}:${connectionOptions.port}`);
  connection = rhea.connect(connectionOptions);
  logger.silly('Opening a new connection', connection);

  logger.info(`Opening AMQP receiver on ${brokerConfig.receiverAddress}`);
  receiver = connection.open_receiver(brokerConfig.receiverAddress);
  logger.silly('Opening a new receiver', receiver);
}

I set up event listeners for most of the events, including connection_open, sendable, disconnected, receiver_open, sender_open, receiver_close, sender_close, receiver_error, sender_error, connection_error, protocol_error, session_error, error, message and accepted.

I need the application to be able to close a connection and establish a new one to a different host and port. I am using AWS service discovery, and any update I make to the AMQP broker creates a new host and port, so I have to close the old connection and open a new one.

The issue is that I close all receivers and senders, then the connection, and open a new connection, but the old event listeners are still catching events. I need a way to delete the container and create a new container with the same event listeners. Here is the code I use:

rhea.on('disconnected', async function(context) {
    if (context.error) {
      logger.debug('Connection to the AMQP broker was disconnected', context.error);
    } else {
      logger.info('Disconnected from AMQP broker');
    }
    /*
      The rhea library will reconnect automatically, but there is a reconnection limit
     */
    if (!context.reconnecting) {
      logger.error('amqp connector no longer automatically reconnecting');
      shutDown();
      process.exit(1);
    }

    logger.silly('Check if the connection options have changed');
    try {
      const updatedConnectionOptions = await getConnectionOptions(brokerConfig, brokerCredentials);
      if (updatedConnectionOptions.port !== connectionOptions.port ||
      updatedConnectionOptions.host !== connectionOptions.host) {
        logger.info(`Broker port has been updated from ${connectionOptions.port} to ${updatedConnectionOptions.port} and host has been updated from ${connectionOptions.host} to ${updatedConnectionOptions.host}`);
        if (context.sender) {
          context.sender.close();
        }
        if (context.receiver) {
          context.receiver.close();
        }
        if (context.connection) {
          context.connection.close();
        }
        connectionOptions = updatedConnectionOptions;
        openNewConnection(connectionOptions, brokerConfig);
      }
    } catch (error) {
      logger.error('Error reconnecting the broker', error);
    }

    if (!timer) {
      timer = setTimeout(timerCallback, timeoutMillis);
      logger.info('Timer started to ensure broker is reconnected');
    }
    connected = false;
    writeConnectionStatus('disconnected');
  });

Do I need to add code that removes the old listeners and adds new ones? If so, what is the best way to do that?

grs commented 3 years ago

You can set the listeners on the connection rather than the default container.
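As a rough illustration of that suggestion, the sketch below attaches a set of handlers to a single connection object and returns a function that removes exactly those handlers again. The helper name attachListeners is hypothetical and not part of rhea; it relies only on the on/removeListener methods that any Node.js EventEmitter provides. A plain EventEmitter stands in for the connection so the example runs without a broker.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper (not a rhea API): attach a map of handlers to one
// connection object and return a function that removes exactly those handlers.
function attachListeners(connection, handlers) {
  const entries = Object.entries(handlers);
  for (const [event, handler] of entries) {
    connection.on(event, handler);
  }
  return function detachListeners() {
    for (const [event, handler] of entries) {
      connection.removeListener(event, handler);
    }
  };
}

// Stand-in for a connection so the sketch runs without a broker.
const fakeConnection = new EventEmitter();
const seen = [];
const detach = attachListeners(fakeConnection, {
  disconnected: (context) => seen.push(['disconnected', context]),
  connection_open: (context) => seen.push(['connection_open', context]),
});

fakeConnection.emit('disconnected', { reconnecting: true }); // handled
detach();
fakeConnection.emit('disconnected', { reconnecting: true }); // no handler left
console.log(seen.length);
```

In the code from the original post, this would presumably mean calling the helper with the object returned by rhea.connect(connectionOptions) inside openNewConnection, and calling the returned detach function before closing that connection.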

ghettosamson commented 3 years ago

I tried that but I'm still receiving disconnected events.

let connection = rhea.connect(connectionOptions);

// To close the connection
if (receiver) {
  receiver.detach();
  receiver.close();
}
if (sender) {
  sender.detach();
  sender.close();
}
if (connection && connection.is_open()) {
  logger.silly('Closing open connection');
  connection.close();
}

Is there a way to close or delete the connection so that it stops attempting to reconnect? I keep getting the following errors:

2020-11-17T12:33:33.442Z myamqp:debug Connection to the AMQP broker was disconnected Error: getaddrinfo ENOTFOUND bb4043a786ed01cf522e25.broker
    at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:66:26) {
  errno: 'ENOTFOUND',
  code: 'ENOTFOUND',
  syscall: 'getaddrinfo',
  hostname: 'bb4043a786ed01cf522e25.broker'
}

It would be nice to not have to set reconnect to false when I first open the connection, but if there is no other way I may have to do that.

grs commented 3 years ago

You should not get disconnected events for the connection after you have closed it.

ghettosamson commented 3 years ago

I need a way to get the disconnected events to stop. Here is what I did:

Connection 2

2020-11-20T19:14:16.671Z myModule:info Connection to AMQP broker successfully opened {
  connection: Connection {
    options: {
      host: '186dc8a1254949dba221652a3870a904.myHost',
      port: 39613,
      transport: 'tls',
      rejectUnauthorized: true,
      key: '-----BEGIN RSA PRIVATE KEY-----\n' +

And then these logs continue to appear for Connection 1

[connection-1] disconnected Error: getaddrinfo ENOTFOUND 34aa4ac5f395415682415e7b46c1b6d7.myHost
    at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:66:26) {
  errno: 'ENOTFOUND',
  code: 'ENOTFOUND',
  syscall: 'getaddrinfo',
  hostname: '34aa4ac5f395415682415e7b46c1b6d7.myHost'
}

grs commented 3 years ago

If you close the connection it will not attempt to reconnect.

ghettosamson commented 3 years ago

I made sure to call connection.close() and remove all listeners before opening a new connection. The logs show that it is still attempting to reconnect.

2020-11-23T12:57:34.084Z amqp-module:error AMQP connection received an error [ConnectionError] {
  message: undefined,
  condition: 'amqp:connection:forced',
  description: undefined
}
2020-11-23T12:57:34.086Z amqp-module:info Status disconnected written to file
2020-11-23T12:57:34.092Z amqp-module:info Disconnected from AMQP broker
2020-11-23T12:57:34.093Z amqp-module:silly New timer created
2020-11-23T12:57:34.093Z amqp-module:info Status disconnected written to file
2020-11-23T12:57:34.093Z amqp-module:silly Check if the connection options have changed
2020-11-23T12:57:34.098Z amqp-module:silly Record found, 4e1179b7fd044099aaf2e6038530e3f3.myhost:39616
2020-11-23T12:57:34.098Z amqp-module:info Broker port has been updated from 39613 to 39616 and host has been updated from 186dc8a1254949dba221652a3870a904.myhost to 4e1179b7fd044099aaf2e6038530e3f3.myhost
2020-11-23T12:57:34.099Z amqp-module:silly Removed listeners for sendable
2020-11-23T12:57:34.099Z amqp-module:silly Removed listeners for connection_open
2020-11-23T12:57:34.100Z amqp-module:silly Removed listeners for receiver_open
2020-11-23T12:57:34.101Z amqp-module:silly Removed listeners for sender_open
2020-11-23T12:57:34.101Z amqp-module:silly Removed listeners for receiver_close
2020-11-23T12:57:34.102Z amqp-module:silly Removed listeners for sender_close
2020-11-23T12:57:34.102Z amqp-module:silly Removed listeners for receiver_error
2020-11-23T12:57:34.102Z amqp-module:silly Removed listeners for sender_error
2020-11-23T12:57:34.103Z amqp-module:silly Removed listeners for connection_error
2020-11-23T12:57:34.104Z amqp-module:silly Removed listeners for protocol_error
2020-11-23T12:57:34.104Z amqp-module:silly Removed listeners for session_error
2020-11-23T12:57:34.104Z amqp-module:silly Removed listeners for error
2020-11-23T12:57:34.105Z amqp-module:silly Removed listeners for disconnected
2020-11-23T12:57:34.105Z amqp-module:silly Removed listeners for message
2020-11-23T12:57:34.106Z amqp-module:silly Removed listeners for accepted
2020-11-23T12:57:34.106Z amqp-module:info Disconnecting from the broker and closing receiver
2020-11-23T12:57:34.107Z amqp-module:silly Detaching and closing receiver
2020-11-23T12:57:34.108Z amqp-module:silly Detaching and closing sender
2020-11-23T12:57:34.108Z amqp-module:silly Closing open connection
2020-11-23T12:57:34.108Z amqp-module:info Broker connection closed from shutdown
2020-11-23T12:57:34.109Z amqp-module:info Opening connection to broker 4e1179b7fd044099aaf2e6038530e3f3.myhost:39616

Here is the code for the shutdown

const shutDown = function() {
  logger.info('Disconnecting from the broker and closing receiver');
  if (receiver) {
    logger.silly('Detaching and closing receiver');
    receiver.detach();
    receiver.close();
  }
  if (sender) {
    logger.silly('Detaching and closing sender');
    sender.detach();
    sender.close();
  }
  if (connection) {
    logger.silly('Closing open connection');
    connection.close();
  }
  logger.info('Broker connection closed from shutdown');
};

So I call connection.close(), but here is what I still see:

2020-11-23T12:57:35.111Z amqp-module:info New AMQP receiver opened
[connection-1] disconnected Error: getaddrinfo ENOTFOUND 186dc8a1254949dba221652a3870a904.myhost
    at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:66:26) {
  errno: 'ENOTFOUND',
  code: 'ENOTFOUND',
  syscall: 'getaddrinfo',
  hostname: '186dc8a1254949dba221652a3870a904.myhost'
}
[connection-1] disconnected Error: getaddrinfo ENOTFOUND 186dc8a1254949dba221652a3870a904.myhost
    at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:66:26) {
  errno: 'ENOTFOUND',
  code: 'ENOTFOUND',
  syscall: 'getaddrinfo',
  hostname: '186dc8a1254949dba221652a3870a904.myhost'
}

Those messages continue to show in the logs.
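One defensive pattern that may help while the cause is being tracked down is to have each handler ignore events that do not belong to the connection the application currently considers live. The sketch below is only an illustration under assumptions: onlyForCurrent and currentConnection are hypothetical names, and the only thing taken from the code above is that the event context carries a connection property. It does not stop any reconnect attempts inside the library; it only keeps application handlers from reacting to events from an old connection. A plain EventEmitter and plain objects stand in for the container and connections so it runs without a broker.

```javascript
const { EventEmitter } = require('events');

// The connection the application currently considers live (hypothetical name).
let currentConnection = null;

// Hypothetical wrapper: run `handler` only when context.connection is the
// current connection; events from any other connection are dropped.
function onlyForCurrent(handler) {
  return function guarded(context) {
    if (context.connection !== currentConnection) {
      return false; // stale event from an older connection, ignored
    }
    handler(context);
    return true;
  };
}

// Stand-ins for the container and two connections.
const container = new EventEmitter();
const oldConnection = { id: 'connection-1' };
const newConnection = { id: 'connection-2' };
const handled = [];

container.on('disconnected', onlyForCurrent((context) => {
  handled.push(context.connection.id);
}));

currentConnection = newConnection;
container.emit('disconnected', { connection: oldConnection }); // ignored
container.emit('disconnected', { connection: newConnection }); // handled
console.log(handled);
```

With a guard like this, currentConnection would be reassigned wherever the application opens a new connection, so a late disconnected event for the previous host no longer triggers the shutdown and reopen logic a second time.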