vercel / ai

Build AI-powered applications with React, Svelte, Vue, and Solid
https://sdk.vercel.ai/docs

Example using Streaming Response for FastAPI. #161

Closed mattzcarey closed 3 months ago

mattzcarey commented 1 year ago

Lots of people write their LangChain APIs in Python rather than using RSC.

A common tech stack is FastAPI on the backend with Next.js/React on the frontend. It would be great to show an example of this using FastAPI's StreamingResponse.

This would really help us build Quivr.

jasan-s commented 1 year ago

@mattzcarey I'm thinking of using a similar tech stack, but it seems that Vercel doesn't support streaming from the Python runtime. Could you please share your stack in more detail? I'm currently using LangChain JS deployed to a Vercel Edge Function and streaming the response back to the client. But the Python version is clearly far more fully featured, which is my reason to switch.

mattzcarey commented 1 year ago

@jasan-s I have managed to do this with LangChain callbacks and StreamingResponse from FastAPI. You can check out the 'stream' route in the Quivr codebase.
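For readers who want the general shape of the callback approach without digging through a codebase, here is a minimal sketch. It is not the Quivr implementation and it does not use LangChain: `QueueTokenHandler`, `fake_llm`, and `stream_answer` are hypothetical names, and the model call is a stand-in that reports tokens through callbacks. The idea is just that callbacks push tokens onto an `asyncio.Queue`, and an async generator drains the queue.

```python
import asyncio
from typing import AsyncIterator, List

_DONE = object()  # sentinel marking the end of the stream


class QueueTokenHandler:
    """Hypothetical callback handler: receives tokens and queues them."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()

    def on_new_token(self, token: str) -> None:
        # Called by the model code each time a token is produced.
        self.queue.put_nowait(token)

    def on_end(self) -> None:
        # Called once when generation is finished.
        self.queue.put_nowait(_DONE)

    async def tokens(self) -> AsyncIterator[str]:
        # Drain the queue until the sentinel arrives.
        while True:
            item = await self.queue.get()
            if item is _DONE:
                break
            yield item


async def fake_llm(handler: QueueTokenHandler) -> None:
    """Stand-in for a model call that reports tokens through callbacks."""
    for token in ["Hello", ", ", "world"]:
        await asyncio.sleep(0)  # yield control, as a real network call would
        handler.on_new_token(token)
    handler.on_end()


async def stream_answer() -> AsyncIterator[str]:
    handler = QueueTokenHandler()
    # Run the model concurrently while we forward tokens to the caller.
    task = asyncio.create_task(fake_llm(handler))
    async for token in handler.tokens():
        yield token
    await task


async def collect() -> List[str]:
    return [token async for token in stream_answer()]
```

In a FastAPI route, the async generator returned by `stream_answer()` could be passed to `StreamingResponse`, which accepts async generators.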

jasan-s commented 1 year ago

Did you deploy Quivr to Vercel?

mattzcarey commented 1 year ago

Yes it can be.

kallebysantos commented 1 year ago

I created a gist example:

https://user-images.githubusercontent.com/105971119/277059459-0109bc03-57a7-493d-bfa3-6152745f3349.mp4

satyamdalai commented 1 year ago

Native support for converting streaming responses from FastAPI (or any other HTTP server) in Next.js API routes, with the help of the SDK, would be helpful in my use case. I don't want to call the FastAPI endpoint directly from the useChat hook, because I manage the authentication layer in Next.js.

danielcorin commented 9 months ago

I came across this thread looking for the same thing but wanted to use the openai library (rather than langchain as in the gist above) and the useChat hook. Here's what I ended up doing:

server.py

from openai import AsyncOpenAI

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse

app = FastAPI()

# Added because the frontend and this backend run on separate ports, should change depending on your setup, not a good idea in prod
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

client = AsyncOpenAI()

@app.post("/ask")
async def ask(req: dict):
    stream = await client.chat.completions.create(
        messages=req["messages"],
        model="gpt-3.5-turbo",
        stream=True,
    )

    async def generator():
        async for chunk in stream:
            yield chunk.choices[0].delta.content or ""

    response_messages = generator()
    return StreamingResponse(response_messages, media_type="text/event-stream")

Run with

uvicorn server:app --reload

Example frontend src/app/page.tsx in a new Next.js app

"use client";

import { useChat } from "ai/react";

export default function Home() {
  const { messages, input, handleInputChange, handleSubmit } = useChat({
    api: "http://127.0.0.1:8000/ask"
  });

  return (
    <main className="flex min-h-screen flex-col items-center justify-between p-24">
      <div>
        {messages.map((m) => (
          <div key={m.id}>
            {m.role === "user" ? "User: " : "AI: "}
            {m.content}
          </div>
        ))}

        <form onSubmit={handleSubmit}>
          <label>
            Say something...
            <input value={input} onChange={handleInputChange} />
          </label>
          <button type="submit">Send</button>
        </form>
      </div>
    </main>
  );
}

kallebysantos commented 9 months ago

I think this issue should be marked as complete. We have provided useful examples that solve the question.

DanLeininger commented 9 months ago

Building off the above answers, here's an example using experimental_StreamData:

server.py

from openai import AsyncOpenAI

from utils import stream_chunk #formats chunks for use with experimental_StreamData

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse

app = FastAPI()

# Added because the frontend and this backend run on separate ports, should change depending on your setup, not a good idea in prod
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=[ "X-Experimental-Stream-Data"],  # this is needed for streaming data header to be read by the client
)

client = AsyncOpenAI()

@app.post("/ask")
async def ask(req: dict):
    stream = await client.chat.completions.create(
        messages=req["messages"],
        model="gpt-3.5-turbo",
        stream=True,
    )

    async def generator():
        async for chunk in stream:
            yield stream_chunk(chunk.choices[0].delta.content or "", "text")
        yield stream_chunk([{"foo":"bar"}], "data") # send a data part after the text stream ends

    response_messages = generator()
    return StreamingResponse(response_messages, media_type="text/event-stream",  headers={"X-Experimental-Stream-Data": "true"})

Where stream_chunk is a util that looks like this:

utils.py

import json

# transforms the chunk into a stream part compatible with the vercel/ai
def stream_chunk(chunk, type: str = "text"):
    code = get_stream_part_code(type)
    formatted_stream_part = f"{code}:{json.dumps(chunk, separators=(',', ':'))}\n"
    return formatted_stream_part

# given a type returns the code for the stream part
def get_stream_part_code(stream_part_type: str) -> str:
    stream_part_types = {
        "text": "0",
        "function_call": "1",
        "data": "2",
        "error": "3",
        "assistant_message": "4",
        "assistant_data_stream_part": "5",
        "data_stream_part": "6",
        "message_annotations_stream_part": "7",
    }
    return stream_part_types[stream_part_type]
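To make the wire format concrete, here is a small sketch of what `stream_chunk` above produces and how a line could be read back. `format_part` and `parse_part` are hypothetical helper names, and the code table is only the subset copied from the `utils.py` listing above; treat it as an illustration of the `code:json` line shape rather than the SDK's own parser.

```python
import json
from typing import Any, Tuple

# Subset of the prefix codes from the utils.py table above.
CODE_TO_TYPE = {"0": "text", "1": "function_call", "2": "data", "3": "error"}


def format_part(value: Any, code: str) -> str:
    # Same shape as stream_chunk: "<code>:<compact JSON>\n"
    return f"{code}:{json.dumps(value, separators=(',', ':'))}\n"


def parse_part(line: str) -> Tuple[str, Any]:
    # Split on the first colon only, since the JSON payload may contain colons.
    code, _, payload = line.rstrip("\n").partition(":")
    return CODE_TO_TYPE[code], json.loads(payload)
```

For example, `format_part("Hello", "0")` gives the line `0:"Hello"` followed by a newline, and `parse_part` turns that line back into `("text", "Hello")`.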

szymonzmyslony commented 9 months ago

@DanLeininger your setup works for me when using useChat(). I want to add some custom onCompletion handlers with AIStream in a route handler. My server setup is exactly like yours (again, it works with useChat), but I'm getting no response with:


export async function POST(req: Request) {
  const json = await req.json()
  const { messages, previewToken } = json
  const userId = (await auth())?.user.id

  if (!userId) {
    return new Response('Unauthorized', {
      status: 401
    })
  }
  const data = {
    messages: [{ role: 'user', content: 'Hello' }]
  }
  const fetchResponse = await fetch('http://127.0.0.1:8000/ask', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json'
    },
    body: JSON.stringify(data)
  })
  const reader = fetchResponse
  console.log('Reader is', reader)
  const myStream = AIStream(reader, undefined, {
    onStart: async () => {
      console.log('Stream started')
    },
    onCompletion: async (completion: string) => {
      console.log('Completion completed', completion)
    },
    onFinal: async (completion: string) => {
      console.log('Stream completed', completion)
    }
  })
  return new StreamingTextResponse(myStream)
}

Udbhav8 commented 9 months ago

I'm having the same issue as @szymonzmyslony above. @danielcorin @DanLeininger, it would be great to have some help.

ichitaka commented 9 months ago

We still need a useful example that includes tool calling and streaming data.

DanLeininger commented 9 months ago

@szymonzmyslony @Udbhav8 In our use case we bypass Next.js API routes / route handlers and stream from FastAPI directly to the client / useChat(), so we haven't attempted passing anything through AIStream.

ErikDale commented 7 months ago

@szymonzmyslony @Udbhav8 @satyamdalai have you found out how to add custom onCompletion handlers in the route handler, maybe using AIStream?

lgrammel commented 6 months ago

If your endpoint sends a chunked text stream, you can use useCompletion and useChat with streamMode: "text".

ashen007 commented 5 months ago

@DanLeininger your answer worked for me. My use case was a FastAPI backend using a LangGraph agent, and I had to do the streaming as you described. It worked properly, thank you!

yachty66 commented 4 months ago

Is this working in production on Vercel? @danielcorin

chrris99 commented 4 months ago

Has anyone managed to stream tool call results (e.g. a Pydantic model) to the useChat or useObject hook?

maxdata commented 4 months ago

https://github.com/virattt/financial-agent-ui/blob/main/frontend/src/app/action.tsx

jeremyphilemon commented 3 months ago

Hey guys, we have introduced Stream Protocols that help you develop custom backends and frontends for your use case, e.g., to provide compatible API endpoints that are implemented in a different language such as Python.

You can check out the newly added example that uses FastAPI as a backend in an application that uses Next.js and the useChat hook.

olivergom commented 3 months ago

@jeremyphilemon I have been trying to use the stream protocols to get my app streaming on deployments, but nothing is working. I am using the FastAPI examples, and while they work locally, they don't work in production. What am I doing wrong?

This is my src/app/simple/page.tsx

"use client";
export const runtime = 'edge';
export const dynamic = 'force-dynamic'; // always run dynamically

import { useChat } from 'ai/react';
import { unstable_noStore as noStore } from 'next/cache';

export default function Page() {
  noStore();
  const { messages, input, handleSubmit, handleInputChange, isLoading } =
    useChat({
      api: '/api/chat?protocol=text',
      streamProtocol: 'text',
      headers: {
        'Content-Type': 'application/json'
      },
      body: { thread_id: "" },
    });

  const handleSubmitWithQuery = async (e: React.FormEvent<HTMLFormElement>) => {
    e.preventDefault();
    handleSubmit(e, {
      body: { query: input, thread_id: "" }
    });
  };

  return (
    <div className="flex flex-col gap-2">
      <div className="flex flex-col p-4 gap-2">
        {messages.map(message => (
          <div key={message.id} className="flex flex-row gap-2">
            <div className="w-24 text-zinc-500 flex-shrink-0">{`${message.role}: `}</div>
            <div className="flex flex-col gap-2">{message.content}</div>
          </div>
        ))}
      </div>
      <form
        onSubmit={handleSubmitWithQuery}
        className="flex flex-col fixed bottom-0 w-full border-t"
      >
        <input
          value={input}
          placeholder="Why is the sky blue?"
          onChange={handleInputChange}
          className="w-full p-4 outline-none bg-transparent"
          disabled={isLoading}
        />
      </form>
    </div>
  );
}

And this is my API endpoint in my api/ folder:

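# Imports and setup assumed for completeness; they were not shown in the original snippet.
from typing import List

from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse
from openai import OpenAI
from pydantic import BaseModel

app = FastAPI()
client = OpenAI()  # synchronous client, since stream_text iterates with a plain for loop

class ClientMessage(BaseModel):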
    role: str
    content: str

class Request(BaseModel):
    messages: List[ClientMessage]

def stream_text(messages: List[ClientMessage]):
    stream = client.chat.completions.create(
        messages=[{"role": msg.role, "content": msg.content} for msg in messages],
        model="gpt-4",
        stream=True,
    )

    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            yield chunk.choices[0].delta.content

@app.post("/api/chat")
async def handle_chat(request: Request, protocol: str = Query('text')):
    response = StreamingResponse(stream_text(request.messages), media_type="text/plain")
    return response

I have enabled streaming for all functions using the environment variables in Vercel, but still nothing seems to enable streaming on deployments.

villesau commented 3 months ago

@jeremyphilemon Looks like streamProtocol: 'text' is not working with useChat unfortunately. messages array will always be empty in that case.

lgrammel commented 3 months ago

@villesau can you double check your setup? i just verified on the latest main and it works as expected.

lgrammel commented 3 months ago

@olivergom this might be a current deployment limitation. we will inform the corresponding team

hodge-jai commented 3 months ago

I understand this issue has been closed, but could someone help me configure this for use with the Anthropic Python SDK? It works fine using the streamProtocol: 'text' option, but doesn't render the response on the frontend when using the data stream option. I'm pretty sure I've configured each response according to the specification laid out in the Stream Protocol documentation. Interestingly enough, the stream finishes and I can see the full response in the network tab, but useChat doesn't append the completed message to the messages array. I would like to be able to pass additional data back along with the text response.


    message = await client.messages.create(
        model="claude-3-5-sonnet-20240620",
        max_tokens=4096,
        messages=messages,
        tools=[bm25_retrieval_tool, embedding_retrieval_tool],
        stream=True,
    )

    async def generator():
        async for chunk in message:
            print(chunk)
            if chunk.type == "message_start" or chunk.type == "content_block_start":
                continue
            if chunk.type == "content_block_delta":
                yield '0:"{text}"\n'.format(text=chunk.delta.text)

        yield 'd:{{"finishReason":"{reason}","usage":{{"promptTokens":{prompt},"completionTokens":{completion}}}}}\n'.format(
            reason="stop",
            prompt=prompt,
            completion=completion,
        )

    response = StreamingResponse(
        generator(),
        headers={"x-vercel-ai-data-stream": "v1"},
        media_type="application/json",
    )

    return response

lgrammel commented 2 months ago

@hodge-jai your finish part looks wrong (double {{ )

hodge-jai commented 2 months ago

@hodge-jai your finish part looks wrong (double {{ )

Updated it to a basic yield 'd:{"finishReason":"stop","usage":{"promptTokens":10,"completionTokens":10}}\n' (token counts are dummy values), and still no dice. I'm wondering if the media_type is wrong for the StreamingResponse? I've tried text/event-stream as well for the media type, with no luck.

Regardless, even if the finish reason isn't working, I should still see the message being rendered on my frontend as the data comes in, correct @lgrammel ?

hodge-jai commented 2 months ago

Update: I've figured it out. The JSON in the data stream chunks wasn't being parsed correctly. I was able to fix the issue by re-encoding the chunk received from the Anthropic client, changing

if chunk.type == "content_block_delta":
    yield '0:"{text}"\n'.format(text=chunk.delta.text)

to

if chunk.type == "content_block_delta":
    part = '0:{text}\n'.format(text=json.dumps(chunk.delta.text))
    yield part
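A short sketch of why the re-encoding matters, in case it helps others hitting the same thing: wrapping the text in quotes by hand produces an invalid JSON string as soon as the model emits a quote or a newline, whereas `json.dumps` escapes them. The helper names below (`naive_text_part`, `text_part`, `finish_part`, `payload_is_valid_json`) are hypothetical; only the `0:` and `d:` line shapes come from the snippets in this thread.

```python
import json


def naive_text_part(text: str) -> str:
    # Hand-written quotes: breaks if text contains " or a newline.
    return '0:"{text}"\n'.format(text=text)


def text_part(text: str) -> str:
    # json.dumps adds the quotes and escapes special characters.
    return "0:{payload}\n".format(payload=json.dumps(text))


def finish_part(reason: str, prompt_tokens: int, completion_tokens: int) -> str:
    # Build the finish part from a dict instead of formatting braces by hand.
    payload = {
        "finishReason": reason,
        "usage": {
            "promptTokens": prompt_tokens,
            "completionTokens": completion_tokens,
        },
    }
    return "d:{payload}\n".format(payload=json.dumps(payload, separators=(",", ":")))


def payload_is_valid_json(part: str) -> bool:
    # Everything after the first colon should parse as JSON.
    _, _, payload = part.partition(":")
    try:
        json.loads(payload)
        return True
    except json.JSONDecodeError:
        return False
```

With a delta such as `'She said "hi"\nthen left'`, the payload from `naive_text_part` fails to parse while the one from `text_part` parses cleanly, which would be consistent with the response showing up in the network tab but never reaching the messages array.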

amazingnerd commented 5 days ago

Does anyone know of a boilerplate that uses FastAPI as the backend with LangGraph, Next.js, Postgres, and Qdrant as the vector database? I want to set up a project like Morphic, but using that tech stack, to build a combined RAG + fine-tuned AI product: https://vercel.com/templates/next.js/morphic-ai-answer-engine-generative-ui