langchain-ai / langchainjs

🦜🔗 Build context-aware reasoning applications 🦜🔗
https://js.langchain.com/docs/
MIT License

Cohere Stream Error: additional_kwargs[is_finished] Already Exists in Message Chunk #3937

Closed TomsF1 closed 6 months ago

TomsF1 commented 9 months ago

Description

I'm encountering an error involving the handling of additional keyword arguments in AI message chunks. The specific error message is:

[llm/error] [1:llm:ChatCohere] [1.23s] LLM run errored with error: "additional_kwargs[is_finished] already exists in this message chunk."
Error in handler Handler, handleChainError: Error: additional_kwargs[is_finished] already exists in this message chunk.
 ⨯ Error: additional_kwargs[is_finished] already exists in this message chunk.
    at AIMessageChunk._mergeAdditionalKwargs (webpack-internal:///(rsc)/./node_modules/@langchain/core/dist/messages/index.js:210:23)
    at AIMessageChunk.concat (webpack-internal:///(rsc)/./node_modules/@langchain/core/dist/messages/index.js:266:47)
    at ChatGenerationChunk.concat (webpack-internal:///(rsc)/./node_modules/@langchain/core/dist/outputs.js:56:35)
    at ChatCohere._streamIterator (webpack-internal:///(rsc)/./node_modules/@langchain/core/dist/language_models/chat_models.js:99:59)
    at _streamIterator.next (<anonymous>)
    at Object.pull (webpack-internal:///(rsc)/./node_modules/@langchain/core/dist/utils/stream.js:94:57)
    at ensureIsPromise (c:\Users\redacted\redacted\redacted\redacted\dev.folder.here\lib\internal\webstreams\util.js:192:19)
    at readableStreamDefaultControllerCallPullIfNeeded (c:\Users\redacted\redacted\redacted\redacted\dev.folder.here\lib\internal\webstreams\readablestream.js:2259:5)
    at c:\Users\redacted\redacted\redacted\redacted\dev.folder.here\lib\internal\webstreams\readablestream.js:2264:9
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {stack: 'Error: additional_kwargs[is_finished] already…ions (node:internal/process/task_queues:95:5)', message: 'additional_kwargs[is_finished] already exists in this message chunk.'}

Steps to Reproduce

Invoke the snippet below as part of a runnable sequence:


async (previousStepResult: {documents: any, userQuestion: string}) => {
      const userQuestion = previousStepResult.userQuestion;
      const documents = previousStepResult.documents;       
      const response = await cohereChatModel.stream(
        [new HumanMessage(userQuestion)],
        { 
          preambleOverride: "text here",
          conversationId: conversationId,
          promptTruncation: "AUTO",
          // connectors: [{id: "web-search"}], // turned-off
          searchQueriesOnly: false,
          documents: documents,
          citationQuality: "fast",
        },
      );
      let streamTokens = "";
      let streamIters = 0;
      for await (const item of response) {
        streamTokens += item;
        streamIters += 1;
      }
      return streamTokens ;
    },
    new StringOutputParser()
 ]);

The chat model is configured as follows. Setting streaming to false, or leaving it at its default, does not resolve or change the error.

const cohereChatModel = new ChatCohere({
      model: "command-nightly",
      temperature: 0.3,
      streaming: true,
      verbose: true,
 })

Environment

Langchain version: 0.1.0
Langchain-cohere version: 0.0.2

dosubot[bot] commented 9 months ago

🤖

Hi @TomsF1! I'm Dosu, a bot here to assist you with your issues and queries while we wait for a human maintainer. Even though I'm a bot, I can help debug problems, answer questions, and guide you to contribute to our project. Let's decode this bug together! 🐞

The error you're encountering, "additional_kwargs[is_finished] already exists in this message chunk", is likely due to a conflict in the additional_kwargs of two message chunks that are being merged. This is handled by the _mergeAdditionalKwargs function in the BaseMessageChunk class in LangChainJS.

This function merges the additional_kwargs of two message chunks. If a key already exists in the additional_kwargs of the first chunk and the types of the values for this key in the two chunks are not the same, or if the key exists but the value is not a string, object, or an array of OpenAIToolCall, an error is thrown.

Here is the relevant code snippet from the LangChainJS codebase:

static _mergeAdditionalKwargs(
    left: NonNullable<BaseMessageFields["additional_kwargs"]>,
    right: NonNullable<BaseMessageFields["additional_kwargs"]>
  ): NonNullable<BaseMessageFields["additional_kwargs"]> {
    const merged = { ...left };
    for (const [key, value] of Object.entries(right)) {
      if (merged[key] === undefined) {
        merged[key] = value;
      } else if (typeof merged[key] !== typeof value) {
        throw new Error(
          `additional_kwargs[${key}] already exists in the message chunk, but with a different type.`
        );
      } else if (typeof merged[key] === "string") {
        merged[key] = (merged[key] as string) + value;
      } else if (
        !Array.isArray(merged[key]) &&
        typeof merged[key] === "object"
      ) {
        merged[key] = this._mergeAdditionalKwargs(
          merged[key] as NonNullable<BaseMessageFields["additional_kwargs"]>,
          value as NonNullable<BaseMessageFields["additional_kwargs"]>
        );
      } else if (
        key === "tool_calls" &&
        isOpenAIToolCallArray(merged[key]) &&
        isOpenAIToolCallArray(value)
      ) {
        // ... code for merging tool_calls ...
      } else {
        throw new Error(
          `additional_kwargs[${key}] already exists in this message chunk.`
        );
      }
    }
    return merged;
  }

To resolve this issue, you need to ensure that the additional_kwargs of the message chunks you are trying to concatenate do not contain the same keys with different types or keys with values that are not strings, objects, or arrays of OpenAIToolCall.
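To make the failure mode concrete, here is a minimal, self-contained sketch of the merge rules quoted above. It is a simplified re-implementation for illustration, not the library's code: the `tool_calls` branch is omitted, `mergeKwargs` and the `text` key are hypothetical names, and it assumes that `is_finished` arrives as a boolean on more than one chunk. Under those assumptions, two string values are concatenated, while two boolean values fall through to the final `else` and throw the error reported in this issue.

```typescript
type Kwargs = Record<string, unknown>;

// Simplified version of the merge logic quoted above (tool_calls branch omitted).
function mergeKwargs(left: Kwargs, right: Kwargs): Kwargs {
  const merged: Kwargs = { ...left };
  for (const [key, value] of Object.entries(right)) {
    const existing = merged[key];
    if (existing === undefined) {
      // New key: copy it over.
      merged[key] = value;
    } else if (typeof existing !== typeof value) {
      throw new Error(
        `additional_kwargs[${key}] already exists in the message chunk, but with a different type.`
      );
    } else if (typeof existing === "string") {
      // Strings are concatenated, which suits streamed text fragments.
      merged[key] = existing + (value as string);
    } else if (
      existing !== null &&
      !Array.isArray(existing) &&
      typeof existing === "object"
    ) {
      // Plain objects are merged recursively.
      merged[key] = mergeKwargs(existing as Kwargs, value as Kwargs);
    } else {
      // Booleans, numbers, and arrays end up here.
      throw new Error(
        `additional_kwargs[${key}] already exists in this message chunk.`
      );
    }
  }
  return merged;
}

// String values under the same key merge cleanly.
const mergedStrings = mergeKwargs({ text: "Hel" }, { text: "lo" });

// Boolean values under the same key (assumed shape of is_finished) throw.
let booleanMergeError = "";
try {
  mergeKwargs({ is_finished: false }, { is_finished: true });
} catch (e) {
  booleanMergeError = (e as Error).message;
}
console.log(mergedStrings, booleanMergeError);
```

If this matches what the Cohere integration emits, the fix belongs in the library (either the integration stops repeating the key or the merge tolerates it), not in user code.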

As for the purpose of additional_kwargs[is_finished] in the LangChainJS codebase, I wasn't able to find specific information within the repository. It's possible that this key is used elsewhere in the codebase or in a dependency, but without more information, it's difficult to say for certain.

If you're still encountering this issue after checking the additional_kwargs in your message chunks, could you provide more information about the context in which this error is occurring? Specifically, it would be helpful to know more about the message chunks that are being merged when this error occurs.

jacoblee93 commented 9 months ago

Thanks for the report - @bracesproul can you have a look at this one?

bracesproul commented 9 months ago

Hey @TomsF1 the code you included in your issue description has some syntax errors, but when I simplified it down to this:

  const userQuestion = "How old is the earth?";

  const cohereChatModel = new ChatCohere({
    model: "command-nightly",
    temperature: 0.3,
    streaming: true,
    verbose: true
  }).pipe(new StringOutputParser());

  const response = await cohereChatModel.stream([
    new HumanMessage(userQuestion)
  ]);

  let streamTokens = "";
  let streamIters = 0;
  for await (const item of response) {
    streamTokens += item;
    streamIters += 1;
  }

I was unable to reproduce.

What environment are you running this on? And if you could push the code that causes this error to a GitHub repo I could clone, that would be much appreciated.

TomsF1 commented 9 months ago

Thanks @bracesproul, the environment is Next.js with the edge runtime (Vercel). Your simplified example works, but it seems to await the full response in my script (I'll make some checks on my side). However, there's an issue with adding the parser to the chain sequentially: it blocks the ability to pass in optional invocation params when calling the .stream method. E.g. the code below produces the following type error.

Code Sample:

const answerChain = RunnableSequence.from([
      async (previousStepResult: {documents: any, userQuestion: string, chatHistory: chatHistory[]}) => {
        const userQuestion = previousStepResult.userQuestion;
        const documents = previousStepResult.documents;
        const chatHistory = previousStepResult.chatHistory;
        const cohereChatModel = new ChatCohere({
          model: "command-nightly",
          temperature: 0.3,
          // streaming: true,
          verbose: true,
        }).pipe(new StringOutputParser());

        const response = await cohereChatModel.stream(
          [new HumanMessage(userQuestion)],
          {
            preambleOverride: "text here",
            conversationId: conversationId,
            promptTruncation: "AUTO",
            // connectors: [{id: "web-search"}], // turned-off
            searchQueriesOnly: false,
            documents: documents,
            citationQuality: "fast"
          }
        );
        let streamTokens = "";
        let streamIters = 0;
        for await (const item of response) {
          streamTokens += item;
          streamIters += 1;
        }
        console.log("streamTokens:", streamTokens);
        console.log("streamIters:", streamIters);
        return streamTokens;
      },
      new BytesOutputParser()
]);

Error:

Object literal may only specify known properties, and 'preambleOverride' does not exist in type 'Partial<RunnableConfig>'.ts(2353)

I tried bypassing this by overriding the invocationParams method directly, but this makes the runnable incompatible with the stream method, which returns the error below.

Code Sample:

const answerChain = RunnableSequence.from([
      async (previousStepResult: {documents: any, userQuestion: string, chatHistory: chatHistory[]}) => {
        const userQuestion = previousStepResult.userQuestion;
        const documents = previousStepResult.documents;
        const chatHistory = previousStepResult.chatHistory;
        const cohereChatModel = new ChatCohere({
          model: "command-nightly",
          temperature: 0.3,
          // streaming: true,
          verbose: true,
        }).invocationParams({
          preambleOverride: "text here",
          conversationId: conversationId,
          promptTruncation: "AUTO",
          // connectors: [{id: "web-search"}], // turned-off
          searchQueriesOnly: false,
          documents: documents,
          citationQuality: "fast"
        });

        const parsedOutput = cohereChatModel.pipe(new StringOutputParser());

        const response = await parsedOutput.stream([
          new HumanMessage(userQuestion)
        ]);

        let streamTokens = "";
        let streamIters = 0;
        for await (const item of response) {
          streamTokens += item;
          streamIters += 1;
        }
        console.log("streamTokens:", streamTokens);
        console.log("streamIters:", streamIters);
        return streamTokens;
      },
      new BytesOutputParser()
]);

Error:

Cannot invoke an object which is possibly 'undefined'.ts(2722)
'cohereChatModel.pipe' is possibly 'undefined'.ts(18048)

I'm thinking either the CohereInput interface needs extending so that these params can be passed to the constructor when initialising the ChatCohere class, or the parsing needs to be handled within the class's streaming method(s). The latter would still allow params to be passed in when using the .stream method.

Note that non-streamed responses work fine with the .invoke method in the code snippet below. It would be excellent if the for loop and parsing could be abstracted away, so that the only change needed to stream responses in the code below would be switching the method from .invoke to .stream.

Code sample:

const answerChain = RunnableSequence.from([
      async (previousStepResult: {documents: any, userQuestion: string, chatHistory: chatHistory[]}) => {
        const userQuestion = previousStepResult.userQuestion;
        const documents = previousStepResult.documents;
        const chatHistory = previousStepResult.chatHistory;
        const cohereChatModel = new ChatCohere({
          model: "command-nightly",
          temperature: 0.3,
          // streaming: true,
          verbose: true,
        });         
        const response = await cohereChatModel.invoke(
          [new HumanMessage(userQuestion)],
          {
            preambleOverride: "text here",
            conversationId: conversationId,
            promptTruncation: "AUTO",
            // connectors: [{id: "web-search"}], // turned-off
            searchQueriesOnly: false,
            documents: documents,
            citationQuality: "fast"
          }
        );
        return response;
      },
      new BytesOutputParser()
 ]);
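As a stopgap for the loop-and-concatenate pattern in the streaming samples above, the loop can be factored into a small helper. This is only a sketch: `collectStream` and `fakeStream` are hypothetical names, and it assumes the stream yields plain string chunks, as it would after piping a model into a string output parser. It does not address passing call options.

```typescript
// Concatenates every string chunk from an async stream and counts the chunks.
async function collectStream(
  stream: AsyncIterable<string>
): Promise<{ text: string; chunks: number }> {
  let text = "";
  let chunks = 0;
  for await (const chunk of stream) {
    text += chunk;
    chunks += 1;
  }
  return { text, chunks };
}

// Hypothetical stand-in for the stream returned by a model piped into a string parser.
async function* fakeStream(): AsyncGenerator<string> {
  yield "The earth ";
  yield "is about ";
  yield "4.5 billion years old.";
}

const collected = collectStream(fakeStream());
collected.then(({ text, chunks }) => {
  console.log("streamTokens:", text);
  console.log("streamIters:", chunks);
});
```

In the chain above, the body of the step would then reduce to awaiting `collectStream(response)` and returning its `text`.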

Hopefully this is enough info. Let me know if not and I can put something together for you.

Gr33nLight commented 6 months ago

Any update on this? @TomsF1 did you manage to make this work? I'm having trouble figuring out how to pass arguments like searchQueriesOnly or documents to the Cohere client when it is used with LangChain runnables.

jacoblee93 commented 6 months ago

I believe the original issue was recently fixed in later versions of @langchain/core, in #4743:

https://github.com/langchain-ai/langchainjs/blob/main/langchain-core/src/messages/index.ts#L304

If not I can look into patching.

@Gr33nLight can you try the following:

const cohereChatModel = new ChatCohere({
    model: "command-nightly",
    temperature: 0.3,
    streaming: true,
    verbose: true
}).bind({ searchQueriesOnly: true, documents: ... });

Please comment if that doesn't work.
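The idea behind the `.bind(...)` suggestion is that call options are attached to the model before it is piped into a parser, so the later `.stream` call no longer needs to carry them. The toy sketch below illustrates that pattern only. `ToyModel` and `ToyCallOptions` are hypothetical stand-ins, not the real ChatCohere class or its option types, and the document fields are made up.

```typescript
// Hypothetical call options, loosely modelled on the ones used in this thread.
interface ToyCallOptions {
  searchQueriesOnly?: boolean;
  documents?: Array<Record<string, string>>;
}

// A toy stand-in for a chat model: NOT the real ChatCohere class.
class ToyModel {
  private readonly bound: ToyCallOptions;

  constructor(bound: ToyCallOptions = {}) {
    this.bound = bound;
  }

  // Returns a new model whose calls always include `options`.
  bind(options: ToyCallOptions): ToyModel {
    return new ToyModel({ ...this.bound, ...options });
  }

  // Per-call options override bound ones; returns the options actually used.
  call(
    input: string,
    options: ToyCallOptions = {}
  ): { input: string; options: ToyCallOptions } {
    return { input, options: { ...this.bound, ...options } };
  }
}

const boundModel = new ToyModel().bind({
  searchQueriesOnly: true,
  documents: [{ title: "doc", snippet: "text" }],
});

// No options are passed at call time; the bound ones are applied.
const boundResult = boundModel.call("How old is the earth?");
console.log(boundResult.options);
```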

Gr33nLight commented 6 months ago

@jacoblee93 Yes, I can confirm it's fixed in the latest @langchain/core, great!