One workaround was to hijack the previous stream and modify it manually, avoiding this experimental feature altogether. That works as a temporary fix until it is no longer needed:
// `stream` is the original stream returned by the LangChain chain.
const readableStream = new ReadableStream({
  async start(controller) {
    const reader = stream.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        // append the extra JSON payload once the model output has finished:
        controller.enqueue(
          new TextEncoder().encode(JSON.stringify({ key: "value" })),
        );
        controller.close();
        break;
      }
      controller.enqueue(value);
    }
  },
});
return new StreamingTextResponse(readableStream);
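Note that with this approach the extra JSON is simply concatenated onto the end of the message text, so the client has to split it back out again (see the extractMessage helper further down this thread).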
It seems to be related to the LangChain expression language and the support for it in the Vercel AI SDK.
Here is a similar example without the expression language:
const model = new ChatOpenAI({
  temperature: 0.8,
  streaming: true, // important: enable streaming
});
const chain = new LLMChain({ llm: model, prompt, verbose: true });

const data = new experimental_StreamData();

// important: use LangChainStream from the AI SDK:
const { stream, handlers } = LangChainStream({
  onFinal: () => {
    data.append(JSON.stringify({ key: "value" }));
    data.close();
  },
  experimental_streamData: true,
});

// run async (no await):
chain.stream(
  {
    chat_history: formattedPreviousMessages.join("\n"),
    input: currentMessageContent,
  },
  { callbacks: [handlers] }, // attach the handlers from the AI SDK stream
);

return new StreamingTextResponse(stream, {}, data);
This results in the response streaming into the UI. The data piece is also available. Here is the response preview from the network tab:
0:""
0:"Arr"
0:"r"
0:","
0:" me"
0:" mate"
0:"y"
0:"!"
0:" What"
0:" be"
0:" ye"
0:" test"
0:"in"
0:"'"
0:" today"
0:"?"
0:""
2:"[\"{\\\"key\\\":\\\"value\\\"}\"]"
Here is how to make it work with the LC expression language:
// ...
const data = new experimental_StreamData();

const stream = await chain.stream(
  {
    chat_history: formattedPreviousMessages.join("\n"),
    input: currentMessageContent,
  },
  {
    callbacks: [
      {
        handleChainEnd(outputs, runId, parentRunId) {
          // check that the main chain (the run without a parent) is finished:
          if (parentRunId == null) {
            data.append(JSON.stringify({ key: "value" }));
            data.close();
          }
        },
      },
    ],
  },
);

return new StreamingTextResponse(
  stream.pipeThrough(createStreamDataTransformer(true)),
  {},
  data,
);
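If I understand correctly, the pipeThrough(createStreamDataTransformer(true)) step is what re-encodes the chain's raw text stream into the 0:/2: wire format shown above, so that the appended data frames can travel alongside the text.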
Nothing really worked for me here. Building on some ideas from this topic, I came up with the following approach.
Implementation: 1+2. Add the extra data to the stream:
const readableStream = new ReadableStream({
  async start(controller) {
    const reader = stream.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        // append the sources between markers once the answer has streamed:
        controller.enqueue(
          new TextEncoder().encode(
            `
-- sources:start --
${JSON.stringify(sources)}
-- sources:end --
`,
          ),
        );
        controller.close();
        break;
      }
      controller.enqueue(value);
    }
  },
});
return new StreamingTextResponse(readableStream);
3. Extract the message and the extra data on the client:
export const extractMessage = (str: string) => {
  const regex = /-- sources:start --(.*)-- sources:end --/s;
  const match = regex.exec(str);
  const sourceStr = match?.[1];
  const sources: { name: string; url: string }[] = JSON.parse(
    sourceStr?.trim() || "[]",
  );
  const answer = str.replace(regex, "").trim();
  return { sources, answer };
};
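A hypothetical call site (the message shape follows the AI SDK's useChat hook; the names here are illustrative):

const { messages } = useChat();
const lastMessage = messages[messages.length - 1];
const { sources, answer } = lastMessage
  ? extractMessage(lastMessage.content)
  : { sources: [], answer: "" };
// render `answer` as the assistant text and `sources` as links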
@kaminskypavel which langchain version are you using?
"langchain": "^0.0.193",
@kaminskypavel check out https://github.com/vercel/ai/pull/792/files - I've added a couple of examples
@lgrammel, thanks for the link. Your examples work as intended, but it looks like there's a problem with the LangChain RunnableSequence class:
const chain = RunnableSequence.from([
  {
    context: new RunnablePassthrough(),
    question: (input: { question: string; chatHistory?: string }) =>
      input.question,
  },
  prompt,
  llm,
  new BytesOutputParser(),
]);
If you hook up the handlers:
await chain.stream(
  {
    question,
    chatHistory,
  },
  {
    callbacks: [handlers],
  },
);
onFinal won't get called:
const { stream, handlers } = LangChainStream({
  onStart: () => {
    console.log("started...");
  },
  onFinal: () => {
    console.log("finished...");
    data.append(JSON.stringify({ key: "value" })); // example
    data.close();
  },
  experimental_streamData: true,
});
@kaminskypavel did you figure out a solution to get onFinal to run?
No, unfortunately I did not.
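One thing that may be worth trying (a sketch only, building on the handleChainEnd approach shown earlier in this thread, untested with RunnableSequence): pass a plain callback object alongside the AI SDK handlers and finalize the data stream when the top-level run ends:

await chain.stream(
  { question, chatHistory },
  {
    callbacks: [
      handlers,
      {
        // the top-level chain run is the one without a parent run id
        handleChainEnd(_outputs: any, _runId: string, parentRunId?: string) {
          if (parentRunId == null) {
            data.append(JSON.stringify({ key: "value" }));
            data.close();
          }
        },
      },
    ],
  },
);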
Reproduction steps:
1. git clone git@github.com:langchain-ai/langchain-nextjs-template.git
2. npm install
3. set up .env.local with your OpenAI API key
4. modify app/api/chat/route.ts as follows:

 import {
   StreamingTextResponse,
+  experimental_StreamData,
 } from "ai";
 import { ChatOpenAI } from "langchain/chat_models/openai";
 import { BytesOutputParser } from "langchain/schema/output_parser";
 import { PromptTemplate } from "langchain/prompts";
+import { CallbackManager } from "langchain/callbacks";
+import { LLMResult } from "langchain/schema";

 export const runtime = "edge";

@@ -61,12 +67,24 @@ export async function POST(req: NextRequest) {
    */
   const chain = prompt.pipe(model).pipe(outputParser);

-  const stream = await chain.stream({
-    chat_history: formattedPreviousMessages.join("\n"),
-    input: currentMessageContent,
-  });
+  const data = new experimental_StreamData();
+  const stream = await chain.stream(
+    {
+      chat_history: formattedPreviousMessages.join("\n"),
+      input: currentMessageContent,
+    },
+    {
+      callbacks: CallbackManager.fromHandlers({
+        handleLLMEnd(): Promise<void> | void {
+          data.append(JSON.stringify({ key: "value" }));
+          data.close();
+        },
+      }),
+    },
+  );

-  return new StreamingTextResponse(stream);
+  return new StreamingTextResponse(stream, {}, data);
 } catch (e: any) {