mafintosh / signalhub

Simple signalling server that can be used to coordinate handshaking with webrtc or other fun stuff.
MIT License
667 stars 97 forks source link

Use with Express on server #28

Closed tvedtorama closed 7 years ago

tvedtorama commented 7 years ago

Hi,

I wanted to test the webrtc-swarm library and needed a signalhub server for that purpose.

I was not sure the server.js file would work nicely alongside my existing setup, so I made a version that produces an Express router. The code is just a quick mock up, but it works for my purpose:

(Note: Code is in Typescript)

import * as express from 'express'
const collect = require('stream-collector')
const pump = require('pump')
const iterate = require('random-iterate')
const limiter = require('size-limit-stream')
const eos = require('end-of-stream')

const flushHeaders = function (res) {
    if (res.flushHeaders) {
        res.flushHeaders()
    } else {
        if (!res._header) res._implicitHeader()
        res._send('')
    }
}

/** Set up a signalhub logic primarliy for webrtc-swarm, allowing the client to send and receive
 * on channels "all" and "user-specific" (the uuid).
 *
 * Code migrated from https://github.com/mafintosh/signalhub/blob/master/server.js
 */
export const getHubRouter = function (opts: { maxBroadcasts?: number, rootName: string }) {
    let channels = {}
    let maxBroadcasts = (opts && opts.maxBroadcasts) || Infinity
    let subs = 0

    const router = express.Router()

    const findName = (req: express.Request) => req.params["app"] + "/" + req.params["channel"]

    /** channel = "app/channel" */
    const getAppChannel = function (channel: string) {
        if (channels[channel]) return channels[channel]
        const sub = { name: channel, subscribers: [], heartbeat: null }
        sub.heartbeat = setInterval(heartbeater(sub), 30 * 1000)
        channels[channel] = sub
        return channels[channel]
    }

    router.post("/v1/:app/:channel", (req, res) => {
        collect(pump(req, limiter(64 * 1024)), function (err, data) {
            if (err) return res.end()
            const name = findName(req)
            if (!channels[name]) return res.end()
            const channel = getAppChannel(name)

            data = Buffer.concat(data).toString()

            let ite = iterate(channel.subscribers)
            let next
            let cnt = 0

            while ((next = ite()) && cnt++ < maxBroadcasts) {
                next.write('data: ' + data + '\n\n')
            }

            res.end()
        })
    })

    router.get("/v1/:app/:channel", (req, res) => {
        res.setHeader('Content-Type', 'text/event-stream; charset=utf-8')

        const name = findName(req)
        const app = name.split('/')[0]
        const channelNames = name.slice(app.length + 1)

        channelNames.split(',').forEach(function (channelName) {
            const channel = getAppChannel(app + '/' + channelName)
            channel.subscribers.push(res)
            subs++
            eos(res, function () {
                subs--
                let i = channel.subscribers.indexOf(res)
                if (i > -1) channel.subscribers.splice(i, 1)
                if (!channel.subscribers.length && channel === channels[channel.name]) {
                    clearInterval(channel.heartbeat)
                    delete channels[channel.name]
                }
            })
        })

        flushHeaders(res)
    })
    return router
}

function heartbeater(sub) {
    return function () {
        for (let i = 0; i < sub.subscribers.length; i++) {
            sub.subscribers[i].write(':heartbeat signal\n\n')
        }
    }
}

Then i hooked it up this way:

    app.use('/signalhub/', getHubRouter({rootName: '/signalhub/'})) // No body parser!

    // Existing setup...
    app.use(bodyParser.urlencoded({ extended: true }))
    app.use(bodyParser.json())

    app.use('/', eventRouter)
    app.use('/', router)