oven-sh / bun

Incredibly fast JavaScript runtime, bundler, test runner, and package manager – all in one
https://bun.sh

SSE with `serve` documentation missing #2663

Open simylein opened 1 year ago

simylein commented 1 year ago

What is the type of issue?

Documentation is missing

What is the issue?

I would love to use serve for SSE but cannot find any documentation about it apart from this issue. If serve is ready for handling SSE with the help of ReadableStream there should be documentation about it.

Where did you find it?

No response

gtrabanco commented 1 year ago

No, it's not working at the moment. It pushes the data all at once every 5 seconds, when each write should arrive almost instantly.

Note that node:http2 is not implemented yet, and it is necessary for this to work well:

cirospaciari commented 1 year ago

You can use HTTP/1.1 for now, until we add support for HTTP/2.

Flush was fixed by https://github.com/oven-sh/bun/pull/3073

This should work on the next version:

function sendSSECustom(controller: ReadableStreamDirectController, eventName: string, data: string) {
  return controller.write(`event: ${eventName}\ndata:${JSON.stringify(data)}\n\n`);
}
function sendSSEMessage(controller: ReadableStreamDirectController, data: string) {
  return controller.write(`data:${JSON.stringify(data)}\n\n`);
}
function sse(req: Request): Response {
  const signal = req.signal;
  return new Response(
    new ReadableStream({
      type: "direct",
      async pull(controller: ReadableStreamDirectController) {
        while (!signal.aborted) {
          await sendSSECustom(controller, "bun", "Hello, World!");
          await sendSSEMessage(controller, "Hello, World!");
          await controller.flush();
          await Bun.sleep(1000);
        }
        controller.close();
      },
    }),
    { status: 200, headers: { "Content-Type": "text/event-stream" } },
  );
}
Bun.serve({
  port: 3000,
  fetch(req) {
    if (new URL(req.url).pathname === "/stream") {
      return sse(req);
    }
    return new Response("Hello, World!");
  },
});
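
To check what the server above actually sends, it can help to parse the raw response text on the client side. The following is only a minimal sketch: `parseSSE` and `ParsedEvent` are made-up names, it assumes the whole response text is already buffered, and it skips parts of the SSE format (for example `retry:` handling and lines without a colon). Note that the example server wraps every payload in JSON.stringify, so the data has to go through JSON.parse on the way out.

```typescript
// One parsed SSE event; the names here are illustrative, not part of any API.
export interface ParsedEvent {
  event: string; // "message" when the block has no `event:` line
  data: string; // raw payload, before any JSON.parse
  id: string | null;
}

// Hypothetical helper: split an already buffered SSE text into events.
export function parseSSE(text: string): ParsedEvent[] {
  const events: ParsedEvent[] = [];
  for (const block of text.split("\n\n")) {
    if (block.trim() === "") continue;
    let event = "message";
    let id: string | null = null;
    const data: string[] = [];
    for (const line of block.split("\n")) {
      const colon = line.indexOf(":");
      // Skip comment lines (leading ":") and, as a simplification, lines without a colon.
      if (colon <= 0) continue;
      const field = line.slice(0, colon);
      let value = line.slice(colon + 1);
      // A single space right after the colon is not part of the value.
      if (value.startsWith(" ")) value = value.slice(1);
      if (field === "event") event = value;
      else if (field === "data") data.push(value);
      else if (field === "id") id = value;
    }
    // Blocks without any data line do not produce an event.
    if (data.length > 0) events.push({ event, data: data.join("\n"), id });
  }
  return events;
}
```

In a browser, EventSource does this parsing for you; a helper like this is mostly useful in tests or when reading the stream with fetch.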

You will probably want to use something like lastEventId to resume after reconnections (by sending id:), and send retry: to control the retry interval.

const lastEventId = req.headers.get("last-event-id")
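
As a rough illustration of the id: and retry: fields mentioned above, here is a sketch of a frame formatter. `formatSSE` and `SSEFields` are hypothetical names, not part of Bun. The retry value is a reconnection delay in milliseconds, and a browser EventSource sends the last id it saw back in the Last-Event-ID request header when it reconnects.

```typescript
// Fields of one SSE message; every name here is illustrative.
export interface SSEFields {
  data: unknown;
  event?: string;
  id?: string | number;
  retry?: number; // reconnection delay in milliseconds
}

// Hypothetical helper: build the wire text for a single SSE message.
export function formatSSE({ data, event, id, retry }: SSEFields): string {
  let frame = "";
  if (retry !== undefined) frame += `retry:${retry}\n`;
  if (id !== undefined) frame += `id:${id}\n`;
  if (event !== undefined) frame += `event:${event}\n`;
  // JSON.stringify output has no raw newlines, so a single data line is enough.
  // Undefined data is sent as JSON null.
  frame += `data:${JSON.stringify(data ?? null)}\n\n`;
  return frame;
}
```

With a direct stream like the one above, the result would be passed to controller.write() and followed by controller.flush().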
simylein commented 1 year ago

Thanks a lot 😀. This really helps building real time stuff 🙂.

For those interested, here is my implementation:

import EventEmitter from 'events';
import { debug, info } from '../logger/logger';

export const emitter = new EventEmitter();

export const subscribe = (req: Request, channel: string): Response => {
    info(`subscribing to channel '${channel}'`);
    return new Response(
        new ReadableStream({
            type: 'direct',
            pull(controller: ReadableStreamDirectController) {
                let id = +(req.headers.get('last-event-id') ?? 1);
                const handler = async (data: unknown): Promise<void> => {
                    await controller.write(`id:${id}\ndata:${data !== undefined ? JSON.stringify(data) : ''}\n\n`);
                    await controller.flush();
                    id++;
                };
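                emitter.on(channel, handler);
                // Unsubscribe and close the stream once the client disconnects.
                const close = (): void => {
                    info(`unsubscribing from channel '${channel}'`);
                    emitter.off(channel, handler);
                    controller.close();
                };
                if (req.signal.aborted) {
                    close();
                } else {
                    req.signal.addEventListener('abort', close, { once: true });
                }
                // Never resolve, so the stream stays open until close() runs.
                return new Promise(() => void 0);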
            },
        }),
        {
            status: 200,
            headers: { 'content-type': 'text/event-stream' },
        },
    );
};

export const emit = (channel: string, data?: unknown): void => {
    debug(`emitting to channel '${channel}'`);
    emitter.emit(channel, data);
};
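
One detail to watch in the id handling above: `+(req.headers.get('last-event-id') ?? 1)` evaluates to NaN when the header is not numeric, and on reconnect it starts again at the last id instead of the one after it. A stricter variant could look like the sketch below. `nextEventId` is a made-up name, and continuing at last id + 1 is an assumption about the intended behavior.

```typescript
// Hypothetical helper: derive the next event id from the `last-event-id` header value.
export function nextEventId(lastEventId: string | null): number {
  // No header means a fresh connection, so start at 1.
  if (lastEventId === null) return 1;
  const parsed = Number.parseInt(lastEventId, 10);
  // Fall back to 1 for empty, non-numeric, or negative values.
  if (!Number.isSafeInteger(parsed) || parsed < 0) return 1;
  // Continue with the id after the last one the client received.
  return parsed + 1;
}
```

It would replace the `let id = ...` line, for example `let id = nextEventId(req.headers.get('last-event-id'));`.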
gtrabanco commented 1 year ago

Thanks a lot 😀. This really helps building real time stuff 🙂.

For those interested, here is my implementation:

(...)

I modified your solution a little to add an "onClose" event, which is useful for clearing intervals or doing any other cleanup when the connection is closed. I just wanted to share it.

import EventEmitter from 'node:events';
export const emitter = new EventEmitter();

export type SSEEvent = {
  event?: string;
  data?: unknown;
}

export type SSEOptions = {
  retry: number;
  onClose: () => void;
}

function info(...args: unknown[]): void {
  console.info(...args);
}

function debug(...args: unknown[]): void {
  console.debug(...args);
}

function channelSubscribe(channels: string[], handler: (payload: SSEEvent) => void): void {
  channels.forEach((channel) => {
    emitter.on(channel, handler);
  });
}

function channelUnsubscribe(channels: string[], handler: (payload: SSEEvent) => void): void {
  channels.forEach((channel) => {
    emitter.off(channel, handler);
  });
}

export const sseSubscribe = (req: Request, channel: string | Array<string>, options: Partial<SSEOptions> = {
  retry: 1000,
  onClose: () => { }
}): Response => {
  info(`subscribing to channel '${channel}'`);
  const stream = new ReadableStream({
    type: 'direct',
    async pull(controller: ReadableStreamDirectController) {
      let id = +(req.headers.get('last-event-id') ?? 1);

      if (options.retry !== undefined) {
        await controller.write(`retry:${options.retry}\n`);
      }

      const handler = async (payload: SSEEvent = {}): Promise<void> => {
        // sseEmit may be called without a payload, so default to an empty event.
        const { event, data } = payload;
        if (event !== undefined) {
          await controller.write(`event:${event}\n`);
        }
        await controller.write(`id:${id}\n`);
        await controller.write(`data:${data !== undefined ? JSON.stringify(data) : ''}\n\n`);
        await controller.flush();
        id++;
      };

      function closeConnection(reason: string | undefined = 'reason unknown') {
        return () => {
          info(`unsubscribing from channel '${channel}': ${reason}`);
          channelUnsubscribe(Array.isArray(channel) ? channel : [channel], handler);
          options.onClose?.();
          controller.close();
        }
      }

      channelSubscribe(Array.isArray(channel) ? channel : [channel], handler);

      // AbortSignal only dispatches 'abort', so one listener covers client disconnects.
      if (req.signal.aborted) {
        closeConnection('Connection aborted originally')();
      } else {
        req.signal.addEventListener('abort', closeConnection('Connection aborted'), { once: true });
      }
      return new Promise(() => void 0);
    },
  });

  return new Response(stream, {
    status: 200,
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive'
    }
  });
};

export const sseEmit = (channel: string, payload?: SSEEvent): void => {
  debug(`emitting to channel '${channel}'`);
  emitter.emit(channel, payload);
};
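
To show what the "onClose" hook is for, here is a small sketch of an interval that gets stopped when the connection closes. `startTicker`, the 'tick' event name, and the 'clock' channel are all made up for illustration; only the payload shape is taken from the code above.

```typescript
// Shape of the payload accepted by sseEmit in the code above.
type SSEEvent = { event?: string; data?: unknown };

// Hypothetical helper: calls `emit` every `ms` milliseconds with a counter
// and returns a function that stops the timer.
export function startTicker(emit: (payload: SSEEvent) => void, ms: number): () => void {
  let tick = 0;
  const timer = setInterval(() => {
    tick++;
    emit({ event: 'tick', data: { tick } });
  }, ms);
  return () => clearInterval(timer);
}

// Possible wiring inside a fetch handler (assumes sseEmit and sseSubscribe from above):
//   const stop = startTicker((payload) => sseEmit('clock', payload), 1000);
//   return sseSubscribe(req, 'clock', { retry: 1000, onClose: stop });
```

Since every subscriber of a channel receives every emit, one ticker per connection on a shared channel would multiply the events; a per-connection channel name or a single shared ticker avoids that.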
7heMech commented 1 month ago

@cirospaciari What does type: 'direct' do? I mean my IDE tells me I can only put bytes in there.
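
For context on the question above: as far as the Bun documentation describes it, `type: 'direct'` is a Bun-specific stream mode in which controller.write() hands chunks straight to the destination instead of queueing them. The standard typings only know `type: 'bytes'`, which would explain the IDE hint if Bun's own type definitions are not picked up. For comparison, here is a sketch of the same kind of response using only the standard ReadableStream API, where chunks are queued with enqueue() and encoded to bytes first. `sseStandard` is a made-up name, and this version sends a fixed list of messages rather than a live feed.

```typescript
const encoder = new TextEncoder();

// Hypothetical helper: an SSE response built on a standard (non-direct) stream.
export function sseStandard(messages: string[]): Response {
  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      for (const message of messages) {
        // Standard streams queue chunks; the body is encoded to bytes before enqueueing.
        controller.enqueue(encoder.encode(`data:${JSON.stringify(message)}\n\n`));
      }
      controller.close();
    },
  });
  return new Response(stream, {
    status: 200,
    headers: { "Content-Type": "text/event-stream" },
  });
}
```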