moscajs / aedes

Barebone MQTT broker that can run on any stream server, the node way
MIT License
1.79k stars 232 forks source link

MaxListenersExceededWarning: Possible EventEmitter memory leak detected. #148

Closed nadalizadeh closed 7 years ago

nadalizadeh commented 7 years ago

Hello,

I'm using aedes for an online game async communications and chat. The server works fine and after 1 month it suddenly stops working and messages do not get dispatched. I have the following error in the stderr:

(node:3096) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 201 drain listeners added. Use emitter.setMaxListeners() to increase limit

In the stdout log I see new clients connect successfully and messages are received from them but they don't dispatch to the channel for other users.

and in addition I have lots of logs like the following:

client error 685790 keep alive timeout Error: keep alive timeout
    at keepaliveTimeout (/home/mqttuser/node-mqtt-server/node_modules/aedes/lib/handlers/connect.js:44:28)
    at Timeout.timerWrapper [as _onTimeout] (/home/mqttuser/node-mqtt-server/node_modules/retimer/retimer.js:21:16)
    at ontimeout (timers.js:386:14)
    at tryOnTimeout (timers.js:250:5)
    at Timer.listOnTimeout (timers.js:214:5)

Here's my server code:

var writeFileAtomic = require('write-file-atomic')
var fs = require('fs')
var thrift = require('thrift')
var ttypes = require('./gen-nodejs/serialization_types')
var aedes = require('aedes')()
var server = require('net').createServer(aedes.handle)
var port = 1883

var tribeChatHistory = {}
const MAX_HISTORY_FOR_EACH_CHANNEL = 130
const HISTORY_SAVE_INTERVAL_SECONDS = 2 * 60
const HISTORY_PERSIST_FILE_NAME = process.env.HOME + '/.mqtt-server/history.json'

var FTransport = thrift.TFramedTransport
var BTransport = thrift.TBufferedTransport
var Protocol = thrift.TCompactProtocol

function saveData () {
  var theData = JSON.stringify(tribeChatHistory)
  writeFileAtomic(HISTORY_PERSIST_FILE_NAME, theData, {}, function (err) {
    if (err) throw err
    // console.log('saved history!')
  })
}

function loadData (callback) {
  console.log('Load started')
  fs.access(HISTORY_PERSIST_FILE_NAME, fs.constants.R_OK, (err) => {
    if (!err) {
      console.log('Loading history data')
      var fileReadResult = fs.readFileSync(HISTORY_PERSIST_FILE_NAME)
      var contents = fileReadResult.toString()
      tribeChatHistory = JSON.parse(contents, (key, value) => {
        return value && value.type === 'Buffer'
          ? Buffer.from(value.data) : value
      })
    } else {
      console.log('Error loading history file')
    }
    setInterval(saveData, HISTORY_SAVE_INTERVAL_SECONDS * 1000)
    callback()
  })
}

function getLastComponent (channelTopic) {
  var matches = channelTopic.match('.*/(.*)')
  if (matches == null)
    return null
  return matches[1]
}

function startServer () {
  server.listen(port, function () {
    console.log('server listening on port', port)
  })
}

aedes.on('clientError', function (client, err) {
  console.log('client error', client.id, err.message, err.stack)
})

aedes.on('publish', function (packet, client) {
  if (client) {
    console.log('message from client', client.id, 'on', packet.topic)
    // console.dir(packet)

    if (packet.topic.startsWith('unity/percity/')) {
      var chanName = getLastComponent(packet.topic)
      if (!(chanName in tribeChatHistory)) {
        tribeChatHistory[chanName] = []
      }

      var theQ = tribeChatHistory[chanName]
      theQ.push(packet.payload)

      while (theQ.length > MAX_HISTORY_FOR_EACH_CHANNEL) {
        theQ.shift()
      }
    }

  }
})

aedes.on('subscribe', function (subscriptions, client) {
  if (client) {
    if (subscriptions == null || subscriptions.length < 1)
      return
    var theTopic = subscriptions[0].topic
    console.log('subscribe request from client', client.id, 'on', theTopic)

    if (theTopic.startsWith('unity/percity/tribe') || theTopic.startsWith('unity/percity/harbor')) {
      var chanName = getLastComponent(theTopic)
      var historyArray = tribeChatHistory[chanName]

      if (historyArray === undefined || historyArray == null) {
        console.log('No history for channel ' + chanName)
        return
      }

      for (var i = 0; i < historyArray.length; i++) {
        var thePacket = {
          cmd: 'publish',
          qos: 0,
          topic: theTopic,
          payload: historyArray[i],
          retain: false
        }
        client.publish(thePacket)
      }
    }
  }
})

function deserializeChatMessage (packet) {
  var buft = new FTransport(packet.payload)
  var myprot = new Protocol(buft)
  var chatMsg = new ttypes.ChatMessageData()
  chatMsg.read(myprot)

  return chatMsg
}

function serializeChatMessage (chatMsg, packet) {
  var myBuf = new Buffer([])
  var buftra = new BTransport(myBuf, function (outBuf) {
    myBuf = Buffer.concat([myBuf, outBuf])
  })
  var myprot = new Protocol(buftra)
  chatMsg.write(myprot)
  myprot.flush()
  buftra.flush()
  packet.payload = myBuf
}

aedes.authorizePublish = function (client, packet, callback) {
  // if (packet.topic === 'aaaa') {
  //   return callback(new Error('wrong topic'))
  // }

  if (packet.topic.startsWith('unity/percity/tribe') || packet.topic.startsWith('unity/percity/tribe')) {
    try {
      var chatMsg = deserializeChatMessage(packet)
      var logMessage = packet.topic + ' -> User: ' + chatMsg.player.id + ' Name: ' + chatMsg.player.name + ' Message: ' + chatMsg.message
      console.log(logMessage)
      chatMsg.timestamp = Math.floor(Date.now() / 1000)
      serializeChatMessage(chatMsg, packet)
    } catch (err) {
      console.log(packet.topic + ' -> Error parsing thrift message', err.message)
    }
  }

  callback(null)
}

aedes.on('client', function (client) {
  console.log('new client', client.id)
})

// -------------------
loadData(function () {
  startServer()
})
mcollina commented 7 years ago

which version of Node.js are you using? Il giorno lun 14 ago 2017 alle 10:22 Ali N. notifications@github.com ha scritto:

Hello,

I'm using aedes for an online game async communications and chat. The server works fine and after 1 month it suddenly stops working and messages do not get dispatched. I have the following error in the stderr:

(node:3096) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 201 drain listeners added. Use emitter.setMaxListeners() to increase limit

In the stdout log I see new clients connect successfully and messages are received from them but they don't dispatch to the channel for other users.

and in addition I have lots of logs like the following:

client error 685790 keep alive timeout Error: keep alive timeout at keepaliveTimeout (/home/mqttuser/node-mqtt-server/node_modules/aedes/lib/handlers/connect.js:44:28) at Timeout.timerWrapper [as _onTimeout] (/home/mqttuser/node-mqtt-server/node_modules/retimer/retimer.js:21:16) at ontimeout (timers.js:386:14) at tryOnTimeout (timers.js:250:5) at Timer.listOnTimeout (timers.js:214:5)

Here's my server code:

var writeFileAtomic = require('write-file-atomic') var fs = require('fs') var thrift = require('thrift') var ttypes = require('./gen-nodejs/serialization_types') var aedes = require('aedes')() var server = require('net').createServer(aedes.handle) var port = 1883

var tribeChatHistory = {} const MAX_HISTORY_FOR_EACH_CHANNEL = 130 const HISTORY_SAVE_INTERVAL_SECONDS = 2 * 60 const HISTORY_PERSIST_FILE_NAME = process.env.HOME + '/.mqtt-server/history.json'

var FTransport = thrift.TFramedTransport var BTransport = thrift.TBufferedTransport var Protocol = thrift.TCompactProtocol

function saveData () { var theData = JSON.stringify(tribeChatHistory) writeFileAtomic(HISTORY_PERSIST_FILE_NAME, theData, {}, function (err) { if (err) throw err // console.log('saved history!') }) }

function loadData (callback) { console.log('Load started') fs.access(HISTORY_PERSIST_FILE_NAME, fs.constants.R_OK, (err) => { if (!err) { console.log('Loading history data') var fileReadResult = fs.readFileSync(HISTORY_PERSIST_FILE_NAME) var contents = fileReadResult.toString() tribeChatHistory = JSON.parse(contents, (key, value) => { return value && value.type === 'Buffer' ? Buffer.from(value.data) : value }) } else { console.log('Error loading history file') } setInterval(saveData, HISTORY_SAVE_INTERVAL_SECONDS * 1000) callback() }) }

function getLastComponent (channelTopic) { var matches = channelTopic.match('./(.)') if (matches == null) return null return matches[1] }

function startServer () { server.listen(port, function () { console.log('server listening on port', port) }) }

aedes.on('clientError', function (client, err) { console.log('client error', client.id, err.message, err.stack) })

aedes.on('publish', function (packet, client) { if (client) { console.log('message from client', client.id, 'on', packet.topic) // console.dir(packet)

if (packet.topic.startsWith('unity/percity/')) {
  var chanName = getLastComponent(packet.topic)
  if (!(chanName in tribeChatHistory)) {
    tribeChatHistory[chanName] = []
  }

  var theQ = tribeChatHistory[chanName]
  theQ.push(packet.payload)

  while (theQ.length > MAX_HISTORY_FOR_EACH_CHANNEL) {
    theQ.shift()
  }
}

} })

aedes.on('subscribe', function (subscriptions, client) { if (client) { if (subscriptions == null || subscriptions.length < 1) return var theTopic = subscriptions[0].topic console.log('subscribe request from client', client.id, 'on', theTopic)

if (theTopic.startsWith('unity/percity/tribe') || theTopic.startsWith('unity/percity/harbor')) {
  var chanName = getLastComponent(theTopic)
  var historyArray = tribeChatHistory[chanName]

  if (historyArray === undefined || historyArray == null) {
    console.log('No history for channel ' + chanName)
    return
  }

  for (var i = 0; i < historyArray.length; i++) {
    var thePacket = {
      cmd: 'publish',
      qos: 0,
      topic: theTopic,
      payload: historyArray[i],
      retain: false
    }
    client.publish(thePacket)
  }
}

} })

function deserializeChatMessage (packet) { var buft = new FTransport(packet.payload) var myprot = new Protocol(buft) var chatMsg = new ttypes.ChatMessageData() chatMsg.read(myprot)

return chatMsg }

function serializeChatMessage (chatMsg, packet) { var myBuf = new Buffer([]) var buftra = new BTransport(myBuf, function (outBuf) { myBuf = Buffer.concat([myBuf, outBuf]) }) var myprot = new Protocol(buftra) chatMsg.write(myprot) myprot.flush() buftra.flush() packet.payload = myBuf }

aedes.authorizePublish = function (client, packet, callback) { // if (packet.topic === 'aaaa') { // return callback(new Error('wrong topic')) // }

if (packet.topic.startsWith('unity/percity/tribe') || packet.topic.startsWith('unity/percity/tribe')) { try { var chatMsg = deserializeChatMessage(packet) var logMessage = packet.topic + ' -> User: ' + chatMsg.player.id + ' Name: ' + chatMsg.player.name + ' Message: ' + chatMsg.message console.log(logMessage) chatMsg.timestamp = Math.floor(Date.now() / 1000) serializeChatMessage(chatMsg, packet) } catch (err) { console.log(packet.topic + ' -> Error parsing thrift message', err.message) } }

callback(null) }

aedes.on('client', function (client) { console.log('new client', client.id) })

// ------------------- loadData(function () { startServer() })

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/mcollina/aedes/issues/148, or mute the thread https://github.com/notifications/unsubscribe-auth/AADL4ywHWEaiKT-EX7DEWIf2xaPlsN1iks5sYAPcgaJpZM4O2EjT .

nadalizadeh commented 7 years ago

v7.10.1

nadalizadeh commented 7 years ago

I just faced this ticket : #51 I've now changed my concurrency to 1000 to see if works or not. Would it resolve my issue?

mcollina commented 7 years ago

Can you reproduce this reliably? I think we have a bug on our side. That mitigates the issue, but no, it won't solve it. I recommend you to recycle your node processes every once in a while.

nadalizadeh commented 7 years ago

The fact that it happens on the production servers and under load, makes it hard to reproduce. But with the code I've sent above, I have experienced this behaviour two times (with concurrency = 100) and each of them after around 1 month of uptime.