mpneuried / rsmq-worker

Helper to simply implement a worker around RSMQ ( Redis Simple Message Queue )
MIT License
117 stars 24 forks source link

Memory leak warning when using a predefined instance of rsmq #15

Closed kpturner closed 8 years ago

kpturner commented 8 years ago

If I create an instance of rsmq and create 11 or more queues on that instance - if I then create 11 or more rsmq-workers and specify the rsmq instance in the options (so that it does not create its own) then it starts throwing warnings:

warning: possible EventEmitter memory leak detected. 11 disconnect listeners added. Use emitter.setMaxListeners() to increase limit.

This is because each time I create the new rsmq-worker it adds a disconnect listener to the original rsmq instance. As soon as you add 11 or more EventEmitter goes into meltdown :(

kpturner commented 8 years ago

A simple piece of code to illustrate:

var RSMQWorker = require( "rsmq-worker" );
var RedisSMQ = require("rsmq");

var rsmq = new RedisSMQ( {host: "127.0.0.1", port: 6379, ns: "rsmq"} );
var workers=[];

for (var i=0; i<12; i++) {
    var name="worker_"+i.toString();
    workers.push(new RSMQWorker(name,{
        rsmq:       rsmq,
        autostart:  true,
    }))
}
mpneuried commented 8 years ago

Hi, thanks for your hint and the example code, that perfekt ;-)

BUT ... i have to disagree. This is not really a bug and your solution will not work. On a disconnect, in your changes, only one of the 11 generated queues will run stop() and set the reconnectActive flag. So all other workers would try to poll the disconnected queue. So, in my opinion it's fine to bind the the connection events for every worker.

To get rid of the warning you can just set the threshold

var RSMQWorker = require( "./" );
var RedisSMQ = require("rsmq");
var WORKERCOUNT = 12;

var rsmq = new RedisSMQ( {host: "127.0.0.1", port: 6379, ns: "rsmq"} );
rsmq.setMaxListeners( WORKERCOUNT );
var workers=[];

for (var i=0; i<WORKERCOUNT; i++) {
    var name="worker_"+i.toString();
    workers.push(new RSMQWorker(name,{
        rsmq:       rsmq,
        autostart:  true,
    }))
}

IMHO: A a more complex solution, with a internal facade to every redis/rsmq server could be a solution, but this overhead, just to get rid of the standard the EventEmitter warning, will be too much.

What do you think?

kpturner commented 8 years ago

I think that is "nearly" OK but in my use case it will still cause a memory leak. In my case many workers can be created and deleted over a period of time, and I have no way of knowing how many workers there will be at the time that the rsmq instance is created. My simple example code was just to illustrate the issue, but is not illustrative of the real "use case".

One solution might be to setMaxListeners(<some huge number>) but then I still need the ability to remove the disconnect listener automatically when the associated worker is killed off. Perhaps the listener can be removed when quit() is run and when stop() is run, and added back if/when start() is run?

kpturner commented 8 years ago

.....and I use a single rsmq instance because it is created with a single ioredis client - to facilitate sentinels and db connection authentication (just in case you were wondering). :)

mpneuried commented 8 years ago

i optimized the event listener handling so the single rsmq instance will only have as much listeners as workers connected. So if you .stop() a worker the listener will be removed from the rsmq instance and re-added on a .start().

My small test script can be found here as gist

kpturner commented 8 years ago

OK great - will the listener also be removed on quit()?

kpturner commented 8 years ago

Is the fix on NPM?

kpturner commented 8 years ago

Yeah all good - thanks

mpneuried commented 8 years ago

YES! quit is equal to stop but disconnects hard from the queue. But i recommned .stop() in your case, because .quit() will detroy the connection of the rsmq instance.

See code and README.