moscajs / mosca

MQTT broker as a module
mosca.io
3.2k stars 513 forks

memory leaks? #743

Open LonelyWalker78 opened 6 years ago

LonelyWalker78 commented 6 years ago

Hi there. First off, thank you to all the contributors and maintainers of this great package.

A memory overflow occurs after the program has been running for a while.

nohup.out

<--- Last few GCs --->

425904725 ms: Mark-sweep 1461.4 (1418.9) -> 1461.4 (1418.9) MB, 3207.2 / 0.1 ms [allocation failure] [GC in old space requested].
425908025 ms: Mark-sweep 1461.4 (1418.9) -> 1462.4 (1407.9) MB, 3299.4 / 0.1 ms [last resort gc].
425911324 ms: Mark-sweep 1462.4 (1407.9) -> 1463.5 (1407.9) MB, 3298.6 / 0.1 ms [last resort gc].

<--- JS stacktrace --->

==== JS stack trace =========================================

Security context: 0x3024b723fa99 <JS Object>
    1: new constructor(aka WritableState) [_stream_writable.js:~26] [pc=0x3a39f056ea2a] (this=0x2723e4e5e411 <a WritableState with map 0x2b70b067ec21>,options=0x1429e5ede621 <an Object with map 0x2b193265c259>,stream=0x2723e4e5e179 <a TLSSocket with map 0x2b193265c889>)
    3: Writable [_stream_writable.js:170] [pc=0x3a39f0691743] (this=0x2723e4e5e179 <a TLSSocket with map 0x2b193265c889>,optio...

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory
 1: node::Abort() [node]
 2: 0xdeec3c [node]
 3: v8::Utils::ReportApiFailure(char const*, char const*) [node]
 4: v8::internal::V8::FatalProcessOutOfMemory(char const*, bool) [node]
 5: v8::internal::Handle<v8::internal::JSFunction> v8::internal::Factory::New<v8::internal::JSFunction>(v8::internal::Handle<v8::internal::Map>, v8::internal::AllocationSpace) [node]
 6: v8::internal::Factory::NewFunction(v8::internal::Handle<v8::internal::Map>, v8::internal::Handle<v8::internal::SharedFunctionInfo>, v8::internal::Handle<v8::internal::Context>, v8::internal::PretenureFlag) [node]
 7: v8::internal::Factory::NewFunctionFromSharedFunctionInfo(v8::internal::Handle<v8::internal::Map>, v8::internal::Handle<v8::internal::SharedFunctionInfo>, v8::internal::Handle<v8::internal::Context>, v8::internal::PretenureFlag) [node]
 8: v8::internal::Factory::NewFunctionFromSharedFunctionInfo(v8::internal::Handle<v8::internal::SharedFunctionInfo>, v8::internal::Handle<v8::internal::Context>, v8::internal::PretenureFlag) [node]
 9: v8::internal::Runtime_NewClosure_Tenured(int, v8::internal::Object**, v8::internal::Isolate*) [node]
10: 0x3a39d8a060c7

mqtt.js

#!/usr/bin/env node
var mosca   = require('mosca');
var mysql   = require('mysql');
var Promise = require('promise');
var md5     = require('md5');
var util    = require('util');
var winston = require('winston');
require('winston-daily-rotate-file');
var logger = new(winston.Logger)({
    transports: [
        new (winston.transports.DailyRotateFile)({
            filename: util.format('%s/%s', '/opt/mqtt-broker/logs', 'mqtt-broker'),
            datePattern: '.yyyy-MM-dd.log',
            maxsize: 1024 * 1024 * 10 // 10MB
          })
    ]
});
var server  = new mosca.Server({
   secure:{
      port:8883,
      keyPath: 'abc-key.pem',
      certPath: 'abc-cert.pem',
   }
});
var pool    = mysql.createPool({
  connectionLimit : 10,
  host            : '192.168.1.41',
  user            : 'root',
  password        : 'root',
  database        : 'rsss'
});
var authenticate = function(client, username, password, callback) {
  var encryptedPass = md5(password);
  var promise = new Promise(function(resolve,reject){
    var sql = "SELECT password FROM users where user=?";
    sql = mysql.format(sql,[username]);
    pool.query(sql,function(error,results,fields){
        if(error){
            logger.error("failed to execute sql %s with error %s", sql, error);
            reject();return;
        }
        if(results.length>0 && encryptedPass===results[0].password){
            client.user = username;
            resolve();
        }
        else{
            logger.error("user doesn't exist ...");
            reject();
        }
    });
  });
  promise.then(function()
  {
      callback(null,true);
  }).catch(function(){
      // avoid logging plaintext passwords
      logger.warn('failed to authenticate user %s', username);
      callback(null,false);
  });
};
server.on('ready', function(){
  server.authenticate = authenticate;
});
server.on('published', function(packet, client){
//    logger.info('publish topic %s:message %s',packet.topic,packet.payload);
});
server.on('clientConnected', function(client) {
    logger.info('client connected %s', client.id);
});
server.on('clientDisconnected', function(client) {
    logger.info('client disconnected:', client.id);
});
remya-jose commented 6 years ago

Hi, did you get any resolution for the memory issue? We were also facing a memory issue when we used the server.publish function.

LonelyWalker78 commented 5 years ago

@remya-jose I haven't solved it, but the memory issue may be due to publishing too many messages. After I cut down the frequency of publishing messages, I never had this problem again.
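Cutting the publish rate can also be done in code rather than by slowing the producers: queue outgoing messages and drain them at a fixed rate. A minimal throttle sketch (the function and parameter names here are hypothetical, not a mosca API):

```javascript
// Wrap any publish function so at most one message is sent per intervalMs,
// instead of unbounded bursts that can outpace the garbage collector.
function createThrottledPublisher(publishFn, intervalMs) {
  var queue = [];
  var timer = setInterval(function () {
    var msg = queue.shift();
    if (msg !== undefined) publishFn(msg);
  }, intervalMs);
  timer.unref(); // don't keep the process alive for an empty queue
  return {
    push: function (msg) { queue.push(msg); },
    pending: function () { return queue.length; }
  };
}
```

For example, `createThrottledPublisher(function (m) { server.publish(m); }, 50)` would cap forwarding at roughly 20 messages per second; the queue depth (`pending()`) should itself be monitored so it does not grow without bound.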

remya-jose commented 5 years ago

Thank you LonelyWalker78. We also slightly modified our architecture and avoided server.publish. Now the memory issue is fixed.

srinivasnamani2010 commented 5 years ago

I have the same problem with growing memory consumption. Could you help me resolve the issue?

buffautomation4Gad commented 5 years ago

@srinivasnamani2010 In our case, the server.publish method caused a memory issue, so we modified the architecture to use client.publish and client.subscribe and removed server.publish completely.

srinivasnamani2010 commented 5 years ago

@buffautomation4Gad

We are using a Java MQTT client to publish messages to the Mosca server.

Is the change related to the client application or to the Mosca server?

Can you please point out exactly where the change goes? We will update accordingly.

remya-jose commented 5 years ago

@srinivasnamani2010 We had used the server.publish method inside our Mosca server to forward incoming requests. This created a memory issue once the load reached thousands of concurrent messages. We didn't solve the issue itself; we re-architected the application so that, rather than forwarding messages with server.publish, clients use wildcard topic subscriptions.

I don't know about your architecture/application, so I won't be able to suggest changes.
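For reference, the rule that makes this re-architecture work: in an MQTT subscription filter, `+` matches exactly one topic level and `#` matches this level and everything below it, so a single client-side subscription can replace broker-side forwarding. A simplified matcher sketching that rule (the topic scheme in the comments is illustrative; real clients should rely on their MQTT library's own matching):

```javascript
// Returns true when `topic` matches the MQTT subscription `filter`.
// '+' matches exactly one level; '#' matches the rest of the topic tree.
// e.g. a consumer subscribing to 'devices/+/telemetry/#' receives every
// message that would otherwise have been re-published with server.publish.
function topicMatches(filter, topic) {
  var f = filter.split('/');
  var t = topic.split('/');
  for (var i = 0; i < f.length; i++) {
    if (f[i] === '#') return true;    // multi-level wildcard: match the rest
    if (i >= t.length) return false;  // topic is shorter than the filter
    if (f[i] !== '+' && f[i] !== t[i]) return false;
  }
  return f.length === t.length;       // no trailing topic levels left unmatched
}
```

With a Java client the equivalent change is simply subscribing to the wildcard filter (e.g. `client.subscribe("devices/+/telemetry/#")` in Eclipse Paho) instead of relying on the broker to forward.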