nitrojs / nitro

Next Generation Server Toolkit. Create web servers with everything you need and deploy them wherever you prefer.
https://nitro.build
MIT License
6.26k stars 515 forks source link

WebSockets Cloudflare Pub/Sub & Durable Objects Support #2436

Open cliqer opened 6 months ago

cliqer commented 6 months ago

Describe the feature

I have extended the Cloudflare adapter with pub/sub support, but I can't get it to work once deployed. Are there any pointers on how to fix this, or are there other known issues with Cloudflare WebSockets?

In wrangler.toml I added the Durable Object binding and ran it with the cloudflare-module preset:

# wrangler.toml
[durable_objects]
bindings = [
    { name = "TOPIC_DO", class_name = "default" }
]
// adapters/cloudflare.ts

import type * as _cf from "@cloudflare/workers-types";

import { Peer } from "../peer";
import { AdapterOptions, defineWebSocketAdapter } from "../types.js";
import { Message } from "../message";
import { WSError } from "../error";
import { createCrossWS } from "../crossws";
import { toBufferLike } from "../_utils";

/** Worker environment bindings object; values are untyped here. */
type Env = Record<string, any>;

// Ambient declarations that give the `WebSocketPair` and `Response` identifiers
// the signatures from `@cloudflare/workers-types`.
// NOTE(review): presumably needed so the Workers-specific `webSocket` response
// option type-checks instead of the default lib typings — confirm.
declare const WebSocketPair: typeof _cf.WebSocketPair;
declare const Response: typeof _cf.Response;

/**
 * Public surface of the Cloudflare WebSocket adapter.
 */
export interface CloudflareAdapter {
    /**
     * Handles a WebSocket upgrade request.
     *
     * @param req - The incoming request.
     * @param env - The Worker environment bindings.
     * @param context - The execution context of the current invocation.
     * @returns A promise resolving to the response for the upgrade request.
     */
    handleUpgrade(
        req: _cf.Request,
        env: Env,
        context: _cf.ExecutionContext,
    ): Promise<_cf.Response>;
}

/** Options accepted by the Cloudflare adapter; currently adds nothing to `AdapterOptions`. */
export interface CloudflareOptions extends AdapterOptions {}

// Registry of topic name -> set of sockets subscribed to that topic. It is
// mutated by the module-level `subscribe` / `unsubscribe` functions and read by
// `publish`.
// NOTE(review): this is in-memory, module-level state; presumably it is not
// shared between separate Worker instances — confirm against the deployment model.
const topics = new Map<string, Set<_cf.WebSocket>>();

/**
 * Cloudflare WebSocket adapter with a small in-memory pub/sub protocol.
 *
 * `handleUpgrade` creates a `WebSocketPair`, runs the crossws upgrade hook,
 * accepts the server side of the pair and wires its `message` / `error` /
 * `close` events to the crossws hooks.
 *
 * Text frames that contain a JSON object with an `action` of `publish`,
 * `subscribe` or `unsubscribe` are additionally treated as control frames and
 * applied to the module-level `topics` registry. Every frame, control frame or
 * not, is still forwarded to the `cloudflare:message` and `message` hooks.
 *
 * Note that this control-frame path uses the module-level `publish` /
 * `subscribe` / `unsubscribe` functions, not the `CloudflarePeer` methods of
 * the same names.
 */
export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>((options = {}) => {
    const crossws = createCrossWS(options);

    /**
     * Tries to read a frame as a JSON control message.
     *
     * Returns `undefined` for binary frames, text that is not valid JSON, and
     * JSON values that are not objects, so that ordinary payloads never throw
     * inside the message listener.
     */
    const parseControlFrame = (raw: unknown): Record<string, any> | undefined => {
        if (typeof raw !== "string") {
            return undefined;
        }
        try {
            const parsed: unknown = JSON.parse(raw);
            return typeof parsed === "object" && parsed !== null
                ? (parsed as Record<string, any>)
                : undefined;
        } catch {
            // Not JSON: this is a regular payload, not a control frame.
            return undefined;
        }
    };

    const handleUpgrade = async (req: _cf.Request, env: Env, context: _cf.ExecutionContext) => {
        const pair = new WebSocketPair();
        const client = pair[0];
        const server = pair[1];

        const peer = new CloudflarePeer({
            cloudflare: { client, server, req, env, context },
        });

        const { headers } = await crossws.upgrade(peer);

        server.accept();
        crossws.$callHook("cloudflare:accept", peer);
        crossws.callHook("open", peer);

        server.addEventListener("message", (event) => {
            const data = parseControlFrame(event.data);
            if (data) {
                if (data.action === 'publish') {
                    publish(data.topic, data.message);
                } else if (data.action === 'subscribe') {
                    subscribe(server, data.topic);
                } else if (data.action === 'unsubscribe') {
                    unsubscribe(server, data.topic);
                }
            }
            // Hooks must run for every frame, including ones that are not control frames.
            crossws.$callHook("cloudflare:message", peer, event);
            crossws.callHook("message", peer, new Message(event.data));
        });

        server.addEventListener("error", (event) => {
            crossws.$callHook("cloudflare:error", peer, event);
            crossws.callHook("error", peer, new WSError(event.error));
        });

        server.addEventListener("close", (event) => {
            // Drop this socket from every topic so `publish` never targets a closed socket.
            topics.forEach((_subscribers, topic) => unsubscribe(server, topic));
            crossws.$callHook("cloudflare:close", peer, event);
            crossws.callHook("close", peer, {
                code: event.code,
                reason: event.reason,
            });
        });

        // 101 Switching Protocols: hand the client end of the pair back to the caller.
        return new Response(null, {
            status: 101,
            webSocket: client,
            headers,
        });
    };

    return { handleUpgrade };
});

/**
 * Peer backed by a Cloudflare `WebSocketPair`.
 *
 * The peer context carries both ends of the pair, the original upgrade
 * request, the Worker `env` bindings and the execution context.
 *
 * `subscribe`, `unsubscribe` and `publish` delegate to a Durable Object reached
 * through the `TOPIC_DO` binding; one object instance is addressed per topic
 * name via `idFromName(topic)`.
 */
class CloudflarePeer extends Peer<{
    cloudflare: {
        client: _cf.WebSocket;
        server: _cf.WebSocket;
        req: _cf.Request;
        env: Env;
        context: _cf.ExecutionContext;
    };
}> {
    /** Always `undefined` for this adapter. */
    get addr() {
        return undefined;
    }

    /** URL of the request that initiated the upgrade. */
    get url() {
        return this.ctx.cloudflare.req.url;
    }

    /** Headers of the request that initiated the upgrade. */
    get headers() {
        return this.ctx.cloudflare.req.headers as Headers;
    }

    /** Ready state as reported by the client end of the socket pair. */
    get readyState() {
        return this.ctx.cloudflare.client.readyState as -1 | 0 | 1 | 2 | 3;
    }

    /**
     * Sends a message through the server end of the pair.
     *
     * @param message - Payload; converted with `toBufferLike` before sending.
     * @returns Always `0`.
     */
    send(message: any) {
        this.ctx.cloudflare.server.send(toBufferLike(message));
        return 0;
    }

    /**
     * Asks the topic's Durable Object to subscribe this peer.
     *
     * NOTE(review): the request body is `req.cf?.requestId`; confirm that this
     * property is actually populated, otherwise the Durable Object receives an
     * empty body and cannot identify the subscriber.
     */
    subscribe(topic: string): void {
        this.requestTopic(topic, "subscribe", this.ctx.cloudflare.req.cf?.requestId);
    }

    /** Asks the topic's Durable Object to unsubscribe this peer. */
    unsubscribe(topic: string): void {
        this.requestTopic(topic, "unsubscribe", this.ctx.cloudflare.req.cf?.requestId);
    }

    /**
     * Asks the topic's Durable Object to publish a message.
     *
     * @param topic - Topic name.
     * @param message - Payload; sent as JSON together with the topic name.
     * @param options - Accepted for interface compatibility; not used here.
     */
    publish(topic: string, message: any, options?: { compress?: boolean }): void {
        this.requestTopic(topic, "publish", JSON.stringify({ topic, message }));
    }

    /**
     * Sends a POST request to the Durable Object that owns `topic`.
     *
     * The request uses an absolute URL because a bare path such as
     * `/subscribe` is not a valid URL for a stub `fetch`; only the path
     * identifies the action. The pending request is registered with
     * `waitUntil` and failures are logged, so a rejected request does not
     * surface as an unhandled rejection.
     */
    private requestTopic(
        topic: string,
        action: "subscribe" | "unsubscribe" | "publish",
        body: any,
    ): void {
        const { env, context } = this.ctx.cloudflare;
        const stub = env.TOPIC_DO.get(env.TOPIC_DO.idFromName(topic));
        const pending = Promise.resolve(
            stub.fetch(`https://topic-do/${action}`, { method: "POST", body }),
        ).catch((error: unknown) => {
            console.error(`Durable Object "${action}" request failed for topic "${topic}":`, error);
        });
        context.waitUntil(pending);
    }
}

/**
 * Delivers a message to every socket currently subscribed to `topic`.
 *
 * Each subscriber receives the JSON text of `{ topic, message }`. Topics with
 * no entry in the registry are ignored.
 */
function publish(topic: string, message: string) {
    const audience = topics.get(topic);
    if (!audience) {
        return;
    }
    for (const socket of audience) {
        socket.send(JSON.stringify({ topic, message }));
    }
}

/**
 * Registers `ws` as a subscriber of `topic`, creating the topic's subscriber
 * set on first use.
 */
function subscribe(ws: _cf.WebSocket, topic: string) {
    const existing = topics.get(topic);
    if (existing) {
        existing.add(ws);
        return;
    }
    // First subscriber for this topic: start a new set containing it.
    topics.set(topic, new Set([ws]));
}

/**
 * Removes `ws` from the subscribers of `topic` and drops the topic from the
 * registry once its last subscriber is gone. Unknown topics are ignored.
 */
function unsubscribe(ws: _cf.WebSocket, topic: string) {
    const members = topics.get(topic);
    if (!members) {
        return;
    }
    members.delete(ws);
    if (members.size > 0) {
        return;
    }
    // No subscribers left: remove the empty entry.
    topics.delete(topic);
}

Additional information

manniL commented 1 week ago

Progress via https://github.com/nitrojs/nitro/pull/2801