SocketCluster / socketcluster

Highly scalable realtime pub/sub and RPC framework
https://socketcluster.io
MIT License
6.14k stars 313 forks source link

Optimize for large number of subscribers #334

Open jondubois opened 6 years ago

jondubois commented 6 years ago

This applies to a scenario when there are a large number of subscribers (sometimes even with a small number of published messages).

When a large number of clients are subscribed to a channel, each time a message goes out, the channel can struggle to deliver the outbound messages to each subscriber and this may cause the sockets to time out and become disconnected.

Maybe we can come up with a strategy so that messages are spread out more evenly so that they don't clog up the Node.js event loop (i.e. don't try to deliver all of them in a single tick).

toredash commented 6 years ago

Hi,

We have done tests internally to see how frequent we can publish messages, at what size, with X amount of different subscribers, before the system will break.

It is really hard to give any good numbers here, as there is multiple factors that can cause variation in how quick we can dispatch messages to end-users. Latency on client, variable network throughput per VM in the cloud, noise neighbour etc.

One thing I know can "break" SCC, is if a large message is sent to a channel with large number of subscribers. Multiple things can happen then while SCC tries to send out the event to subscribers:

It would be nice if SCC, both on server on client side, would be able to detect and handle increased load in a dynamic way. For instance we dynamically add some latency to queue the process that sends messages to a channel in SCC. So that each message inserted is delayed 50ms before it is sent in SCC, preventing that the process suddenly pushes 10.000 messages at once to SCC.

I don't know if anything like this could be implemented in SCC to ensure clients doesn't disconnect. Maybe one could use CPU load or load avg as well ? If SCC had two types of channels, e.g. real-time and near-realtime, that could maybe help?

Posting random ideas and feedback, hope some of it comes to use.

jondubois commented 6 years ago

@toredash Thanks for the feedback!

Based on the earlier feedback I was given, this issue appears to be related to outbound messages. So if you have 20K subscribers and you publish one message to the channel, then effectively 20K copies of the message will need to be sent out at the same time - That's a lot of messages in a very short time. this can overwhelm the Node.js event loop.

One naive solution I can think of would be to add a delay in MIDDLEWARE_PUBLISH_OUT for each message before calling next() but I think it's worthwhile to investigate alternative approaches too - Maybe there is something that can be changed in SC internally to better support this use case.

jondubois commented 6 years ago

Notes:

toredash commented 6 years ago

@jondubois A lot would be solved for our part if we were certain that clients wouldn't disconnect when the load on SCC was high. A delay of 1-5 sec is fine, but we would also from an operation perspective like to know if SCC is working and just strained for resources, or if it isn't processing anything.

Thats maybe the different requirement we have from others: Real-time is nice to have, but "near-realtime" is perfectly fine.

We have chosen now to fetch all previous messages sent to a channel if a client reconnects, which we know causes a lot of traffic to our caching instances. We have done this to be sure the client have all messages in the event of a reconnect.

I haven't seen/found anything in SCC that works like this: Client connects to channel Message 1 to 10 is sent to channel, client receives all 10 Client reconnects Message 11 to 12 is sent Client reconnects, informs SCC that message # 10 was the last it received. SCC pushes message 11 and 12 to Client

I don't know how much compute this would require from SCC, but that would at least solve some of our scaling issues if we have a sane way of knowing that clients would get the last X messages sent to a channel.

A way of offloading large events to a HTTP CDN Cache is also a way to offload those huge events.

jondubois commented 6 years ago

@toredash It's nice to have some sort of monitoring for SC to track the CPU usage % - If it gets to around 80% on some CPU cores then you should start to expect disruptions to the service during spikes and it might be time to scale up or scale out.

My current thinking is to implement some sort of rate limiting/shaping for SC (either directly in SC as an option or as a plugin) which will help deal with large channels (with lots of subscribers) that are susceptible to large sudden spikes in message throughput... This could involve spreading out the messages over a longer period of time instead of trying to send them all at once (which sounds similar to what you're suggesting).

Regarding your approach of re-fetching the log (snapshotting); that's a very common approach not just with SC but all kinds of projects and frameworks. The optimisation that you're suggesting (to only re-fetch new messages instead of the whole log segment) makes sense for chat as well. You can do this with SC but you'd have to implement that behaviour yourself on top of it... Maybe as a plugin/addon for SC.

I've built a chat server on top of SC for a company which uses a similar approach (I pass in the date of the last received message and only get the log on or after that date - There can be some overlap so each message has a UUID).

SC is not just used for chat servers though so it's outside the scope of SC itself (would make a great plugin/addon though),

happilymarrieddad commented 6 years ago

@jondubois what if each worker had a "publish queue" and publishing out would just push to this queue. Then there would be some sort of handler that would pop say 100 items from the queue every 5ms or something like that? Would that work? It would keep the messages in order so there shouldn't be any out of order problems. It would also stagnate the messages so the event loop isn't overwhelmed. Is that a viable solution? I've noticed this problem on our servers as we add more clients. Thanks!

jondubois commented 6 years ago

@happilymarrieddad Yes something like that sounds good. Note that developers could implement something similar right now using MIDDLEWARE_PUBLISH_OUT but it's not that straight forward. Maybe it should be an option passed to SC to use a publish queue like that.

happilymarrieddad commented 6 years ago

@jondubois I know this is kind of dirty and could be handled much better but I was thinking of something like this. What do you think? It seems to work.

selection_069 selection_070

Worker.js

let publishQueueTimeout = 30 * 1000
  let publishQueue = []

  let publishOutHandler = function() {
    console.log('Starting publish out handler',new Date())
    if (publishQueue.length) {
      let packet = publishQueue.shift()
      packet.next(null,packet.data)
    }

    setTimeout(() => publishOutHandler(),publishQueueTimeout)
  }

  publishOutHandler()

  scServer.addMiddleware(scServer.MIDDLEWARE_PUBLISH_OUT,(req,next) => {
    let channel = req.channel
    let socket = req.socket
    let data = req.data

    console.log('WORKER >> SC Server Middleware Publish Out. Storing data to the publish queue',new Date())

    publishQueue.push({
      data:data,
      next:next
    })
  })

  worker.exchange.subscribe('send-data').watch(data => {
    worker.exchange.publish('receive-data',data)
  })

Client

// Initiate the connection to the server
var socket = socketCluster.connect()

socket.on('error', function (err) {
    throw 'Socket error - ' + err
})

socket.on('connect', function () {
    console.log('CONNECTED')

    socket.subscribe('receive-data').watch(data => {
        console.log('receive-data',new Date())
        console.log(data)
    })

    let packet = {
        id:1
    }
    console.log('Sending packet to server',new Date())
    console.log(packet)
    setTimeout(() => {
        socket.publish('send-data',packet)
    },5 * 1000)

})

It seems to work with multiple workers and brokers as it should. selection_071

jondubois commented 6 years ago

@happilymarrieddad Yes I was thinking of something like this! But I agree it's not very straight forward.

Did you try this out in production? Does it improve things?

happilymarrieddad commented 6 years ago

@jondubois I'm writing a module to test it in our staging environment. I'll let you know as soon as I test it. Thanks!

happilymarrieddad commented 6 years ago

@jondubois This is the module I'm going to test.

var DEFAULT_TIMEOUT = 50;
var DEFAULT_NUM_OF_MESSAGE_PER_PASS = 100;

module.exports = {
    attach:function(worker,options) {

        options = options || {};

        var scServer = worker.scServer;
        var queue = [];
        var debug = options.debug || false;
        var timeout = options.timeout || DEFAULT_TIMEOUT;
        var num_of_messages_per_pass = options.numOfMessagesPerPass || DEFAULT_NUM_OF_MESSAGE_PER_PASS;

        var handler = function() {
            if (debug) {
                console.log('Publishing data to clients');
            }

            var packets = [];
            for (var i = 0; i < num_of_messages_per_pass; i++) {
                if (queue.length) packets.push(queue.shift());
            }

            for (var i = 0,len = packets.length; i < len;i++) {
                packets[i].next(null,packets[i].data);
            }

            setTimeout(() => handler(),timeout);
        }
        handler();

        scServer.addMiddleware(scServer.MIDDLEWARE_PUBLISH_OUT,function(req,next) => {

            if (debug) {
                console.log('Storing packet in queue');
            }

            queue.push({
                data:req.data,
                next:next
            })

        });

    }
}
happilymarrieddad commented 6 years ago

@jondubois Just tested it with SCC, 1 BROKER, 1 STATE, 2 APP servers, 4 CLIENTS, and it only broadcasted 1 to each client. I'm pretty sure we're good to go.

selection_072

happilymarrieddad commented 6 years ago

@jondubois @toredash https://www.npmjs.com/package/sc-publish-out-queue

jondubois commented 6 years ago

@happilymarrieddad That's great thanks. I didn't get time to test it out. It would be nice to see before and after stats to see how it affects performance for scenarios where there are a lot of subscribers.

I have some feedback about your plugin code:

  1. It would be good to use something other than Array.shift() here: https://github.com/happilymarrieddad/sc-publish-out-queue/blob/master/index.js#L20 - When you remove elements from the front of an Array in JS, it has to update the index for ALL elements which come afterwards every time you call shift... So it is inefficient in this case because shift gets called many times (unless there is some special compiler optimisation that I don't know about). Maybe you can access elements using something like array[i] instead and once the loop is finished you can call Array.splice(...) a single time to remove all the elements which were just sent. See https://stackoverflow.com/questions/6501160/why-is-pop-faster-than-shift and https://jsperf.com/popvsshift
  2. Instead of var handler = function () { here https://github.com/happilymarrieddad/sc-publish-out-queue/blob/master/index.js#L15 you could use var handler = () => { instead. That way you don't have to add an extra closure here: https://github.com/happilymarrieddad/sc-publish-out-queue/blob/master/index.js#L40

We could consider integrating that logic directly into SC at some point in the future. Maybe using with an option in SC. Depends how popular the use case is and if there are any drawbacks to this approach. But for now it's good as a plugin.

happilymarrieddad commented 6 years ago

Ok, it's on our live production system at the moment. We have somewhere around 200 users who are constantly hitting the system with requests and I haven't seen any issues yet. That being said, it probably isn't tested enough because I've only tested it with my own scripts and our production environment. I will make those changes and push it up. Thanks!

happilymarrieddad commented 6 years ago

@jondubois I've published 0.2.1 which has those optimizations and an example in the sample folder with somewhat understandable instructions. Thanks!

toredash commented 6 years ago

@happilymarrieddad The approach is good, but it feels very static. If I understand the code correct, this isn't a more efficient way of publishing messages? If there is available resources there shouldn't be a need to limit how fast SC operates.

@jondubois Thanks for input regarding snapshotting. I think that approach wouldn't scale in our setup because of the large amount of subscribers and messages.

happilymarrieddad commented 6 years ago

@toredash no but it will keep the event loop from getting clogged. I'm not really sure how to make it more efficient. When I get a chance, I'll take a look at the publish method and see if there is anything there that could be improved especially in light of V8's updates in Node 8.X.

toredash commented 6 years ago

FYI,

We have tried with success a async queue that uses a (configurable) minimum and maximum delay for emitting messages to a channel. The delay is dynamic, based on load.

Example, define a minimum delay of 100ms and a maximum delay of 1000ms. When a worker is emitting message, check (os.loadavg()[0] * 1000) / os.cpus().length. If that value is in between min and max, delay message with that amount of ms.

If below, use minimum delay. If above, use maximum delay.

We run each SCC instance on all cores.

For our usecase, this works very well.

We did try to play with pubSubBatchDuration with little luck, I'm not sure how much we would save on server-resources. AFAIK we would save some websocket frames but I don't think that would be our main bottleneck either way.

semiaddict commented 3 years ago

I am experiencing very high CPU loads when publishing messages to a (very) large number of subscribers (up tp 30k) causing the system to fail. I know I could horizontally scale the application, but messages are rarely published (only 1-2 per day), so the CPU peak is too rare and too short in time to scale depending on load. I have been able to reach 20K subscribers, but a queue system would be highly useful in my situation.

@happilymarrieddad, thank you for sharing your code! @toredash, did you ever publish yours? Also, has anyone dealt with this in SC 16 ?

jondubois commented 3 years ago

@semiaddict Have you tried to add a MIDDLEWARE_OUTBOUND middleware with a randomized delay? That way you can spread out the messages over a few seconds instead of pushing them out to all clients at once.

By default, SC will try to push out a copy of the message to all clients ASAP so this can create a bottleneck if the channel has a lot of subscribers as it will try to get all those messages out within the space of only a few milliseconds; this is what causes a CPU spike and freeze. Adding a random delay will smooth out the spike over a slightly longer amount of time. Since you don't publish these messages too often, it makes a lot of sense for you to do this.

You should check if an outbound action is of type AGAction.PUBLISH_OUT (and you can check what the action.channel is so that the random delay only affects that specific channel), if so, you should await for some random amount of time between 0 and 10000 milliseconds (for example) before you invoke action.allow();. Note that the main middleware function is instantiated for each socket so in many cases, it's OK to block the for-await-of loop using something like await delay(Math.random() * 10000).

By adding some randomness as above, it will spread out the messages to clients over 10 seconds so you shouldn't get that bottleneck issue (otherwise you can try spreading them out over 20 seconds...). You can make this randomized delay longer or shorter depending on your requirements. A longer max delay will smooth out the CPU peak but will mean that some clients could get the message a bit later than others.

I recommend carefully reading the https://socketcluster.io/docs/middleware-and-authorization/ page to understand how middlewares work.

semiaddict commented 3 years ago

Thank you very much @jondubois for this information! I will try this out in the upcoming days and report back.