postwait / node-amqp

[UNMAINTAINED] node-amqp is an AMQP client for nodejs
MIT License
1.69k stars 358 forks source link

Error: Maximum call stack size exceeded #429

Open AdamMagaluk opened 8 years ago

AdamMagaluk commented 8 years ago

While using a Node.js MQTT broker (https://github.com/mcollina/mosca), I and others have been running into an issue with amqp throwing a "Maximum call stack size exceeded" error originating in the amqp parser.

Error usually looks like:

Server: RangeError: Maximum call stack size exceeded
    at min (native)
    at Buffer.subarray (native)
    at Buffer.slice (buffer.js:617:23)
    at allocate (buffer.js:88:23)
    at new Buffer (buffer.js:53:12)
    at parseFields (/root/mqtt-load-test/basic-broker/node_modules/amqp/lib/parser.js:315:17)
    at AMQPParser._parseMethodFrame (/root/mqtt-load-test/basic-broker/node_modules/amqp/lib/parser.js:359:14)
    at frameEnd (/root/mqtt-load-test/basic-broker/node_modules/amqp/lib/parser.js:93:16)
    at frame (/root/mqtt-load-test/basic-broker/node_modules/amqp/lib/parser.js:78:14)
    at header (/root/mqtt-load-test/basic-broker/node_modules/amqp/lib/parser.js:64:14)

Because lib/parser parses the incoming buffer recursively, supplying a buffer that is too large and contains many amqp packets will eventually exhaust the maximum stack size while parsing. In production we're seeing a large buffer of size 65536 get emitted from https://github.com/postwait/node-amqp/blob/master/lib/connection.js#L171, which then throws the error when execute() is called.

I'm not sure of the best approach to solve the issue, as amqp handles both the parsing of the data and the underlying network connection. As a quick fix, I modified the data event handler in lib/connection to spoon-feed the incoming data to the parser in chunks no larger than 10000 bytes. This seems to fix our specific use case.

I'm running node version v4.2.1 and amqp version 0.2.4

Patch looks like: Using the spoon feed method

  // Upper bound, in bytes, on how much inbound data is handed to the parser
  // in a single execute() call. Keeping each call small bounds how deep the
  // parser's recursion can go for any one call.
  var maxChunk = 10000;
  self.addListener('data', function (data) {
    if (self.parser === null) {
      return;
    }

    // Offset into `data` of the first byte that has not been parsed yet.
    var pos = 0;

    // Parses the next slice of `data` and, if anything is left, schedules
    // itself again on a later timer tick so the call stack unwinds between
    // slices.
    // NOTE(review): if another 'data' event fires while an earlier buffer is
    // still being fed, its first slice would be parsed before the earlier
    // buffer's remaining slices — confirm whether ordering needs a queue.
    var writeSome = function writeSome() {
      // This callback can run on a later tick; apply the same guard as on
      // entry rather than calling execute() on a null parser.
      if (self.parser === null) {
        return;
      }

      var end = Math.min(pos + maxChunk, data.length);

      try {
        // Feed ONLY the current slice. Passing the whole buffer here would
        // defeat the chunking and re-parse the same bytes on every pass.
        self.parser.execute(data.slice(pos, end));
      } catch (exception) {
        self.emit('error', exception);
        return;
      }

      pos = end;
      if (pos < data.length) {
        setTimeout(writeSome, 0);
      } else {
        // Reset the inbound heartbeat timer only once the whole buffer has
        // been consumed.
        self._inboundHeartbeatTimerReset();
      }
    };
    writeSome();
  });

Here is a test file that replicates what's happening in production:

var EventEmitter = require('events').EventEmitter;
var Parser = require('amqp/lib/parser');
var Connection = require('amqp').Connection;

/**
 * Builds the Connection instance used by this reproduction script.
 *
 * The returned connection has an 'error' listener that logs and rethrows,
 * has its write() replaced by a stub that only logs its arguments, and has
 * had a 'connect' event emitted on it before being returned.
 *
 * @returns {Connection} the prepared connection
 */
function create_connection() {
  var connection = new Connection();
  EventEmitter.call(connection);

  // Surface any connection error loudly: log it, then rethrow.
  connection.on('error', function onConnectionError(error) {
    console.error('Connection Error:', error);
    throw error;
  });

  // Replace write() with a stub that just logs what it was given.
  connection.write = function logWrite() {
    console.log('Write:', arguments);
  };

  connection.emit('connect');
  return connection;
}

// Reproduction driver: collect everything the connection writes and, once
// enough writes have piled up, hand them to a parser as one large buffer.
var conn = create_connection();
var parser = new Parser('0-9-1');

// Number of buffered writes that triggers a single parser.execute() call.
var bufferMax = 5000;
var buffs = [];

// Intercept writes instead of letting them go out: accumulate them, and on
// every bufferMax-th write parse the concatenation in one go.
conn.write = function (buf) {
  buffs.push(buf);
  if (buffs.length !== bufferMax) {
    return;
  }

  parser.execute(Buffer.concat(buffs));

  buffs = [];
};

// No exit condition: the loop only ends if something inside it throws.
// Presumably _sendBody ends up calling conn.write above — confirm against
// lib/connection.
for (;;) {
  conn._sendBody('test', 'Some body');
}