langchain-ai / langchainjs

🦜🔗 Build context-aware reasoning applications 🦜🔗
https://js.langchain.com/docs/
MIT License

Streaming does not work if RemoteRunnable is part of RunnableSequence #4811

Closed jumski closed 6 months ago

jumski commented 6 months ago

Checked other resources

Example Code

Langserve endpoint

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from langchain_openai import ChatOpenAI
from langserve import add_routes

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"],
)

OPENAI_API_KEY = "your API key"

add_routes(app, ChatOpenAI(model="gpt-3.5-turbo-1106", api_key=OPENAI_API_KEY), path='/models/ChatOpenAI')

if __name__ == "__main__":
    import os
    import uvicorn

    HTTP_PORT = 8081
    uvicorn.run(app, host="0.0.0.0", port=HTTP_PORT)

Python's RemoteRunnable streams correctly

from langserve.client import RemoteRunnable
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
model = RemoteRunnable("http://localhost:8081/models/ChatOpenAI")
chain = prompt.pipe(model)

stream = chain.stream({'topic': 'cats'})
for chunk in stream:
    print(chunk)

JS equivalent does not stream and just calls invoke

import { ChatPromptTemplate } from '@langchain/core/prompts';
import { RunnableSequence } from "@langchain/core/runnables";
import { RemoteRunnable } from "langchain/runnables/remote";

const prompt = ChatPromptTemplate.fromTemplate("Tell me a joke about {topic}")
const model = new RemoteRunnable({ url: `http://localhost:8081/models/ChatOpenAI` });
const chain = RunnableSequence.from([prompt, model]);

const stream = await chain.stream({ topic: "cats" });

for await (const chunk of stream) {
  console.log(chunk);
}

Solo RemoteRunnable in JS streams properly

When we simplify the chain and just call .stream directly on RemoteRunnable, streaming seems to be working correctly:

import { RemoteRunnable } from "langchain/runnables/remote";

const chain = new RemoteRunnable({ url: `http://localhost:8081/models/ChatOpenAI` });

const stream = await chain.stream({ topic: "cats" });

for await (const chunk of stream) {
  console.log(chunk);
}

Error Message and Stack Trace (if applicable)

No response

Description

I'm trying to use RemoteRunnable in place of a model in my JavaScript LCEL chains, but when I compose the RemoteRunnable with any other runnable to form a RunnableSequence, it does not stream and falls back to invoke.

The equivalent code works in the Python version.

System Info

pnpm show langchain

❯ pnpm show langchain

langchain@0.1.28 | MIT | deps: 18 | versions: 259
Typescript bindings for langchain
https://github.com/langchain-ai/langchainjs/tree/main/langchain/

keywords: llm, ai, gpt3, chain, prompt, prompt engineering, chatgpt, machine learning, ml, openai, embeddings, vectorstores

dist
.tarball: https://registry.npmjs.org/langchain/-/langchain-0.1.28.tgz
.shasum: fd07a9880a68865e5c803dc01decdaa097b7c756
.integrity: sha512-LNuILtOSFptfOqDwUwQAl7B0dYrBtxzVZ6lDhc5DA8rk+pyTCy12HQO7t8bGqC8oQlEPYPKaom8VRlvfupUBTw==
.unpackedSize: 4.1 MB

dependencies:
@anthropic-ai/sdk: ^0.9.1     expr-eval: ^2.0.2             langsmith: ~0.1.7             yaml: ^2.2.1
@langchain/community: ~0.0.36 js-tiktoken: ^1.0.7           ml-distance: ^4.0.0           zod-to-json-schema: ^3.22.3
@langchain/core: ~0.1.44      js-yaml: ^4.1.0               openapi-types: ^12.1.3        zod: ^3.22.4
@langchain/openai: ~0.0.19    jsonpointer: ^5.0.1           p-retry: 4
binary-extensions: ^2.2.0     langchainhub: ~0.0.8          uuid: ^9.0.0

maintainers:
- nfcampos <nuno@boringbits.io>
- jacoblee93 <jacoblee93@gmail.com>
- davidduong <david@duong.cz>
- hwchase17 <hw.chase.17@gmail.com>
- basproul <braceasproul@gmail.com>

dist-tags:
latest: 0.1.28     next: 0.1.29-rc.0

published 3 days ago by jacoblee93 <jacoblee93@gmail.com>

Node version

❯ node --version
v21.5.0

pnpm version

❯ pnpm --version
8.14.1
dosubot[bot] commented 6 months ago

Based on the code and the issue you've described, it seems like the problem lies in how RunnableSequence handles streaming. In the current implementation of LangChain.js, RunnableSequence and similar constructs like RunnableRetry, RunnableWithFallbacks, etc., are designed to handle sequences of operations, potentially modifying the behavior of individual Runnable instances through decoration. These constructs focus on invoking runnables in a specific order or under specific conditions, primarily through the invoke method, and optionally through the batch method for handling multiple inputs at once.

The stream method implementation provided in the base class (from which other runnables presumably inherit) is designed to work with individual Runnable instances. It leverages an asynchronous generator (_streamIterator) to yield results as they become available. This mechanism is inherently suited for operations that can produce output incrementally before the entire input has been processed, which is a typical scenario in streaming.

However, when a RemoteRunnable is incorporated into a RunnableSequence, the sequence's invoke method orchestrates the execution flow across multiple runnables, potentially altering inputs and outputs along the way. This orchestration does not inherently support streaming because it focuses on completing each step fully before moving to the next. The sequence's design does not account for the possibility of incrementally yielding results from intermediate steps, which is essential for streaming functionality.

Moreover, the RunnableSequence and similar constructs do not override or provide their own implementations of the stream method that would allow them to handle streaming appropriately across all constituent runnables. Without such an implementation, attempting to stream through a RunnableSequence would default to the base class's streaming behavior, which is not designed to handle the complexities of a sequence of operations.

In summary, the core issue is that the RunnableSequence and related constructs are not designed with streaming in mind, focusing instead on sequential or conditional execution of runnables. To enable streaming functionality when RemoteRunnable is part of a RunnableSequence, it would be necessary to implement a custom stream method for RunnableSequence that orchestrates streaming across all constituent runnables, respecting the incremental nature of streaming and ensuring that intermediate results can be yielded as they become available.

You can find more details about the current implementation of RunnableSequence and RemoteRunnable in the LangChain.js codebase here and here respectively.

I hope this explanation helps clarify the issue. If you have any further questions or need more clarification, please don't hesitate to ask.

jumski commented 6 months ago

This is not explained in the JS docs, nor is there any warning about this behaviour.

From my understanding this is a bug, because the docs mention that streaming should always be possible after the last non-streaming component. Is there really a strong reason for this limitation?

Why does the Python version work without a problem?

jumski commented 6 months ago

Cc @dosu-bot

dosubot[bot] commented 6 months ago

Based on the code you've provided and the context of your question, it seems like the issue you're encountering is due to how RunnableSequence handles the execution of each runnable in the sequence. In the current implementation, each runnable in the sequence is processed synchronously, meaning that even if the last runnable in the sequence (in this case, RemoteRunnable) supports streaming, the sequence itself does not inherently support streaming the output of that last runnable. The results are being collected and returned only after the entire sequence has been processed.

This behavior is different from the Python version due to the nuances of asynchronous execution and generator handling in the two languages. Python's asyncio and generator capabilities might be leveraged in such a way that allows the equivalent Python code to stream outputs from a sequence more seamlessly.

To address this issue in the TypeScript implementation, modifications would need to be made to the RunnableSequence class to support streaming from the last runnable if it supports streaming. This could involve checking if the last runnable in the sequence supports streaming and, if so, handling its output differently by yielding results as they become available instead of waiting for all runnables in the sequence to complete.

However, without specific details on the implementation of RemoteRunnable and how it is expected to integrate with streaming in the sequence, it's challenging to provide a precise solution. It's also important to note that such changes could have significant implications on the overall behavior of the RunnableSequence class and would need to be thoroughly tested to ensure they don't introduce new issues.

I hope this explanation helps clarify the issue. If you have further questions or need more details, please let me know.

jumski commented 6 months ago

@dosu-bot point me to resources about the differences in how Python and JS handle streaming, and if possible to any discussion that explains these decisions or problems

dosubot[bot] commented 6 months ago

Thank you for providing the detailed information. It seems like you're encountering an issue with the RemoteRunnable when it's part of a RunnableSequence in LangChain.js. I'll try to provide some insights based on your questions.

Firstly, regarding the differences between Python and JavaScript in handling asynchronous execution and generator handling, you're correct that these differences might be contributing to the issue you're facing. In JavaScript, the RemoteRunnable class handles asynchronous execution primarily through its invoke, batch, stream, and streamLog methods, which are all asynchronous and return promises. This design allows for non-blocking execution and is a common pattern in JavaScript for handling asynchronous operations, such as network requests. The class makes extensive use of async/await syntax for managing asynchronous code in a more synchronous-looking manner, which simplifies the handling of asynchronous operations like HTTP requests.

For generator handling, the streamLog method is an asynchronous generator function, indicated by the async and * keywords. This method leverages the for await...of loop to asynchronously iterate over a stream of log patches. Asynchronous generators in JavaScript are used to handle streams of data that arrive over time, allowing the function to yield a new value with yield as soon as it becomes available, making it well-suited for streaming data scenarios like log streams.

Compared to Python, JavaScript's approach to asynchronous execution and generator handling is quite different due to the language's inherent design and its single-threaded, event-driven nature. Python uses asyncio for asynchronous programming, which also involves an event loop but is structured differently, often using async def for asynchronous functions and await for waiting on their results. Python's generator functions use the yield keyword, similar to JavaScript, but the handling of asynchronous generators in Python is distinct and involves the async for syntax to iterate over asynchronous generator objects.
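
As a side note, a minimal sketch (not from the thread) of the async-generator pattern described above: values are yielded as they become available and consumed with for await...of.

// Minimal illustration of the async generator pattern described above:
// each value is yielded as soon as it is ready, not after the whole loop finishes.
async function* countSlowly(n: number): AsyncGenerator<number> {
  for (let i = 1; i <= n; i += 1) {
    await new Promise((resolve) => setTimeout(resolve, 100)); // simulate latency
    yield i;
  }
}

for await (const value of countSlowly(3)) {
  console.log(value); // 1, then 2, then 3, each arriving ~100 ms apart
}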

Secondly, regarding the RunnableSequence class in LangChain.js, it handles the execution of a sequence of Runnable objects by sequentially invoking each Runnable in the sequence with the output of the previous Runnable as its input. This process starts with the first Runnable in the sequence, continues through any middle Runnable objects, and ends with the last Runnable. The class supports modifying the behavior of the sequence through methods like withRetry, withConfig, and withFallbacks, allowing for retries, configuration binding, and fallback execution, respectively.

For a RemoteRunnable, or any Runnable in the sequence, the handling mechanism does not change. Each Runnable is invoked with the input it receives, which could be the original input for the first Runnable or the output from the previous Runnable in the sequence. The RunnableSequence class ensures that the execution flow is maintained, and any specific behaviors like retries or fallbacks are applied based on the configurations provided.
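
For illustration, a small sketch (using RunnableLambda from @langchain/core, not part of the original report; option names may differ slightly by version) of the decoration methods mentioned above:

import { RunnableLambda } from "@langchain/core/runnables";

// Toy runnables, purely to illustrate withRetry / withConfig / withFallbacks.
const flaky = RunnableLambda.from(async (x: number) => {
  if (Math.random() < 0.5) throw new Error("transient failure");
  return x * 2;
});
const backup = RunnableLambda.from(async (x: number) => x * 2);

const hardened = flaky
  .withRetry({ stopAfterAttempt: 3 })      // retry transient failures
  .withConfig({ runName: "double" })       // bind run configuration
  .withFallbacks({ fallbacks: [backup] }); // use backup if retries are exhausted

console.log(await hardened.invoke(21)); // 42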

Unfortunately, I wasn't able to find specific discussions or documentation that explain the design decisions or problems related to this issue in LangChain.js. Also, I couldn't find any known issues or limitations with the RemoteRunnable or RunnableSequence classes in the LangChain.js codebase that could be causing this issue. It's possible that the issue you're facing is due to some other factors or a bug in the codebase.

I hope this information helps you understand the potential differences and behaviors of the RemoteRunnable and RunnableSequence classes in LangChain.js. If you have more information or if there's anything else you'd like to know, please feel free to share.

jumski commented 6 months ago

@dosu-bot help me understand exactly how the RunnableSequence._streamIterator async generator function works, and what prevents it from streaming the last step instead of invoking it

dosubot[bot] commented 6 months ago

The RunnableSequence class in LangChain.js is designed to execute each step in the sequence synchronously, waiting for each step to complete before moving to the next. This design does not inherently support streaming, which requires a continuous flow of data that can be processed and passed along as it becomes available.

To modify the RunnableSequence class to support streaming from the last runnable, you would need to implement a mechanism that allows the last runnable to start producing output as soon as it has enough input to do so, without waiting for all previous runnables to complete their execution. This could involve:

  1. Overriding the stream method in the RunnableSequence class: You would need to create a custom implementation of the stream method that specifically handles the streaming capability of the last runnable. This method would need to orchestrate the sequence such that when it reaches the last runnable, it switches from a sequential execution model to a streaming model.

  2. Integrating streaming support into the sequence execution logic: This would involve modifying the sequence execution logic in the invoke method to detect when it's executing the last runnable and, instead of simply awaiting its completion, setting up a streaming process. This could be achieved by checking if the last runnable implements a specific streaming interface or method and, if so, calling that method to initiate streaming.

  3. Handling intermediate outputs: For the last runnable to start streaming effectively, it may need partial outputs from the previous runnables. This would require a mechanism to pass these intermediate outputs to the last runnable as soon as they are available, rather than waiting for the entire sequence to execute. This could involve modifying the execution logic of the sequence to support partial outputs and passing them to the next runnable in the sequence.

Here is a conceptual example of how you might start to implement these changes:

// In the RunnableSequence class

// Override the stream method to support streaming from the last runnable
async stream(
  input: RunInput,
  options?: Partial<CallOptions>
): Promise<IterableReadableStream<RunOutput>> {
  // Execute all runnables in the sequence except the last one synchronously
  let intermediateInput = input;
  for (let i = 0; i < this.steps.length - 1; i++) {
    const step = this.steps[i];
    intermediateInput = await step.invoke(intermediateInput, options);
  }

  // Check if the last runnable supports streaming and initiate streaming
  const lastRunnable = this.steps[this.steps.length - 1];
  if (typeof lastRunnable.stream === 'function') {
    // Directly return the stream from the last runnable
    return lastRunnable.stream(intermediateInput, options);
  } else {
    // Fallback to default behavior if the last runnable does not support streaming
    return super.stream(input, options);
  }
}

This example outlines a basic approach to modifying the RunnableSequence class to support streaming from the last runnable. It involves executing all but the last runnable in the sequence synchronously, then checking if the last runnable supports streaming and, if so, initiating the stream. This approach assumes that the last runnable has a stream method implemented that can handle streaming output. Further modifications and error handling would likely be necessary to fully integrate this functionality into the LangChain.js framework.

Please note that this is a conceptual example and might not work out of the box. You might need to adjust it according to your specific use case and the exact implementation of the RunnableSequence and RemoteRunnable classes in your version of LangChain.js.

jumski commented 6 months ago

@dosu-bot I think you are mistaken and this behaviour is already implemented; see RunnableSequence._streamIterator (https://github.com/langchain-ai/langchainjs/blob/main/langchain-core/src/runnables/base.ts#L1558):

    async *_streamIterator(input, options) {
        const callbackManager_ = await getCallbackManagerForConfig(options);
        const runManager = await callbackManager_?.handleChainStart(this.toJSON(), _coerceToDict(input, "input"), undefined, undefined, undefined, undefined, options?.runName);
        const steps = [this.first, ...this.middle, this.last];
        let concatSupported = true;
        let finalOutput;
        async function* inputGenerator() {
            yield input;
        }
        try {
            let finalGenerator = steps[0].transform(inputGenerator(), patchConfig(options, {
                callbacks: runManager?.getChild(`seq:step:1`),
            }));
            for (let i = 1; i < steps.length; i += 1) {
                const step = steps[i];
                finalGenerator = await step.transform(finalGenerator, patchConfig(options, {
                    callbacks: runManager?.getChild(`seq:step:${i + 1}`),
                }));
            }
            for await (const chunk of finalGenerator) {
                yield chunk;
                if (concatSupported) {
                    if (finalOutput === undefined) {
                        finalOutput = chunk;
                    }
                    else {
                        try {
                            // eslint-disable-next-line @typescript-eslint/no-explicit-any
                            finalOutput = concat(finalOutput, chunk);
                        }
                        catch (e) {
                            finalOutput = undefined;
                            concatSupported = false;
                        }
                    }
                }
            }
        }
        catch (e) {
            await runManager?.handleChainError(e);
            throw e;
        }
        await runManager?.handleChainEnd(_coerceToDict(finalOutput, "output"));
    }
dosubot[bot] commented 6 months ago

The _streamIterator async generator function in the RunnableSequence class is designed to provide a default streaming implementation for runnables. This function takes an input and optional call options, invokes the runnable with these parameters, and yields the result. This is a simplistic implementation meant to be overridden by subclasses that support more sophisticated streaming output. The purpose of this function is to allow for the streaming of outputs, potentially in real-time or in chunks, rather than waiting for all processing to complete before returning a result. This can be particularly useful for long-running tasks or those that generate incremental outputs.
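
For reference, the default being described here is tiny. Paraphrased from the Runnable base class (details may differ by version), it looks roughly like this:

// Rough paraphrase of the base Runnable default (may differ by version):
// the entire invoke() result is yielded as a single chunk, so any subclass
// that can genuinely stream is expected to override this method.
abstract class BaseRunnableSketch<RunInput, RunOutput> {
  abstract invoke(input: RunInput, options?: Record<string, unknown>): Promise<RunOutput>;

  async *_streamIterator(
    input: RunInput,
    options?: Record<string, unknown>
  ): AsyncGenerator<RunOutput> {
    yield await this.invoke(input, options);
  }
}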

The reason why _streamIterator might be invoking the last step instead of streaming it when a RemoteRunnable is part of a RunnableSequence could be related to how the RunnableSequence class's invoke method is implemented. The invoke method processes each step in the sequence sequentially, starting with the first step, then any middle steps, and finally the last step. Each step's output is used as the input for the next step. The final step's output is then returned as the final output of the sequence.

When a RemoteRunnable is included as part of a RunnableSequence, and the sequence is streamed using _streamIterator, the streaming behavior is determined by the default implementation of _streamIterator, which simply invokes the runnable (in this case, the entire sequence) once and yields the result. This means that the streaming functionality does not inherently break down the sequence into its constituent steps for individual streaming. Instead, it treats the sequence as a single runnable entity. Therefore, the streaming operation waits for the entire sequence to complete, including the invocation of the last step, before yielding the final result. This behavior effectively bypasses the potential for streaming intermediate outputs from each step within the sequence.

To achieve streaming of each step within a RunnableSequence, including when a RemoteRunnable is involved, the RunnableSequence class or the RemoteRunnable class would need to implement a more sophisticated streaming mechanism. This could involve overriding the _streamIterator method to handle the streaming of outputs from each step within the sequence individually, rather than treating the sequence as a single invocation. However, based on the provided context, such an implementation is not present, leading to the observed behavior where the last step is invoked and its result is streamed as a single output, rather than streaming the outputs of each step incrementally.

For more details, you can refer to the source code of the RunnableSequence class and the RemoteRunnable class in the LangChain.js codebase.

I hope this explanation helps! If you have any further questions, feel free to ask.

jumski commented 6 months ago

@dosu-bot from my understanding, the code I pasted for _streamIterator already supports streaming of the last step. See:

  1. it assembles the finalGenerator by transforming the steps one by one
  2. it then uses finalGenerator to yield chunk by chunk

How did you figure out that the _streamIterator I pasted invokes the last step without streaming/chunking?

dosubot[bot] commented 6 months ago

Yes, the _streamIterator async generator function in the RunnableSequence class of the LangChain.js framework does support streaming of the last step. This is evident from the way the function is implemented.

The function first constructs the finalGenerator by transforming each step one by one, starting from the first step. This is done inside the loop:

for (let i = 1; i < steps.length; i += 1) {
    const step = steps[i];
    finalGenerator = await step.transform(finalGenerator, patchConfig(options, {
        callbacks: runManager?.getChild(`seq:step:${i + 1}`),
    }));
}

After the loop, the finalGenerator is fully assembled, including the transformation of the last step. The function then starts to yield chunks from the finalGenerator:

for await (const chunk of finalGenerator) {
    yield chunk;
    ...
}

This means that the finalGenerator will yield chunks of data as they are produced by the last step. Therefore, the _streamIterator function does support streaming of the last step.

Regarding the getCallbackManagerForConfig function, without additional context, it's hard to provide a detailed explanation. However, based on its usage in the code snippet you provided, it seems to be used for managing callbacks related to the execution of the sequence. The runManager obtained from getCallbackManagerForConfig is used to handle the start and end of the sequence execution, as well as any errors that occur during the execution.

I hope this helps! If you have any more questions, feel free to ask.

jumski commented 6 months ago

@dosubot What could prevent the final step from being streamed? Let's say the last step is a RemoteRunnable, which supports streaming. I would expect it to call the /stream endpoint, but it calls /invoke.

jacoblee93 commented 6 months ago

Oh that's very interesting - I hadn't really considered people would use it this way but it makes sense.

We just need to implement some methods, yes. Thanks for reporting!
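
Editorial aside: a rough guess at what those methods might be. The /invoke call presumably comes from the base-class fallbacks (the default transform gathers its input and delegates to the default _streamIterator, which just yields invoke()). If RemoteRunnable currently overrides only stream(), as the solo-streaming behaviour above suggests, then giving it a _streamIterator along these lines could route sequence streaming to /stream. Hypothetical sketch, not the actual fix:

import { RemoteRunnable } from "langchain/runnables/remote";
import type { RunnableConfig } from "@langchain/core/runnables";

// Hypothetical sketch: give RemoteRunnable a _streamIterator so that the
// sequence's streaming path reaches the existing stream() implementation
// (which calls the remote /stream endpoint) instead of falling back to invoke().
class StreamingRemoteRunnable<
  RunInput,
  RunOutput,
  CallOptions extends RunnableConfig
> extends RemoteRunnable<RunInput, RunOutput, CallOptions> {
  async *_streamIterator(
    input: RunInput,
    options?: Partial<CallOptions>
  ): AsyncGenerator<RunOutput> {
    const stream = await this.stream(input, options);
    for await (const chunk of stream) {
      yield chunk;
    }
  }
}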

jumski commented 6 months ago

Yeah, it makes total sense to implement whole chains in JS and use RemoteRunnable as the model so it can be authenticated.

@jacoblee93 is it a big overhaul? Maybe I could help with that. What methods are missing? I would appreciate being pointed in the right direction; I am really struggling to figure out which part of the code decides between /invoke and /stream for the final step.

Also, would a temporary workaround be to just manually call RemoteRunnable.stream with the gathered input?

thanks for getting back :bow:
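
For reference, a minimal sketch of that manual workaround, reusing the names from the example at the top of this issue: format the prompt yourself, then call stream() directly on the RemoteRunnable so the /stream endpoint is hit. This is just the two working snippets above combined, not verified in the thread.

import { ChatPromptTemplate } from "@langchain/core/prompts";
import { RemoteRunnable } from "langchain/runnables/remote";

const prompt = ChatPromptTemplate.fromTemplate("Tell me a joke about {topic}");
const model = new RemoteRunnable({ url: "http://localhost:8081/models/ChatOpenAI" });

// Do the prompt step manually, then stream the remote model with its output
// (the same value the sequence would have passed to it).
const promptValue = await prompt.invoke({ topic: "cats" });
const stream = await model.stream(promptValue);

for await (const chunk of stream) {
  console.log(chunk);
}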

jumski commented 5 months ago

@dosubot is it possible that streamEvents not streaming properly has something to do with this issue?

kikohs commented 4 months ago

I have this issue in an AgentExecutor; it doesn't stream, nor does it work with streamEvents.

Here is the test code:

  // Imports assumed for this snippet (exact paths may vary by langchain version):
  // import { RemoteRunnable } from "langchain/runnables/remote";
  // import { RunnableSequence } from "@langchain/core/runnables";
  // import { DynamicTool } from "@langchain/core/tools";
  // import { convertToOpenAIFunction } from "@langchain/core/utils/function_calling";
  // import { BufferMemory } from "langchain/memory";
  // import { AgentExecutor } from "langchain/agents";
  // import { formatToOpenAIFunctionMessages } from "langchain/agents/format_scratchpad";
  // import { OpenAIFunctionsAgentOutputParser } from "langchain/agents/openai/output_parser";

  const model = new RemoteRunnable({ url: 'http://localhost:8000/openai' });

  const prompt = chatStarter('You are helpful'); // chatStarter appears to be a user-defined prompt helper

  const memory = new BufferMemory({
    returnMessages: true,
    inputKey: "input",
    outputKey: "output",
    memoryKey: "chat_history",
  });

  const customTool = new DynamicTool({
    name: "get_word_length",
    description: "Returns the length of a word.",
    func: async (input) => input.length.toString(),
  });

  const tools = [customTool];

  const modelWithFunctions = model.bind({
    functions: tools.map((tool) => convertToOpenAIFunction(tool)),
  });

  const agentWithMemory = RunnableSequence.from([
    {
      input: (i) => i.input,
      agent_scratchpad: (i) => formatToOpenAIFunctionMessages(i.steps),
      chat_history: (i) => i.chat_history,
    },
    prompt,
    modelWithFunctions,
    new OpenAIFunctionsAgentOutputParser(),
  ]);

  const agentExecutor = AgentExecutor.fromAgentAndTools({
    agent: agentWithMemory,
    tools,
    returnIntermediateSteps: true,
  }).withConfig({ runName: "Agent" });

  const input1 = "write a very short story about me and count how many characters?";
  let inputs = {
    input: input1,
    chat_history: await memory.chatHistory.getMessages(),
  }

  // Does not work if RemoteRunnable is used
  const eventStream = await agentExecutor.streamEvents(
    inputs,
    { version: "v1" }
  );

  let output = null;
  for await (const event of eventStream) {
    const eventType = event.event;
    if (eventType === "on_chain_start") {
      if (event.name === "Agent") {
        console.log("\n-----");
        console.log(
          `Starting agent: ${event.name} with input: ${JSON.stringify(
            event.data.input
          )}`
        );
      }
    } else if (eventType === "on_chain_end") {
      if (event.name === "Agent") {
        console.log("\n-----");
        console.log(`Finished agent: ${event.name}\n`);
        console.log(`Agent output was: ${event.data.output}`);
        console.log("\n-----");
        output = event.data.output;
      }
    } else if (eventType === "on_llm_stream") {
      const content = event.data?.chunk?.message?.content;
      if (content !== undefined && content !== "") {
        console.log(`| ${content}`);
      }
    } else if (eventType === "on_tool_start") {
      console.log("\n-----");
      console.log(
        `Starting tool: ${event.name} with inputs: ${event.data.input}`
      );
    } else if (eventType === "on_tool_end") {
      console.log("\n-----");
      console.log(`Finished tool: ${event.name}\n`);
      console.log(`Tool output was: ${event.data.output}`);
      console.log("\n-----");
    }
  }

  // Does not work if RemoteRunnable is used; everything arrives at once instead of streaming
  const stream = await agentExecutor.stream(inputs);
  for await (const chunk of stream) {
    console.log('chunk: ', chunk);
  }