upstash / qstash-js

Message queue for serverless
https://docs.upstash.com/qstash
MIT License

Vercel edge function stream not working with qstash #38

Open nilooy opened 1 year ago

nilooy commented 1 year ago

i'm trying to use vercel edge function streaming with the ai sdk (https://sdk.vercel.ai/docs/guides/openai) for a long-running job (up to 120s), triggered through https://docs.upstash.com/qstash

it works perfectly locally, but not when deployed on vercel. it streams for a few milliseconds and then stops, with no error. i'm not sure exactly where the issue is. at first glance it looks like a qstash problem, but since the same qstash setup works locally and only fails on vercel, the issue might be on vercel's side.

i would love any feedback on this approach for fanning out jobs with edge function streaming.

calling this to enqueue the job:

import { baseUrl, qstashClient } from "@/lib/queue/qstash";

export default async function handler(req, res) {
  const job = await qstashClient.publishJSON({
    url: baseUrl + "/api/test/chat",
    // or topic: "the name or id of a topic"
    body: {},
    retries: 0,
  });

  return res.status(200).json({ ok: true });
}
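As a side note, `publishJSON` resolves with the message ID, which could be surfaced to the caller instead of a bare `{ ok: true }` so the job can be correlated with the QStash logs later. A hypothetical sketch (`formatJobAck` and the `PublishResponse` shape are not part of the original code; the shape is assumed from the docs):

```typescript
// Assumed shape of the QStash publish response (per the QStash docs).
type PublishResponse = { messageId: string };

// Build an acknowledgement carrying the QStash message ID,
// so the caller can look the job up in the QStash console later.
function formatJobAck(job: PublishResponse) {
  return { ok: true, messageId: job.messageId };
}
```

In the handler above that would mean returning `res.status(200).json(formatJobAck(job))`.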

the endpoint that runs as the background job:

import { Configuration, OpenAIApi } from "openai-edge";
import { OpenAIStream, StreamingTextResponse } from "ai";
import { validateSignatureEdge } from "./validateSignatureEdge";

// Create an OpenAI API client (that's edge friendly!)
const config = new Configuration({
  organization: process.env.OPENAI_ORG,
  apiKey: process.env.OPENAI_API_KEY,
});

const openai = new OpenAIApi(config);

export const runtime = "edge";

async function handler(req: Request) {
  const response = await openai.createChatCompletion({
    model: "gpt-3.5-turbo",
    stream: true,
    messages: [{ role: "user", content: "how next.js works?" }],
  });
  const stream = OpenAIStream(response, {
    onToken: (token) => {
      console.log({ token });
    },
    onStart: () => {
      console.log("started");
    },
  });
  // Respond with the stream
  return new StreamingTextResponse(stream);
}

// a custom qstash signature validator (defined below)
export default validateSignatureEdge(handler);

validateSignatureEdge.ts

// next.js api wrapper that validates the qstash signature before
// handing the request off to the wrapped handler
import { Receiver } from "@upstash/qstash";

export const validateSignatureEdge = (handler) => async (req, res) => {
  const receiver = new Receiver({
    currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!,
    nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!,
  });

  const body = await req.text();

  // Receiver.verify is async; without `await` the truthy Promise
  // would make every request look valid.
  const isValid = await receiver.verify({
    signature: req.headers.get("Upstash-Signature")!,
    body,
  });
  if (!isValid) {
    return new Response("Invalid signature", { status: 401 });
  }

  req.data = JSON.parse(body);

  return handler(req, res);
};
chronark commented 1 year ago

What is the error?

nilooy commented 1 year ago

> What is the error?

there's no error showing up. it just starts and stops streaming after a few milliseconds (locally, with the same qstash setup, it streams fully)

[Screenshot 2023-06-27 at 10:01:29: the log ends here]

all the tokens arrive at the same time, so i doubt any streaming is actually happening. [Screenshot 2023-06-27 at 10:04:26]

chronark commented 1 year ago

is it perhaps cached by vercel? they have some really aggressive caching strategies
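One quick way to rule caching out (a sketch, not a confirmed fix) would be to attach explicit no-store headers to the streamed response, e.g. via the `init` argument of `StreamingTextResponse(stream, { headers })`:

```typescript
// Sketch: headers that opt the response out of intermediary caching,
// using the standard web Headers class (available on the edge runtime).
const noStoreHeaders = new Headers({
  "Cache-Control": "no-store, no-cache, must-revalidate",
});
```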

nilooy commented 1 year ago

> is it perhaps cached by vercel? they have some really aggressive caching strategies

i tried redeploying without cache on vercel, but no luck. my guess is that vercel edge functions don't treat qstash as a valid client for streaming; when i called the endpoint without qstash, it streamed fine.
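Since QStash, not a browser, is the consumer here, the streamed chunks may never be rendered incrementally anyway. One workaround sketch is to drain the stream server-side and respond with the full text once it completes (`drainStream` is a hypothetical helper, not part of the original code):

```typescript
// Hypothetical workaround: read a web ReadableStream to completion
// and return the accumulated text instead of streaming it to the caller.
async function drainStream(stream: ReadableStream<Uint8Array>): Promise<string> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let text = "";
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    text += decoder.decode(value, { stream: true });
  }
  return text + decoder.decode(); // flush any buffered bytes
}
```

In the handler above, that would mean `return new Response(await drainStream(stream))` in place of `new StreamingTextResponse(stream)`.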

Raithfall commented 2 months ago

Same problem here, any solution one year later?