trpc / trpc

Consider SSE to enable server->client http subscriptions #544

Closed delaneyj closed 3 months ago

delaneyj commented 3 years ago

Reading through the #532 PR it occurred to me that the JSON-RPC notification system only works one way, from client to server. Most of the time you want it to be from server to client. Although nothing about the spec says which side is which, in a browser this basically means the browser is the client.

One interesting approach could be to use Server-Sent Events. This would allow for stateless, load-balanceable subscriptions that could work with HTTP/2. From what I'm seeing, though, I'm not sure it would technically fall under a JSON-RPC compliant spec, though you can probably treat it as a server-to-client notification. The nice thing is that this is already built into browsers, with polyfills available for Node and most languages.

I think you'd create/cancel as you do currently, but the client would also fetch on a new route such as /subscription/3nxu4ex234. I know this is separate from the websocket implementation, but SSE isn't that well known. The major downside of the SSE spec as it stands is the text-only protocol (compared to binary for websockets), but in a JSON-RPC context the point is moot.
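
To make the idea concrete, here is a minimal sketch of how one server-to-client JSON-RPC notification could be carried as a single SSE event. The `toSseFrame` helper and the `subscription/<id>` method naming are assumptions for illustration only, not anything tRPC defines.

```typescript
// A JSON-RPC 2.0 notification is a request object without an "id" member.
interface JsonRpcNotification<T> {
  jsonrpc: "2.0";
  method: string;
  params: T;
}

// Hypothetical helper: wrap a payload in a notification and serialize it
// as one SSE event (a "data:" line terminated by a blank line).
function toSseFrame<T>(subscriptionId: string, payload: T): string {
  const notification: JsonRpcNotification<T> = {
    jsonrpc: "2.0",
    method: `subscription/${subscriptionId}`, // assumed naming scheme
    params: payload,
  };
  // JSON.stringify without indentation emits no raw newlines,
  // so a single "data:" line is enough.
  return `data: ${JSON.stringify(notification)}\n\n`;
}
```

On the browser side, an `EventSource` pointed at the subscription route would receive each frame as a `message` event whose `data` is the JSON text.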

TRP-63

KATT commented 3 years ago

SSE should be possible to add alongside WebSockets. I prefer WS myself as I don't have to think about max concurrent connection limit and my application doesn't have user numbers that make it hard to scale anyway.

The way our HTTP subscriptions work right now is through long polling: you pass a cursor of the current state and the server lets the request hang until there's new data or it times out. Inspect https://chat.trpc.io/ to see how the long polling works.
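
A rough sketch of the cursor-based long-polling pattern described above, using an in-memory log. The `MessageLog` class and its method names are made up for illustration; this is not the tRPC implementation.

```typescript
type Message = { id: number; text: string };

class MessageLog {
  private messages: Message[] = [];
  private waiters = new Set<() => void>();

  // Everything the caller has not seen yet, based on its cursor.
  newerThan(cursor: number): Message[] {
    return this.messages.filter((m) => m.id > cursor);
  }

  append(text: string): Message {
    const message = { id: this.messages.length + 1, text };
    this.messages.push(message);
    // Wake every request that is currently hanging.
    this.waiters.forEach((wake) => wake());
    this.waiters.clear();
    return message;
  }

  // Resolve immediately if there is newer data; otherwise let the
  // request hang until new data arrives or the timeout elapses.
  async poll(cursor: number, timeoutMs: number): Promise<Message[]> {
    if (this.newerThan(cursor).length === 0) {
      await new Promise<void>((resolve) => {
        const wake = () => {
          clearTimeout(timer);
          resolve();
        };
        const timer = setTimeout(() => {
          this.waiters.delete(wake);
          resolve();
        }, timeoutMs);
        this.waiters.add(wake);
      });
    }
    return this.newerThan(cursor); // may be empty after a timeout
  }
}
```

The client would call `poll` again right after each response, passing the highest `id` it has seen as the next cursor.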

delaneyj commented 3 years ago

I too love websockets, when they work. At scale I have had OOM errors causing a host of issues. The other major issues at scale have been token-based access timers, sticky load balancing, and the fact that if a server goes down it takes everyone with it. I'll dig into the examples more, but this was just to plant the idea in your head. :smiley:

KATT commented 3 years ago

Isn't scaling with SSE similar though, or am I missing something? It's still a persistent connection between a client and server(s) that also needs to be scaled.

KATT commented 3 years ago

Scaling is not my area of expertise, but with WS and the current setup it should scale seamlessly on things like k8s/render.com - we'll listen to a SIGTERM and broadcast a reconnect notification to all clients which will create an overlapping connection and send a ReconnectError to any listeners.

SIGTERM: https://github.com/trpc/trpc/blob/dcfff9e234b99d55d00157eb11a6300cf841e46b/examples/experimental-next-prisma-starter-websockets/src/server/wssDevServer.ts#L18-L22

ReconnectError: https://github.com/trpc/trpc/blob/dcfff9e234b99d55d00157eb11a6300cf841e46b/packages/server/test/websockets.test.ts#L439-L484
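
The shutdown flow can be sketched roughly like this. The message shape and both function names are assumptions for illustration; see the linked files for what tRPC actually does.

```typescript
// Minimal shape of a connected client; a WebSocket from the `ws`
// package has a compatible `send` method.
interface ClientSocket {
  send(data: string): void;
}

// Tell every connected client to open a new connection.
// The payload shape here is an assumption, not tRPC's wire format.
function broadcastReconnect(clients: ClientSocket[]): number {
  const message = JSON.stringify({ id: null, method: "reconnect" });
  for (const client of clients) client.send(message);
  return clients.length;
}

// `onSignal` registers the handler, so the sketch stays runtime-agnostic.
// In Node it could be: (handler) => process.on("SIGTERM", handler)
function installShutdownHook(
  onSignal: (handler: () => void) => void,
  getClients: () => ClientSocket[],
  closeServer: () => void,
): void {
  onSignal(() => {
    broadcastReconnect(getClients());
    closeServer(); // stop accepting new connections; clients reconnect elsewhere
  });
}
```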

delaneyj commented 3 years ago

If you are handling reconnect that's great; the SSE spec has it built in. Since you are relying on ws, it could be more reliable than an earlier socket.io-based approach.
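
For context, the built-in reconnect works through the `retry:` and `id:` fields: the browser's `EventSource` reconnects on its own and sends the last id it saw in a `Last-Event-ID` request header. A minimal server-side sketch follows; the `formatEvent` and `eventsSince` helpers and the numeric ids are assumptions for illustration.

```typescript
interface SseEvent {
  id: number;
  data: unknown;
}

// Serialize an event with an id, optionally telling the client how long
// to wait (in milliseconds) before reconnecting.
function formatEvent(event: SseEvent, retryMs?: number): string {
  const lines: string[] = [];
  if (retryMs !== undefined) lines.push(`retry: ${retryMs}`);
  lines.push(`id: ${event.id}`);
  lines.push(`data: ${JSON.stringify(event.data)}`);
  return lines.join("\n") + "\n\n";
}

// On reconnect, replay what the client missed, based on the value of
// the Last-Event-ID header (null when the header is absent).
function eventsSince(history: SseEvent[], lastEventId: string | null): SseEvent[] {
  if (lastEventId === null) return history;
  const last = Number.parseInt(lastEventId, 10);
  if (Number.isNaN(last)) return history; // unknown id: replay everything
  return history.filter((event) => event.id > last);
}
```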

delaneyj commented 3 years ago

I'll close for now, will do more testing with the websocket approach until I have better reason to seek alternatives.

thelinuxlich commented 2 years ago

graphql-yoga 2 added subscriptions exclusively through SSE. With HTTP/2 you no longer have to care about max concurrent connections the way you did in the past.

KATT commented 2 years ago

I'll reopen this and see if someone wants to work on it for the next major :)

neronim1141 commented 2 years ago

I'm writing a map app for a game, where the data for map tiles is sent by HTTP request, and I want to update the client map in real time. I managed to do that with graphql-yoga and SSE because they operate in one Node process, whereas your example needs to set up an additional process for websockets. That works between tRPC requests, but I can't get it to work when an HTTP request tries to send data to tRPC subscriptions. While I love how everything else turned out, this one thing totally blocks me from converting to tRPC. I may still be too inexperienced and not know a way to get it to work like that.

punkpeye commented 2 years ago

Very similar situation to @neronim1141. Had everything migrated and then realized SSE is not working.

LucasAlda commented 1 year ago

It would be really awesome to have SSE. It is awesome for situations where you want to give some partial responses without setting up a full and expensive WS. Really wanting this to be a thing.

AlaaZorkane commented 1 year ago

I believe that a lot of cases don't really need the bidirectional communication nowadays - SSE is a perfect fit and plays well with HTTP/2 - A lot of the graphql community is also leaning toward this approach of HTTP/2 queries/mutations combined with SSE.

A good read on the matter: https://wundergraph.com/blog/deprecate_graphql_subscriptions_over_websockets

Would really like to see this worked on in the context of tRPC 🙏

elderapo commented 1 year ago

Before going with SSE I suggest reading https://dev.to/miketalbot/server-sent-events-are-still-not-production-ready-after-a-decade-a-lesson-for-me-a-warning-for-you-2gie. It describes a pitfall that cannot be solved at the code level, only at the infrastructure level, which we (devs) don't always have full access to.

thelinuxlich commented 1 year ago

It can be solved on your side: you close connections going to proxy servers that don't support HTTP/2 or higher.

elderapo commented 1 year ago

> It can be solved on your side, you close connections going to proxy servers that don't support HTTP/2 or higher.

Huh? Then the TRPC client is never going to receive subscription events?

thelinuxlich commented 1 year ago

No, you close after the event is sent

thelinuxlich commented 1 year ago

Look at this: https://mercure.rocks/

fyyyyy commented 1 year ago

Good comparison of polling vs SSE vs web sockets ( at scale ) https://www.youtube.com/watch?v=6QnTNKOJk5A

partmor commented 1 year ago

> Isn't scaling with SSE similar though, or am I missing something? It's still a persistent connection between a client and server(s) that also needs to be scaled.

One huge advantage about using SSE over WS (assuming you only need server->client events, not bidirectional) is that you don’t need to spin up an additional server. This is a big deal in terms of infra complexity.

And you’re not using a different protocol.

carere commented 1 year ago

@KATT Hello, first of all, thanks for this awesome package !!

I'd be happy to try implementing SSE in tRPC. Do you have any insight or a starting point for implementing such a feature? When using SSE, we first need to send headers (with specific values) to keep the connection open. It seems that's the main problem with tRPC: I tried to create an endpoint with tRPC and fastify-sse and received an error that headers were already sent.

Anyway, would be happy to help 😃
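
For anyone unfamiliar with the "headers first" requirement, here is a minimal, framework-agnostic sketch. The `SseResponse` interface and `openEventStream` function are hypothetical; the interface is shaped so that a Node-style response object with `writeHead` and `write` should fit, but that is an assumption to verify against your server.

```typescript
// Minimal response shape needed for SSE.
interface SseResponse {
  writeHead(status: number, headers: Record<string, string>): unknown;
  write(chunk: string): unknown;
}

// Send the SSE headers once, up front, then return a function that
// writes one event per call. The response is deliberately never ended
// here; the connection stays open until the client or server closes it.
function openEventStream(res: SseResponse): (data: unknown) => void {
  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
  });
  return (data) => {
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  };
}
```

A "headers already sent" error typically means something wrote or finalized the response before or after this point, so whichever layer owns the response has to be the one that opens the stream.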

leonwilly commented 1 year ago

I know @carere volunteered to tackle this but we couldn't wait at my company. So I went ahead and implemented this. I'll be submitting a PR here shortly.

yacoubb commented 1 year ago

@leonwilly looking forward to it!

tarekwiz commented 1 year ago

@leonwilly Should we expect that anytime soon? No pressure just tryna get an ETA :)

leonwilly commented 1 year ago

Hopefully end of this sprint (Monday) I'm swamped right now.

jaivinwylde commented 1 year ago

@leonwilly when you have time it would be amazing to get that pr, it would be great for the trpc community.

StringKe commented 1 year ago

I'd like to help test the SSE functionality, any progress so far?

OutdatedVersion commented 1 year ago

Hey all! I was in a similar situation to leonwilly and implemented this for work. A bare-bones, extracted version is available at https://github.com/OutdatedVersion/trpc-sse-link.

https://github.com/trpc/trpc/assets/11138610/f4d887f3-8585-4a31-82a1-a3707360687a

Since I've been telling myself I'd figure out the best way to get this into mainline tRPC ... for a month please anyone feel free to take any of that code for a PR. ❤️

timcole commented 1 year ago

Nice work @OutdatedVersion, this is good stuff! I've been waiting for SSE in tRPC for a while and this is an amazing start. However, I'm not sure a stream for every subscription like this is the best approach. Instead, I would expect a shared stream with an accompanying endpoint for posting sub/unsub topics to be a better approach. This accompanying endpoint would still just be handled by useSubscription or whatnot.

Similar to how Twitter web client connects to https://api.twitter.com/live_pipeline/events which starts an eventstream with an optional topic parameter of initial topics, then posts to https://api.twitter.com/1.1/live_pipeline/update_subscriptions with a comma-separated list sub_topics and unsub_topics to adjust the listening topics.

There are many reasons why a single stream is best here, but one main reason is the default limit of 100 concurrent streams per connection with HTTP/2 and 6 connections per domain on HTTP/1.1. If you have multiple things that need events and the user has more than one tab open, you'll hit the limit pretty quickly with a separate stream per subscription.
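
A minimal sketch of that shared-stream idea: one SSE stream per client, plus a separate call that adjusts which topics the stream listens to. The `SharedStreamHub` class and its method names are hypothetical, and topic names are assumed not to contain newlines.

```typescript
type Send = (frame: string) => void;

class SharedStreamHub {
  private streams = new Map<string, { send: Send; topics: Set<string> }>();

  // Called when a client opens its single event stream.
  open(streamId: string, send: Send, initialTopics: string[] = []): void {
    this.streams.set(streamId, { send, topics: new Set(initialTopics) });
  }

  // Called by the accompanying endpoint with sub/unsub topic lists.
  update(streamId: string, subTopics: string[], unsubTopics: string[]): boolean {
    const stream = this.streams.get(streamId);
    if (!stream) return false;
    for (const topic of subTopics) stream.topics.add(topic);
    for (const topic of unsubTopics) stream.topics.delete(topic);
    return true;
  }

  close(streamId: string): void {
    this.streams.delete(streamId);
  }

  // Fan an event out to every stream subscribed to the topic and
  // return how many streams received it.
  publish(topic: string, data: unknown): number {
    const frame = `event: ${topic}\ndata: ${JSON.stringify(data)}\n\n`;
    let delivered = 0;
    this.streams.forEach(({ send, topics }) => {
      if (topics.has(topic)) {
        send(frame);
        delivered++;
      }
    });
    return delivered;
  }
}
```

On the client, a single `EventSource` could then route events by name with `addEventListener(topic, ...)` instead of opening one connection per subscription.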

matannahmani commented 1 year ago

@OutdatedVersion did you manage to make it work in a Route Handler (Next 13 app -> API)? I can't manage to convert it: https://github.com/OutdatedVersion/trpc-sse-link/blob/main/example/src/pages/api/trpc/%5Btrpc%5D.ts

The events aren't sent and the request finishes immediately. Here is my attempt:

const handler = async (
  req: Request,
  context: {
    params: { trpc: string | string[] };
  }
) => {
  if (Array.isArray(context.params.trpc)) {
    return fetchRequestHandler({
      endpoint: "/api/trpc",
      req,
      router: appRouter,
      createContext: createTRPCContext,
      onError:
        env.NODE_ENV === "development"
          ? ({ path, error }) => {
              console.error(
                `❌ tRPC failed on ${path ?? "<no-path>"}: ${error.message}`
              );
            }
          : undefined,
    });
  }
  // @ts-expect-error @todo to fix it
  const procedure = appRouter?._def?.procedures?.[context.params.trpc] as
    | AnyProcedure
    | undefined;
  if (req.method === "GET" && procedure?._def.subscription) {
    const resHeaders = new Headers();
    const ctx = await createTRPCContext({
      req,
      resHeaders,
    });
    // Create a TransformStream for writing the response as the tokens are generated
    const stream = new TransformStream();
    const writer = stream.writable.getWriter();
    try {
      // TODO: support POST
      // https://github.com/trpc/trpc/blob/7ad695ea33810a162808c43b6fba1fb920e05325/packages/server/src/http/resolveHTTPResponse.ts#L25
      // TODO https://github.com/trpc/trpc/blob/7ad695ea33810a162808c43b6fba1fb920e05325/packages/server/src/http/resolveHTTPResponse.ts#L141-L145
      // https://gist.github.com/OutdatedVersion/8ea31e6790d6514094487e2f76e1b652
      const urlParams = new URL(req.url).searchParams;
      const inputParams = urlParams.get("input");
      const input = inputParams ? JSON.parse(inputParams) : undefined;

      const call = {
        type: "subscription",
        ctx,
        path: context.params.trpc,
        input,
        rawInput: input,
      } as const;

      const res = await procedure(call);
      if (!isObservable(res)) {
        // eslint-disable-next-line @typescript-eslint/no-unsafe-call
        await writer.close();
        throw new Error(`subscription must return observable`);
      }

      // https://github.com/trpc/trpc/blob/7ad695ea33810a162808c43b6fba1fb920e05325/packages/server/src/http/resolveHTTPResponse.ts#L189-L193
      const subscription = res.subscribe({
        next(value) {
          // https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events
          console.log("server subscription next", value);
          void writer.write(`event:data\ndata: ${JSON.stringify(value)}\n\n`);
        },
        error(err) {
          console.log("server subscription error", err);
          void writer.abort(err);
          subscription.unsubscribe();
        },
        complete() {
          console.log("server subscription complete");
          void writer.close();
          subscription.unsubscribe();
        },
      });
      subscription.unsubscribe();

      // req.on("close", () => {
      //   console.log("unsubscribe: req closed");
      //   subscription.unsubscribe();
      // });
      // req.on("end", () => {
      //   console.log("unsubscribe: req end");
      //   subscription.unsubscribe();
      // });
      // req.on("error", () => console.log("req error"));
      // req.on("pause", () => console.log("req paused"));
    } catch (error) {
      // https://github.com/trpc/trpc/blob/7ad695ea33810a162808c43b6fba1fb920e05325/packages/server/src/http/resolveHTTPResponse.ts#L198-L202
      console.error("Uncaught subscription error", error);
      void writer.abort(error);
    }
    return new Response(stream.readable, {
      status: 200,
      headers: {
        Connection: "keep-alive",
        "Cache-Control": "no-cache, no-transform",
        "Content-Type": "text/event-stream;charset=utf-8",
        "Access-Control-Allow-Origin": "*",
      },
    });
  }
  return fetchRequestHandler({
    endpoint: "/api/trpc",
    req,
    router: appRouter,
    createContext: createTRPCContext,
    onError:
      env.NODE_ENV === "development"
        ? ({ path, error }) => {
            console.error(
              `❌ tRPC failed on ${path ?? "<no-path>"}: ${error.message}`
            );
          }
        : undefined,
  });
};

StringKe commented 1 year ago

@OutdatedVersion unsubscription doesn't actually stop the Observable from running, does it?

matannahmani commented 1 year ago

For people who may be interested I managed to make a production-ready version of SSE using app dir route handlers. You can view the code here: https://github.com/matannahmani/ada-trpc-sse-link https://github.com/matannahmani/ada-trpc-sse-link/blob/main/src/trpc/stream-link.ts https://github.com/matannahmani/ada-trpc-sse-link/blob/main/src/trpc/client.ts https://github.com/matannahmani/ada-trpc-sse-link/blob/main/src/app/api/trpc/%5Btrpc%5D/route.ts Or view live here (integration using LangChain and TRPC SSE LINK): https://preview.im-ada.ai

StringKe commented 1 year ago

> For people who may be interested I managed to make a production-ready version of SSE using app dir route handlers. You can view the code here: https://github.com/matannahmani/ada-ai-prototype https://github.com/matannahmani/ada-ai-prototype/blob/main/src/trpc/stream-link.ts https://github.com/matannahmani/ada-ai-prototype/blob/main/src/trpc/client.ts https://github.com/matannahmani/ada-ai-prototype/blob/main/src/app/api/trpc/%5Btrpc%5D/route.ts Or test the demo here (It's not perfect but working quite well): https://ada-ai-prototype.vercel.app/

Will SSE continue to run on the back end when the user actively shuts it down?

matannahmani commented 1 year ago

@StringKe to my understanding, yes. As far as I know, AbortSignal is bugged or doesn't exist in the Next 13.4.4 app route API handlers; I also noted this in one of the comments in the code. See https://github.com/vercel/next.js/discussions/48682
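
For reference, a minimal sketch of tying the subscription's lifetime to the response stream, using the `cancel` hook of a web `ReadableStream`. The `Subscribe` type and `sseStream` function are hypothetical stand-ins for an observable, not tRPC APIs. Whether the runtime actually cancels the stream when the client disconnects depends on the runtime, which is exactly the limitation discussed here.

```typescript
type Unsubscribe = () => void;
type Subscribe<T> = (observer: {
  next(value: T): void;
  complete(): void;
}) => Unsubscribe;

// Turn a subscription into a byte stream of SSE frames. Fetch-style
// responses expect bytes, hence the TextEncoder.
function sseStream<T>(subscribe: Subscribe<T>): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  let unsubscribe: Unsubscribe | undefined;
  return new ReadableStream<Uint8Array>({
    start(controller) {
      // Keep the subscription open for as long as the stream is open.
      unsubscribe = subscribe({
        next(value) {
          controller.enqueue(encoder.encode(`data: ${JSON.stringify(value)}\n\n`));
        },
        complete() {
          controller.close();
        },
      });
    },
    cancel() {
      // Runs when the consumer cancels the stream; stop the source here
      // so it does not keep running on the back end.
      unsubscribe?.();
    },
  });
}
```

The result could be passed as the body of a `Response` with a `text/event-stream` content type.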

marschr commented 1 year ago

> For people who may be interested I managed to make a production-ready version of SSE using app dir route handlers. You can view the code here: https://github.com/matannahmani/ada-ai-prototype https://github.com/matannahmani/ada-ai-prototype/blob/main/src/trpc/stream-link.ts https://github.com/matannahmani/ada-ai-prototype/blob/main/src/trpc/client.ts https://github.com/matannahmani/ada-ai-prototype/blob/main/src/app/api/trpc/%5Btrpc%5D/route.ts Or test the demo here (It's not perfect but working quite well): https://ada-ai-prototype.vercel.app/

Hey @matannahmani , I was using your repo implementation of tRPC + SSE as reference for a tRPC + SSE + svelte version, but since a few hours ago the repo is gone and all the links are broken, is there any way so we still have access for it as a reference? Thanks.

matannahmani commented 1 year ago

@marschr for sure, here is a new repo dedicated to SSE-Link. I will try to do some work this weekend to showcase ways you can use it and ways we are using it (LangChain integration). For now, you can see a snapshot from my earliest stable POC. https://github.com/matannahmani/ada-trpc-sse-link https://github.com/matannahmani/ada-trpc-sse-link/blob/main/src/trpc/stream-link.ts https://github.com/matannahmani/ada-trpc-sse-link/blob/main/src/trpc/client.ts https://github.com/matannahmani/ada-trpc-sse-link/blob/main/src/app/api/trpc/%5Btrpc%5D/route.ts Or view live here (integration using LangChain and TRPC SSE LINK) https://preview.im-ada.ai

On a side note, I will try to make a pull request to the tRPC main repo with the link and docs so people can refer to them in the future instead of having to search the issues every time. I see the progress on the SSELink is halted: https://github.com/trpc/trpc/issues/4477

github-actions[bot] commented 3 months ago

This issue has been locked because we are very unlikely to see comments on closed issues. If you are running into a similar issue, please create a new issue. Thank you.