sergiodxa / remix-utils

A set of utility functions and types to use with Remix.run
https://sergiodxa.github.io/remix-utils/
MIT License

Add new eventStream API for async generators #336

Open sergiodxa opened 5 months ago

sergiodxa commented 5 months ago

The eventStream response helper is modeled closely on how useEffect works: you pass a function, and that function returns another one that's used to clean up any side effects.

This is great for event-emitter-like APIs, where you have one function to subscribe to events and another to unsubscribe, e.g.

return eventStream(request.signal, (send) => {
  emitter.addEventListener("event", handler);
  return () => emitter.removeEventListener("event", handler);

  // hoisted function declaration, so it can be referenced above
  function handler(event: Event) {
    send({ data: event });
  }
});

Another common API shape is one where the subscribe call itself returns the unsubscribe function, e.g.

return eventStream(request.signal, (send) =>
  emitter.subscribe((event) => {
    send({ data: event });
  }),
);

Both work well, but if you want to iterate over an async iterator, you need to create an extra function just to hold the async code, because the callback itself has to return the cleanup function synchronously.

return eventStream(request.signal, (send) => {
  void run(); // start iterating without awaiting
  return () => {}; // empty cleanup

  async function run() {
    for await (const chunk of stream) {
      if (request.signal.aborted) return;
      send({ data: chunk });
    }
  }
});

This PR introduces an alternative API that helps with this by splitting the handle and cleanup functions.

let stream = new Stream(); // get your stream here

return eventStream(request.signal, {
  // This function will be called when the connection is closed. It can be
  // sync or async, depending on your needs.
  cleanup() {
    return stream.close();
  },

  // This function will be called when the connection is open, here you can send
  // data to the browser or close the connection
  async handle(send, close) {
    // iterate the async stream of data
    for await (const chunk of stream) {
      // stop the iteration if the connection is closed
      if (request.signal.aborted) return;
      send({ data: chunk }); // send each chunk
    }

    return close(); // close the connection when the stream ends
  }
});
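
One way to picture how the object form relates to the existing callback form is a small adapter. The sketch below is hypothetical: `fromHandlers` and the local types are names made up for illustration, not part of remix-utils, and it assumes the callback form also receives a `close` function as its second argument. It is not how this PR implements the feature.

```typescript
// Hypothetical types: they mirror the shapes used in this description,
// not the library's actual exported types.
type SendFunction = (event: { event?: string; data: string }) => void;
type CloseFunction = () => void;
type CleanupFunction = () => void;

interface Handlers {
  handle(send: SendFunction, close: CloseFunction): void | Promise<void>;
  cleanup?(): void | Promise<void>;
}

// Adapts the object form to the callback form: start `handle` without
// awaiting it, and return a teardown function that runs `cleanup`.
function fromHandlers(handlers: Handlers) {
  return (send: SendFunction, close: CloseFunction): CleanupFunction => {
    // Wrapping in a Promise catches both sync throws and async rejections;
    // on failure the connection is closed instead of left hanging.
    new Promise<void>((resolve) => resolve(handlers.handle(send, close))).catch(
      () => close(),
    );

    return () => {
      // Errors from cleanup are swallowed in this sketch.
      new Promise<void>((resolve) => resolve(handlers.cleanup?.())).catch(
        () => {},
      );
    };
  };
}
```

Under those assumptions, `eventStream(request.signal, fromHandlers({ handle, cleanup }))` would behave much like the object form above, which is why the two can coexist without changing the existing API.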

A more realistic use case for this is streaming a response from the OpenAI SDK.

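import type { LoaderFunctionArgs } from "@remix-run/node";
import { eventStream } from "remix-utils/sse/server";
import { client } from "~/services/openai.server";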

export async function loader({ request }: LoaderFunctionArgs) {
  let stream = await client.chat.completions.create({
    model: "gpt-4",
    messages: [{ role: "user", content: "Say this is a test" }],
    stream: true,
  });

  return eventStream(request.signal, {
    cleanup() {
      stream.controller.abort();
    },
    async handle(send, close) {
      for await (const chunk of stream) {
        send({ data: chunk.choices[0]?.delta?.content || "" });
      }

      return close();
    },
  });
}