SocketCluster / socketcluster

Highly scalable realtime pub/sub and RPC framework
https://socketcluster.io
MIT License

Is it possible to merge publish messages? #566

Closed rushikeshkoli03 closed 1 year ago

rushikeshkoli03 commented 2 years ago

Can publish messages be merged when a client is subscribed to multiple topics? For example:

  1. A producer publishes a message every second to around 400-500 topics.
  2. A client is subscribed to around 100 topics.
  3. Instead of sending 100 publish messages to the client every second, could they be merged into a single array?

MegaGM commented 1 year ago

Apologies for the delayed response. Of course it's possible.
Here is a very simplified example of how you could implement message aggregation using middleware.

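// client
// Stay subscribed to the individual channels as usual; the aggregated
// batches arrive on this extra channel as arrays of { channel, data }.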
!async function doSomethingWithFlushedBuffer() {
  const channel = socket.subscribe('RushikeshWatchingList')
  for await (const data of channel) {
    console.info('RushikeshWatchingList data', data)
  }
}()
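
Each message on the aggregate channel is an array of { channel, data } entries (the shape the server code pushes into its buffer), so the client has to fan it back out per original channel. A minimal sketch, assuming a hypothetical dispatchBatch helper and a Map of per-channel handlers, neither of which is part of SocketCluster:

```javascript
// Hypothetical helper (not SocketCluster API): routes each buffered entry
// to the handler registered for its original channel.
// `handlers` is a Map of channel name -> function(data).
function dispatchBatch(batch, handlers) {
  let handled = 0
  for (const { channel, data } of batch) {
    const handler = handlers.get(channel)
    if (typeof handler === 'function') {
      handler(data)
      handled++
    }
  }
  // Number of entries that had a matching handler
  return handled
}
```

Inside the `for await` loop above you would then call something like `dispatchBatch(data, handlers)` instead of logging the whole array.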
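// server
// Pending { channel, data } entries, keyed by socket id
const messageBuffer = {}
// Only messages on these channels are buffered; everything else passes through
const bufferableChannels = ['currency1', 'currency2', 'currency3']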

agServer.setMiddleware(agServer.MIDDLEWARE_OUTBOUND, async (middlewareStream) => {
  for await (const action of middlewareStream) {
    if (action.type === action.PUBLISH_OUT) {
      const { socket, channel, data } = action

      if (!bufferableChannels.includes(channel)) {
        action.allow()
        continue
      }

      if (!messageBuffer[socket.id]) {
        messageBuffer[socket.id] = []
      }
      messageBuffer[socket.id].push({ channel, data })
      // Block the individual message; it is delivered later as part of a batch
      action.block(new Error('Message has been buffered'))

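      // Keep buffering until more than 500 messages are pending for this socket
      if (messageBuffer[socket.id].length <= 500) {
        continue
      }

      // Flush the whole buffer as one array. Every subscriber of this channel
      // receives it, so use a per-client channel name if clients watch
      // different sets of channels.
      agServer.exchange.transmitPublish('RushikeshWatchingList', messageBuffer[socket.id])
      delete messageBuffer[socket.id]
      // Message buffer has been flushed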
    }
  }
})
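
The example above flushes only once more than 500 messages are pending for a socket, so with 100 messages per second a client would wait several seconds for each batch. If you would rather flush on a timer (for example once per second), the buffering could be pulled into a small helper. This is only a sketch: createBatcher and its options are hypothetical names, not SocketCluster API, and onFlush stands for whatever delivery call you choose.

```javascript
// Hypothetical helper (not part of SocketCluster): collects entries per key
// (e.g. socket.id) and hands them to onFlush either when a key reaches
// maxSize entries or when the interval timer fires.
function createBatcher({ maxSize = 500, intervalMs = 1000, onFlush }) {
  const buffers = new Map()
  let timer = null

  // Deliver and clear the pending entries for one key, if there are any
  function flush(key) {
    const batch = buffers.get(key)
    if (!batch || batch.length === 0) return
    buffers.delete(key)
    onFlush(key, batch)
  }

  // Deliver the pending entries for every key
  function flushAll() {
    for (const key of [...buffers.keys()]) flush(key)
  }

  return {
    add(key, entry) {
      if (!buffers.has(key)) buffers.set(key, [])
      const batch = buffers.get(key)
      batch.push(entry)
      if (batch.length >= maxSize) flush(key)
    },
    flush,
    flushAll,
    // Drop pending entries without delivering them, e.g. after a disconnect
    discard(key) {
      buffers.delete(key)
    },
    // Start the periodic flush; does nothing if it is already running
    start() {
      if (timer) return
      timer = setInterval(flushAll, intervalMs)
    },
    stop() {
      if (timer) {
        clearInterval(timer)
        timer = null
      }
    }
  }
}
```

In the middleware you would call `batcher.add(socket.id, { channel, data })` in place of pushing to `messageBuffer`, call `batcher.start()` once at startup, and do the actual publishing inside `onFlush`.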

If you have any other questions, please ask them in the chat: https://gitter.im/SocketCluster/socketcluster