postwait / node-amqp

[UNMAINTAINED] node-amqp is an AMQP client for nodejs
MIT License
1.7k stars 357 forks source link

warning: possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit. #451

Open mgoldsborough opened 7 years ago

mgoldsborough commented 7 years ago

Hello,

I have code which rapidly publishes data to an exchange. After a number of publishes, I'm getting an error indicating a memory leak inside the amqp library, (node) warning: possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit..

I found similar bugs on the issue tracker, but most were old (~2013) and none captured this exact scenario.

Here is example code which reproduces the bug:

const amqp = require('amqp');

let connectionOptions = {
    url: 'amqp://guest:guest@127.0.0.1:5672'
};

let clientProperties = {
    reconnect: true
};

let connection = amqp.createConnection(connectionOptions, clientProperties);

function leakyPublish() {
    connection.exchange('', {confirm: true}, (exchange) => {
        exchange.publish('data_queue', {abc: 123}, {}, (err) => {
            if(err) {
                console.error('Unable to publish message to queue', queueName, err.message);
            } else {
                console.log('Published!');
            }
        });
    });
}

connection.on('error', (err) => {
    console.error('Publisher error connecting to AMQP broker (%s)', connectionOptions.url, err.message);
});

connection.on('close', () => {
    console.error('Publisher connection to AMQP broker (%s) closed', connectionOptions.url);
});

connection.on('ready', () => {
    console.info('Publisher connection made to AMQP', connectionOptions.url);
    setInterval(() => {
        leakyPublish()
    }, 500);
});

The resulting output:

Published!
Published!
Published!
Published!
Published!
Published!
Published!
Published!
(node) warning: possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit.
Trace
    at Connection.addListener (events.js:252:17)
    at Exchange._onMethod (/Users/mgolds02/Code/fo/fo-amqp/node_modules/amqp/lib/exchange.js:57:25)
    at Exchange.Channel._onChannelMethod (/Users/mgolds02/Code/fo/fo-amqp/node_modules/amqp/lib/channel.js:85:12)
    at Connection._onMethod (/Users/mgolds02/Code/fo/fo-amqp/node_modules/amqp/lib/connection.js:454:28)
    at AMQPParser.self.parser.onMethod (/Users/mgolds02/Code/fo/fo-amqp/node_modules/amqp/lib/connection.js:136:12)
    at AMQPParser._parseMethodFrame (/Users/mgolds02/Code/fo/fo-amqp/node_modules/amqp/lib/parser.js:377:10)
    at frameEnd (/Users/mgolds02/Code/fo/fo-amqp/node_modules/amqp/lib/parser.js:94:16)
    at frame (/Users/mgolds02/Code/fo/fo-amqp/node_modules/amqp/lib/parser.js:79:14)
    at AMQPParser.header [as parse] (/Users/mgolds02/Code/fo/fo-amqp/node_modules/amqp/lib/parser.js:65:14)
    at AMQPParser.execute (/Users/mgolds02/Code/fo/fo-amqp/node_modules/amqp/lib/parser.js:138:21)
Published!
Published!
Published!

node version:

$ node -v
v5.10.1

node-amqp version:

$ cat node_modules/amqp/package.json |grep "version"
  "version": "0.2.6"

Any help would be much appreciated.

postwait commented 7 years ago

Don't create the exchange each time you publish.

mgoldsborough commented 7 years ago

@postwait thanks for the reply.

As you suggested, I modified the code so it does not create the exchange each time. And this resolved the initial memory leak warning from node. I kept track of the listeners on the connection object and, when RMQ is running constantly, the listener count remains the same.

However, if the RMQ server is bounced, it looks like the listener count begins to creep upwards, albeit at a slower rate.

The code I ran and resulting output is below.

const EventEmitter = require('events'),
      amqp = require('amqp');

let e = new EventEmitter();

let connectionOptions = {
    url: 'amqp://guest:guest@127.0.0.1:5672'
};

let options = {
    reconnect: true,
    reconnectBackoffStrategy: 'linear',
    reconnectExponentialLimit: 2000,
    reconnectBackoffTime: 1000
};

let publishTimer = undefined;

let connection = amqp.createConnection(connectionOptions, options);

connection.on('error', (err) => {
    console.error('Error connecting to AMQP broker (%s)', connectionOptions.url, err.message);
});

connection.on('close', () => {
    console.error('Connection to AMQP broker (%s) closed', connectionOptions.url);

    // Stop publishing
    if(publishTimer) clearInterval(publishTimer);

    // Disconnect the connection. I found that if I didn't do this,
    // the connection would not automatically reconect.
    connection.disconnect();
});

connection.on('connect', () => {
    console.info('Connection made', connectionOptions.url);
});

connection.on('ready', () => {
    console.info('Publisher connection made to AMQP', connectionOptions.url);

    e.emit('connected');
});

// Open the exechange
e.on('connected', () => {
    console.dir('got connected');

    connection.exchange('', {confirm: true}, (exchange) => {
        e.emit('opened', exchange);
    });
});

// Once the exchange is opened, start interval to publish continously.
e.on('opened', (exchange) => {
    publishTimer = setInterval(() => {
        console.dir('Number of error listeners: ' + connection.listeners('error').length);

        exchange.publish('telemetry', {abc: 123}, {}, (err) => {
            if(err) {
                console.error('Unable to publish message to queue', 'telemetry', err.message);
                console.dir(err);
            } else {
                console.log('Published!');
            }
        });
    }, 2000);
});

// Interval to keep process running
setInterval(() => {
    console.log('Heartbeat');
}, 60000);

And here's the output. The comments (indicated by -->) in the file I added to help explain what was going on.

Connection made amqp://guest:guest@127.0.0.1:5672
Publisher connection made to AMQP amqp://guest:guest@127.0.0.1:5672
'got connected'
'got opened'
'Number of error listeners: 3'
Published!
'Number of error listeners: 3'
Published!
--> THE PREVIOUS 2 LINES REPEAT UNTIL RMQ IS STOPPED. NOTE, THE LISTENER COUNT STAYS CONSTANT
--> RMQ SERVER STOPPED
Publisher connection to AMQP broker (amqp://guest:guest@127.0.0.1:5672) closed
Publisher error connecting to AMQP broker (amqp://guest:guest@127.0.0.1:5672) This socket has been ended by the other party
Publisher error connecting to AMQP broker (amqp://guest:guest@127.0.0.1:5672) connect ECONNREFUSED 127.0.0.1:5672
Publisher connection to AMQP broker (amqp://guest:guest@127.0.0.1:5672) closed
Publisher error connecting to AMQP broker (amqp://guest:guest@127.0.0.1:5672) write after end
Publisher error connecting to AMQP broker (amqp://guest:guest@127.0.0.1:5672) connect ECONNREFUSED 127.0.0.1:5672
--> THE PREVIOUS 3 LINES ARE REPEATED UNTIL RMQ IS RESTARTED
--> RMQ SERVER RE-STARTED
Publisher connection made to AMQP amqp://guest:guest@127.0.0.1:5672
'got connected'
'got opened'
'Number of error listeners: 4'
Published!
'Number of error listeners: 4'
Published!
--> THE PREVIOUS 2 LINES REPEAT UNTIL RMQ IS STOPPED
--> NOTE THE INCREASE IN LISTENER COUNT FROM 3 TO 4
--> RMQ SERVER STOPPED
Publisher connection to AMQP broker (amqp://guest:guest@127.0.0.1:5672) closed
Publisher error connecting to AMQP broker (amqp://guest:guest@127.0.0.1:5672) This socket has been ended by the other party
Publisher error connecting to AMQP broker (amqp://guest:guest@127.0.0.1:5672) connect ECONNREFUSED 127.0.0.1:5672
Publisher connection to AMQP broker (amqp://guest:guest@127.0.0.1:5672) closed
Publisher error connecting to AMQP broker (amqp://guest:guest@127.0.0.1:5672) write after end
Publisher error connecting to AMQP broker (amqp://guest:guest@127.0.0.1:5672) connect ECONNREFUSED 127.0.0.1:5672
--> THE PREVIOUS 3 LINES ARE REPEATED UNTIL RMQ IS RESTARTED
--> RMQ SERVER RE-STARTED
Publisher connection made to AMQP amqp://guest:guest@127.0.0.1:5672
'got connected'
'got opened'
'Number of error listeners: 5'
Published!
'Number of error listeners: 5'
Published!

I ran the test a lot longer and the listener count kept increasing until I ultimately got the same node.js warning, (node) warning: possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit..

Am I improperly handling connections/exchanges or is there a lower level bug in the reconnection logic as it pertains to handling event listeners?

Thanks.

Pushplaybang commented 6 years ago

having a similar issue.

sagar-artoon commented 4 years ago

I'm also facing the same issue.