tt-up / fed-in-depth

经验、知识、笔记——让坚持学习成为一种习惯
6 stars 1 fork source link

用 Node 实现一个简单的 SSE server #7

Open yuqingc opened 4 years ago

yuqingc commented 4 years ago

准备

Server

'use strict'

const path = require('path');
const express = require('express');
const SseStream = require('ssestream');

// Express application; the SSE endpoint is mounted on it further down.
const app = express();

/**
 * Minimal Server-Sent Events hub.
 *
 * Tracks every open SSE connection in `sseQueue` and lets the application
 * broadcast a payload to all of them via `announce`.
 */
class SseServer {
  /**
   * @param {object} [options] - Optional settings; may be omitted entirely.
   * @param {number} [options.maxConnections=Infinity] - Maximum number of
   *   simultaneously tracked connections. An explicit `0` is honored and
   *   causes every request to be rejected.
   */
  constructor (options) {
    // Tolerate a missing options object instead of throwing a TypeError.
    const { maxConnections } = options || {};

    // Each entry is a `[sse, req, res]` triple for one open connection.
    this.sseQueue = new Set();
    // Only fall back when the value is null/undefined so that 0 stays 0.
    this.maxConnections = maxConnections == null ? Infinity : maxConnections;
    // Bound so both methods can be handed around as plain callbacks.
    this.middleWare = this.middleWare.bind(this);
    this.announce = this.announce.bind(this);
  }

  /**
   * Build the request handler that upgrades a request to an SSE stream.
   *
   * @returns {function(object, object): *} A `(req, res)` handler. It answers
   *   with HTTP 429 when the connection limit is reached; otherwise it pipes
   *   a new `SseStream` into the response and tracks the connection until
   *   the request emits `close`.
   */
  middleWare () {
    return (req, res) => {
      const sseQueue = this.sseQueue;
      if (sseQueue.size >= this.maxConnections) {
        // Limit reached: refuse with 429 and do not open a stream.
        return res.status(429).send();
      }

      const sse = new SseStream(req);
      sse.pipe(res);

      const metaData = [sse, req, res];
      sseQueue.add(metaData);

      // Stop tracking the connection once the request closes.
      req.on('close', () => {
        console.log('CONNECTION CLOSED!!!');
        sseQueue.delete(metaData);
      });
    };
  }

  /**
   * Write `{ data }` to the stream of every tracked connection.
   *
   * @param {*} data - Payload placed in the message's `data` field.
   */
  announce (data) {
    for (const [sse] of this.sseQueue) {
      sse.write({ data });
    }
  }
}

// SSE hub configured with a connection limit of three.
const mySseServer = new SseServer({ maxConnections: 3 });

// Route everything under /api/sse through the hub's request handler.
app.use('/api/sse', mySseServer.middleWare());

const port = 8888;

// Logged once the HTTP server starts listening.
const reportListening = () => {
  console.log(`App is listening to port :${port}`);
};
app.listen(port, reportListening);

// Simulated push: log the number of tracked connections, then broadcast
// the current time to all of them.
const pushCurrentTime = () => {
  console.log('current connect number:', mySseServer.sseQueue.size);
  mySseServer.announce(`It is ${new Date()} now!`);
};

// Fire the simulated push every two seconds.
setInterval(pushCurrentTime, 2000);

客户端

// Subscribe to the server's event stream.
const sse = new EventSource('/api/sse');

// Log any error event raised by the connection.
const reportError = (err) => {
  console.error('An error occurred!!', err);
};

// Log the payload of each incoming message event.
const reportMessage = ({ data }) => {
  console.log('Received message: ', data);
};

sse.onerror = reportError;
sse.onmessage = reportMessage;