mcollina / mqemitter-mongodb

MongoDB based MQEmitter
MIT License
18 stars 10 forks source link

[bug] Memory leak, missing packets #24

Closed robertsLando closed 4 years ago

robertsLando commented 4 years ago

On high loads, the mqemitter miss to emit some packets.

The leak is here

I'm unable to find a fix to this.

Reference: https://github.com/moscajs/aedes/issues/533

mcollina commented 4 years ago

Have you got a script to reproduce this in isolation? A unit test would be even better.

robertsLando commented 4 years ago

This test ends correctly with count = 10000 and never ends when count is 100000 (100K)

test('leak test', function (t) {

      const total = 100000
      var count = 0

      var mqEmitterMongoDB = MongoEmitter({
        url: url
      })

      const topic = 'test'

      mqEmitterMongoDB.status.once('stream', function () {

        mqEmitterMongoDB.on(topic, function(msg, cb) {
          count++
          cb()
          if(count === total) {
            t.end()
            mqEmitterMongoDB.close()
          }
        })

        for (let index = 0; index < total; index++) {
          var payload = index
          mqEmitterMongoDB.emit({topic, payload})
        }
      })
    })
robertsLando commented 4 years ago

@mcollina By digging into this I think it might be not a problem with the emitter but with aedes, I'm speaking about https://github.com/moscajs/aedes/issues/533 issue, it might be that the packet is deduped by https://github.com/moscajs/aedes/blob/master/lib/client.js#L159 as the mqemitter mongo emit isn't syncronous and 2 packet emittet in the order A B could end with B A and the A in that case could be discarded by the dedupe function

mcollina commented 4 years ago

I do not know how to fix it, it probably requires some in-depth investigation

robertsLando commented 4 years ago

Actually #26 could fix mqemitter in single instances but the bug will still be present in clusters

mcollina commented 4 years ago

https://github.com/mcollina/mqemitter-mongodb/pull/26 should fix also for clusters.