triblondon / node-sse-pubsub

Server sent events for NodeJS
MIT License
71 stars 16 forks source link

Do you want these changes? #1

Open ghost opened 6 years ago

ghost commented 6 years ago

I found your module very useful. I forked it and made quite a few changes. I was kind of in a rush so I wasn't able to separate out my changes into immediately mergeable commits. Before I do that though, I'm gonna list what they are.

The major questionable thing i did was change up the tests so that setupServer only sets up the sse channels and not the http server. I wasn't (at the time) able to figure out how to get setupServer to compile with typescript, so I went with afterEach and beforeEach hooks to setup the server and tear it down.

I'm still learning typescript so maybe this isn't done as well as it could be, but if you wanna take a look, you can see it here: https://github.com/jrobeson/node-sse-pubsub/tree/fork

lishine commented 4 years ago

Fastify typescript version: :)

import Fastify, { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify'

type Events = (string | RegExp)[]
type Client = { req: FastifyRequest; res: FastifyReply; events?: Events }

function hasEventMatch(subscriptionList: Events | undefined, eventName: string) {
    return (
        !subscriptionList ||
        subscriptionList.some((pat) => (pat instanceof RegExp ? pat.test(eventName) : pat === eventName))
    )
}
export class SSEChannel {
    nextID: number
    clients: Set<Client>
    options: Record<string, any>
    messages: Record<string, any>[]
    pingTimer?: NodeJS.Timeout

    constructor(options: Record<string, any>) {
        this.options = Object.assign(
            {},
            {
                pingInterval: 3000,
                maxStreamDuration: 30000,
                clientRetryInterval: 1000,
                startId: 1,
                historySize: 100,
                rewind: 0,
            },
            options
        )

        this.nextID = this.options.startId
        this.clients = new Set()
        this.messages = []

        if (this.options.pingInterval) {
            this.pingTimer = setInterval(() => this.publish(), this.options.pingInterval)
        }
    }

    publish(data?: string | Record<string, any>, eventName?: string) {
        const id = this.nextID
        if (typeof data === 'object') {
            data = JSON.stringify(data)
        }
        data = data
            ? data
                  .split(/[\r\n]+/)
                  .map((str) => 'data: ' + str)
                  .join('\n')
            : ''

        const output =
            (data ? 'id: ' + id + '\n' : '') +
            (eventName ? 'event: ' + eventName + '\n' : '') +
            (data || 'data: ') +
            '\n\n'
        ;[...this.clients]
            .filter((c) => !eventName || hasEventMatch(c.events, eventName))
            .forEach((c) => {
                c.res.raw.write(output)
            })

        this.messages.push({ id, eventName, output })
        while (this.messages.length > this.options.historySize) {
            this.messages.shift()
        }
        this.nextID++

        return id
    }

    subscribe(req: FastifyRequest, res: FastifyReply, events?: Events) {
        const c = { req, res, events }
        c.req.raw.socket.setNoDelay(true)
        c.res.raw.writeHead(200, {
            'Content-Type': 'text/event-stream',
            'Cache-Control':
                's-maxage=' +
                (Math.floor(this.options.maxStreamDuration / 1000) - 1) +
                '; max-age=0; stale-while-revalidate=0; stale-if-error=0',
            Connection: 'keep-alive',
        })
        let body = 'retry: ' + this.options.clientRetryInterval + '\n\n'

        const lastID = Number.parseInt(req.headers['last-event-id'] as string, 10)
        const rewind = !Number.isNaN(lastID) ? this.nextID - 1 - lastID : this.options.rewind
        if (rewind) {
            this.messages
                .filter((m) => hasEventMatch(c.events, m.eventName))
                .slice(0 - rewind)
                .forEach((m) => {
                    body += m.output
                })
        }

        c.res.raw.write(body)
        this.clients.add(c)

        setTimeout(() => {
            if (!c.res.raw.finished) {
                this.unsubscribe(c)
            }
        }, this.options.maxStreamDuration)
        c.res.raw.on('close', () => {
            this.unsubscribe(c)
        })
        return c
    }

    unsubscribe(c: Client) {
        c.res.raw.end()
        this.clients.delete(c)
    }

    listClients() {
        const rollupByIP = {} as Record<string, number>
        this.clients.forEach((c) => {
            const ip = c.req.raw.socket.remoteAddress ?? ''
            if (!(ip in rollupByIP)) {
                rollupByIP[ip] = 0
            }
            rollupByIP[ip]++
        })
        return rollupByIP
    }

    getSubscriberCount() {
        return this.clients.size
    }
}
GHNewbiee commented 1 year ago

@lishine It would really be very nice to exist a new fastify-sse-pubsub plugin since you have already added the types. Tia