moscajs / aedes-persistence-redis

Aedes persistence, backed by redis
MIT License

Using aedes-persistence-redis with Aedes resulting in slow client connections. #10

Open rasalasantosh opened 7 years ago

rasalasantosh commented 7 years ago

When aedes-persistence-redis is used as the persistence layer for Aedes, client connections are very slow: 20k client connections take around 10 minutes. Without persistence configured, connections are fast, around 50k connections in less than a minute.

Please find below the code used to run the Aedes server.

    var redis = require('mqemitter-redis')
    var aedesPersistenceRedis = require('aedes-persistence-redis')

    var mq = redis({
      port: 6379,
      host: '172.31.38.96',
      db: 12
    })

    var persistence = aedesPersistenceRedis({
      port: 6379,
      host: '172.31.38.96'
    })

    var aedes = require('aedes')({
      mq: mq,
      persistence: persistence
    })

    var server = require('net').createServer(aedes.handle)
    var port = 1883

    server.listen(port, function () {
      console.log('server listening on port', port)
    })

behrad commented 7 years ago

To test further, I migrated my current production subscription data (created under mosca) to the aedes format in redis. When I launch aedes, memory grows until the process halts. I noticed that _setup has some issues. However, when I took the same code that is inside _setup (hgetallBuffer on all topic keys, but without the processing part) and ran it in isolation in a single node.js file, it worked! It took 67606.250ms to hgetall all topics.

    // requires assumed by this snippet (not shown in the original):
    // through is through2, plus throughv, pump, and an ioredis client
    var through = require('through2')
    var throughv = require('throughv')
    var pump = require('pump')

    console.time('@@@@@@@@@@@@@@@@@@ READY')

    // flatten the smembers result array into one stream chunk per topic key
    function split (keys, enc, cb) {
      for (var i = 0, l = keys.length; i < l; i++) {
        this.push(keys[i])
      }
      cb()
    }
    var splitStream = through.obj(split)

    var count = 0
    var result = 0

    // fetch every topic hash in parallel; the flush function fires when all are done
    var hgetallStream = throughv.obj(function getStream (chunk, enc, cb) {
      ioredisClient.hgetallBuffer(chunk, function (e, r) { result++; cb() })
      count++
    }, function emitReady (cb) {
      console.timeEnd('@@@@@@@@@@@@@@@@@@ READY')
      console.log('count ', count)
      cb()
    })

    console.time('smembers')
    ioredisClient.smembers('sub:client', function lrangeResult (err, results) {
      console.timeEnd('smembers')
      setInterval(function () { console.log('Count=' + count, ' Result=' + result) }, 1000)
      if (err) {
        splitStream.emit('error', err)
      } else {
        splitStream.write(results)
        splitStream.end()
      }
    })

    pump(splitStream, hgetallStream, function pumpStream (err) {})

Surprisingly, when I moved the same code above (which does not process the keys) back inside _setup, it didn't work again! What could be introducing a memory leak here with the same code, guys? @mcollina @GavinDmello

behrad commented 7 years ago

My heap dump shows it's inside SingleCaller -> throughv's fastparallel usage

mcollina commented 7 years ago

You don't need to use streams here. You will get some speed bump from removing them.
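
For illustration, the stream-free variant (what behrad later benchmarks as the for loop) boils down to something like this sketch, reusing the ioredisClient and the sub:client key from the snippet above:

    // stream-free variant: iterate the smembers result directly,
    // issuing one hgetallBuffer per topic key and counting completions
    ioredisClient.smembers('sub:client', function (err, keys) {
      if (err) { throw err }
      var pending = keys.length
      for (var i = 0, l = keys.length; i < l; i++) {
        ioredisClient.hgetallBuffer(keys[i], function () {
          if (--pending === 0) { console.log('all topic hashes loaded') }
        })
      }
    })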

GavinDmello commented 7 years ago

@mcollina Could it be qlobber? We're adding all the decoded packets to qlobber against the topic.

mcollina commented 7 years ago

@GavinDmello not sure, but I don't think so.

behrad commented 7 years ago

So strange!!! I figured it out: NODE_ENV=development. The difference between the two runs was NODE_ENV! When it's set to development, two things were happening:

  1. throughv was introducing a memory leak (as I mentioned, inside its fastparallel SingleCaller); changing it to through fixed that (a sketch of the change follows this list).
  2. A 10x speed reduction!!! I can't figure this one out yet. Some module's development-mode instrumentation (maybe ioredis) must be causing it.
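
A minimal sketch of that first change, assuming through here means through2 (whose object mode matches the snippet above) as the serial drop-in for the hgetall stage:

    // build the hgetall stage with through2 (serial, no fastparallel)
    // instead of throughv; everything else in the pipeline stays as-is
    var through2 = require('through2')
    var hgetallStream = through2.obj(function (chunk, enc, cb) {
      ioredisClient.hgetallBuffer(chunk, function (err) { cb(err) })
    })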

Could it be qlobber ?

No, the Qlobber part of the code was commented out.

behrad commented 7 years ago

You don't need to use streams here. You will get some speed bump from removing them.

By removing the streams and putting hgetallBuffer inside a for loop it gets faster; however, it won't scale memory-wise: we'd need a huge memory footprint on aedes startup, and the process freezes until the persistence is ready.

mcollina commented 7 years ago

By removing the streams and putting hgetallBuffer inside a for loop it gets faster; however, it won't scale memory-wise: we'd need a huge memory footprint on aedes startup, and the process freezes until the persistence is ready.

You can use something like http://npm.im/fastq.

behrad commented 7 years ago

My benchmarks ran just now:

    through (stream)            70 secs
    for loop                    86 secs
    fastq (concurrency=1)      126 secs
    fastq (concurrency=100)     67 secs
    fastq (concurrency=1000)    84 secs
    fastq (concurrency=10000)   91 secs

So it seems changing to fastq won't help that much :)

Before I go and test with the Qlobber code activated, there are some optimizations I want to mention:

  1. By loading from a topic index, we are hgetall-ing each clientId multiple times. Instead of a topic index, we'd better load from a client index (as mosca does) and fetch fewer buffers. I'll try to optimize the redis key schema for this (see the sketch after this list).

  2. How much would it help if we spawned multiple child processes, each hgetall-ing a subset of the set and returning the topic buffers back to the main setup process? (LRANGE or SSCAN could be used for this.) My idea is to use multiple cores for loading large subscription data; would redis be saturated then @mcollina ?

  3. Aedes opens its port before the persistence is ready (the persistence queues all callbacks until ready), and I don't want users' offline messages forwarded only after 70 seconds. I'd prefer that a non-ready instance not listen on any port (like mosca), so that my load balancer relays users to a running instance. Can we change this to behave the way mosca does @mcollina ?
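
A rough sketch of what the client-index loading in point 1 could look like; the clients set and the client:sub: prefix are hypothetical key names for illustration, not the module's actual schema:

    // load by client index: one hgetallBuffer per client instead of
    // re-reading the same client from many topic hashes
    ioredisClient.smembers('clients', function (err, clientIds) {
      if (err) { throw err }
      clientIds.forEach(function (clientId) {
        ioredisClient.hgetallBuffer('client:sub:' + clientId, function (e, subs) {
          // subs maps each topic to this client's packed subscription,
          // so every client hash is fetched exactly once
        })
      })
    })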

mcollina commented 7 years ago

on your results, can I see your code?

on 1. go ahead!

on 2. I'm really against going multi-process. This is very deployment specific.

on 3. Aedes does not listen automatically. That part of the logic in Mosca was super complicated, so it's up to you. If you want to send an update to the README, go ahead. If we are missing some events for this, please send a PR and we'll add them.

behrad commented 7 years ago

on your results, can I see your code?

  1. The stream code is the same as the snippet above (taken from the current aedes code).

  2. The for loop removes through and calls hgetallBuffer inside the smembers callback.

  3. fastq is something like this:

     var queue = require('fastq')(worker, 100)

     queue.drain = function () {
       console.timeEnd('@@@@@@@@@@@@@@@@@@ READY')
       console.log('count ', count)
     }

     function worker (arg, cb) {
       that._db.hgetallBuffer(arg, function (e, r) { result++; cb() })
       count++
     }

     this._db.smembers(subscriptionsKey, function lrangeResult (err, results) {
       for (var i = 0, l = results.length; i < l; i++) {
         queue.push(results[i])
       }
     })

on 2. I'm really against going multi-process. This is very deployment specific.

So I'll suspend this and go for higher-priority tasks like number 1.

on 3. Aedes does not listen automatically. That part of logic in Mosca was super complicated, so it's up to you. If you want to send an update to the README, go ahead. If we are missing some events for this, please send a PR and we'll add them.

Great. I should check whether the persistence API already exposes ready.
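
If it does, listening can be deferred until then. A sketch, assuming the persistence object is an EventEmitter that emits a ready event once its setup completes (that is the part to verify):

    // only start accepting connections once the persistence has warmed up,
    // so the load balancer never routes clients to a non-ready instance
    persistence.once('ready', function () {
      server.listen(1883, function () {
        console.log('persistence ready, now listening on 1883')
      })
    })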

mcollina commented 7 years ago

I think 1. might be the path that leads to the best benefits.

behrad commented 7 years ago

Yes, sure.

Can you also clarify these extra details for me @mcollina ?

  1. Why are you using throughv instead of through2? The same could've been achieved with through2.

  2. Can we play with the concurrency of throughv.obj by setting a highWaterMark?

  3. Why are you using pump? How does it help?

mcollina commented 7 years ago

throughv parallelizes, through2 does things in series.

you can play with throughv concurrency by setting a highWaterMark.

pump makes sure all streams are closed if one errors.
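
Put together, those answers translate into something like this sketch (assuming throughv mirrors through2's options signature; the highWaterMark value is illustrative):

    var throughv = require('throughv')
    var pump = require('pump')

    // highWaterMark bounds how many chunks throughv processes in parallel
    var hgetallStream = throughv.obj({ highWaterMark: 100 }, function (chunk, enc, cb) {
      ioredisClient.hgetallBuffer(chunk, cb)
    })

    // pump tears the whole pipeline down if any stream in it errors
    pump(splitStream, hgetallStream, function (err) {
      if (err) { console.error('pipeline failed', err) }
    })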

behrad commented 7 years ago

thank you man 👍

And have you got any clue as to why fastparallel's SingleCaller is eating memory when NODE_ENV=development ?

behrad commented 7 years ago

This issue should be resolved by #31

robertsLando commented 4 years ago

@mcollina @behrad can this be closed?