langchain-ai / langchainjs

🦜🔗 Build context-aware reasoning applications 🦜🔗
https://js.langchain.com/docs/
MIT License
12.4k stars 2.1k forks source link

Cannot convert langchain's ReadableStream to node ReadableStream #5845

Closed muditjai closed 2 days ago

muditjai commented 3 months ago

Checked other resources

Example Code

import { RunnableSequence } from "@langchain/core/runnables";
import { ReadableStream } from "node:stream/web";

let answer_stream = ReadableStream.from(["default", "fallback", "stream"]);
if (req.app.locals.llm_chain) {
  const llm_chain: RunnableSequence<string, string> = req.app.locals.llm_chain;
  answer_stream = await llm_chain.stream("Sample question") 
}

Error Message and Stack Trace (if applicable)

Property 'values' is missing in type 'IterableReadableStream<string>' but required in type 'ReadableStream<string>'.ts(2741) web.d.ts(154, 9): 'values' is declared here.

Description

In my business logic, I am trying to extract a stream from langchain, but also have a node ReadableStream as backup in certain cases.

However, it seems langchain's ReadableStream coming from lib.dom.d.ts in typescript/lib is not compatible with node's ReadableStream coming from web.d.ts in node:stream/web . It's missing values method.

Here's the ReadableStream from node:stream/web

 /** This Streams API interface represents a readable stream of byte data. */
interface ReadableStream<R = any> {
        readonly locked: boolean;
        cancel(reason?: any): Promise<void>;
        getReader(): ReadableStreamDefaultReader<R>;
        getReader(options: { mode: "byob" }): ReadableStreamBYOBReader;
        pipeThrough<T>(transform: ReadableWritablePair<T, R>, options?: StreamPipeOptions): ReadableStream<T>;
        pipeTo(destination: WritableStream<R>, options?: StreamPipeOptions): Promise<void>;
        tee(): [ReadableStream<R>, ReadableStream<R>];
        values(options?: { preventCancel?: boolean }): AsyncIterableIterator<R>;
        [Symbol.asyncIterator](): AsyncIterableIterator<R>;
    }

And here's ReadableStream from typescript/lib which langchain depends on -

/**
 * This Streams API interface represents a readable stream of byte data. The Fetch API offers a concrete instance of a ReadableStream through the body property of a Response object.
 *
 * [MDN Reference](https://developer.mozilla.org/docs/Web/API/ReadableStream)
 */
interface ReadableStream<R = any> {
    /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/ReadableStream/locked) */
    readonly locked: boolean;
    /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/ReadableStream/cancel) */
    cancel(reason?: any): Promise<void>;
    /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/ReadableStream/getReader) */
    getReader(options: { mode: "byob" }): ReadableStreamBYOBReader;
    getReader(): ReadableStreamDefaultReader<R>;
    getReader(options?: ReadableStreamGetReaderOptions): ReadableStreamReader<R>;
    /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/ReadableStream/pipeThrough) */
    pipeThrough<T>(transform: ReadableWritablePair<T, R>, options?: StreamPipeOptions): ReadableStream<T>;
    /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/ReadableStream/pipeTo) */
    pipeTo(destination: WritableStream<R>, options?: StreamPipeOptions): Promise<void>;
    /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/ReadableStream/tee) */
    tee(): [ReadableStream<R>, ReadableStream<R>];
}

System Info

node -v v20.12.1

Windows

dosubot[bot] commented 3 months ago

Hello, @muditjai! I'm here to assist with your issue and help you become a contributor.

To resolve the compatibility issue between LangChain's ReadableStream and Node's ReadableStream, you can use the IterableReadableStreamV0 class provided in the @langchain/core/utils/stream file. This class extends the ReadableStream and implements the AsyncIterable interface, ensuring compatibility across different environments.

Here's an example of how you can use the IterableReadableStreamV0 class to wrap a Node ReadableStream:

import { IterableReadableStreamV0 } from '@langchain/core/utils/stream';

// Assuming you have a Node ReadableStream instance
const nodeReadableStream = getNodeReadableStream(); // Replace with your actual Node ReadableStream

// Wrap the Node ReadableStream with IterableReadableStreamV0
const iterableStream = IterableReadableStreamV0.fromReadableStream(nodeReadableStream);

// Now you can use async iteration with the iterableStream
for await (const chunk of iterableStream) {
  console.log(chunk);
}

This approach ensures that the ReadableStream can be used with async iteration syntax, making it compatible with both LangChain and Node environments. Additionally, the convertEventStreamToIterableReadableDataStream function in the @langchain/core/utils/stream file can be used to convert an event stream into an iterable readable data stream, further enhancing compatibility [1][2][3].

To continue talking to Dosu, mention @dosu.

jacoblee93 commented 3 months ago

Thanks for the heads up - will have a look as soon as possible

jacoblee93 commented 3 months ago

Having some trouble figuring this out :(

Is it just the types? Does it work if you just type it as any?

muditjai commented 3 months ago

Thanks for taking a look.

It's both an issue with type conversion and missing support for .values() method.

See the stream.values({ preventCancel: true }) example in https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream, without values() we can't read the stream in 2 separate for loops. In my use case, I need to read first k characters in a loop, do some processing on it and then continue reading rest of stream. Currently I'm implementing this in a while loop using .read() on a reader instance obtained through getReader().

The type conversion I was able to workaround by typing it as any.