postwait / node-amqp

[UNMAINTAINED] node-amqp is an AMQP client for nodejs
MIT License
1.69k stars 357 forks source link

Channels keep increasing for every exchange.publish() #383

Open hayzem opened 9 years ago

hayzem commented 9 years ago

I read the channels issues but couldn't figure out why it keep increasing. Here is my sample code:

connection.exchange("brcks-wfa",{type:'direct',durable:true}, function(exchange) {
setInterval(function() { ... awS.forEach(function(wc){ ... nstbs.forEach(function(br){ ... BUpdate(brnewinfo,function(st){ if(st){ exchange.publish(route, brnewinfo,{contentType:"application/json"}); } }); }); ... }); }, 4000); });

gileze33 commented 9 years ago

I've been trying to nail down an issue with my rabbitmq instances having constantly growing memory to the point where it hits the watermark in a few days, under consistent loads.

I've since notices that every single time I publish using node-amqp, I see more channels being opened, but those channels are never closed - seems like we've both hit the same problem. I'm going to try over the weekend to do some proper testing and see if I can come up with a fix in a PR.

dabates commented 9 years ago

I am also seeing this problem.. is there another AMQP library that should be used instead of this one?

AVVS commented 9 years ago

try amqp-coffee from dropbox

dabates commented 9 years ago

Thanks @AVVS I'm also looking at bramqp and @squaremo 's amqp libraries

AliUz commented 9 years ago

@hayzem, @dabates, the same issue is already open here: #247. It seems you have to open the exchange connection once, and reuse it for any subsequent connection, otherwise you are opening a new channel for each request.

hayzem commented 9 years ago

I switched to wascally. It's pretty good. Especially what I like about it is that you can configure exchanges,queues, etc. in one file. No need to reconfigure every time you need rabbitmq. Should I close the issue?

mbejda commented 9 years ago

What a shame. Yea, i switched to Wascally also.

mikeatlas commented 9 years ago

I've attempted to keep a singleton connection to rabbit open, and within a tight callback burst (50k calls / second), this blows memory anyways:

.on("data", function(data){         
    rabbitMQExchange.publish('', data, {contentType: 'application/json'});    
})

I don't understand how else to "reuse channels" when I've got only one instance of the exchange open and am simply calling .publish() heavily. Time to try another library I guess :(

mikeatlas commented 9 years ago

For what it's worth, I used memwatch-next and did a heap dump before/after publishing ~50k messages. These are the results:

"before": {
    "nodes": 141521,
    "size": "29.61 mb",
    "size_bytes": 31045056
},
"after": {
    "nodes": 804911,
    "size": "86.8 mb",
    "size_bytes": 91011512
}

A nice whopping 50mb - nearly 1mb per 1k messages. I should note that my messages are fairly short JSON objects themselves. Looking at the details of the heap dump, the worst heap growth offenders (>1mb) are:

"details": [
  {
      "+": 213898,
      "-": 7889,
      "size": "7.4 mb",
      "size_bytes": 7762888,
      "what": "Array"
  },
  {
      "+": 154195,
      "-": 7,
      "size": "7.06 mb",
      "size_bytes": 7401024,
      "what": "Buffer"
  },
  {
      "+": 154231,
      "-": 65,
      "size": "10.59 mb",
      "size_bytes": 11099952,
      "what": "Closure"
  },
  {
      "+": 3509,
      "-": 0,
      "size": "27.48 mb",
      "size_bytes": 28811264,
      "what": "Native"
  },
  {
      "+": 154191,
      "-": 1,
      "size": "7.06 mb",
      "size_bytes": 7401056,
      "what": "WriteReq"
  },
]

I've even tried narrowing down and simplifying the publish code myself, with a forced garbage collection at the end of every publish, no callbacks, no tasks, and still no difference in heap growth:

  publishToExchange: function (routingKey, data, theOptions, callback) {
    var exchange = module.exports.rabbitMQExchange;
    var options = _.extend({}, theOptions || {});

    options.routingKey = routingKey;
    options.exchange   = exchange.name;
    options.mandatory  = options.mandatory ? true : false;
    options.immediate  = options.immediate ? true : false;
    options.reserved1  = 0;

    var methods = require('./borrowed/amqp-definitions').methods;

    exchange.connection._sendMethod(exchange.channel, methods.basicPublish, options);
    exchange.connection._sendBody(exchange.channel, data, options);
    new memwatch.gc(); // force a GC!
  },
mikeatlas commented 9 years ago

So - at least on my end I can absolve node-amqp partially. Our rabbit server was on its knees, and barely responding (due to other queuing issues). What seems to be the case is that my publish closures (native code + small bits of data) were potentially quickly accumulating on the stack. Hard to replicate now with our flailing server bounced. Regardless, my calls to publish were somehow either failing silently or lacking mechanism to eventually fail and be cleaned up by the GC. I'll bow out of this thread for now; in the process we switched to amqplib/amqp.node. As you can see above, I referenced other libraries tested such as amqp-coffee, both of those threw SSL/TLS errors trying to connect to our rabbit server (which has a self-signed cert; node-amqp does not complain about connecting with a self-signed cert while those other two libraries do). So that's an unrelated issue but another to pop off to an alternative library.

serhatates commented 7 years ago

+1