langchain-ai / langchain-aws

Build LangChain Applications on AWS
MIT License
98 stars 75 forks source link

ChatBedrockConverse sudden interruption and return: TypeError: Additional kwargs key latencyMs already exists in left dict and value has unsupported type <class 'int'>. #223

Open DiogoPM9 opened 1 week ago

DiogoPM9 commented 1 week ago

Requirements:

langchain==0.3.1
langchain-aws==0.2.1
langchain-chroma==0.1.4
langchain-community==0.3.1
langchain-core==0.3.7
langchain-text-splitters==0.3.0
langgraph==0.2.28
langgraph-checkpoint==1.0.9
langsmith==0.1.129

I was developing a multi-agent workflow using Langchain and Langgraph.

One of my requirement was that the LLMs had to be provided by AWS Bedrock.

However, I noticed an issue with ChatBedrockConverse, where the invocation stops due to an issue with merging dictionaries.

As I was debugging the code, I replace my LLM with one from OpenAI and that fixed the issue I had., however, since I have the stated requirement , this is not only not a solution, but seems to identify a problem in langchain-aws specifically.

Code used:

import os
import re
import pandas as pd
import json
from langgraph.prebuilt import ToolNode, tools_condition

from dotenv import load_dotenv
from langchain_chroma import Chroma
from langchain_community.embeddings import BedrockEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFDirectoryLoader
from langchain_core.prompts import MessagesPlaceholder

from typing import Annotated, Literal
from typing_extensions import TypedDict

from IPython.display import Image, display

from langchain_aws import ChatBedrockConverse

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.prompts import MessagesPlaceholder
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.messages.tool import ToolMessage

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

from langchain_core.tools import tool
from langchain_community.document_loaders.athena import AthenaLoader
from langgraph.checkpoint.memory import MemorySaver

from pydantic import BaseModel

load_dotenv()

from langchain_openai import ChatOpenAI

@tool
def sales_records(
        start_date: str,
        end_date: str,
) -> dict:
    """
    Fetches the sales records

    Args:
        start_date: date to begin the search
        end_date: date to end the search
    Returns:
        a dictionary with the sales
    """
    return {"banana": 14, "apples": 20, "oranges": 2}

@tool
def context_retriever():
    """
    Retrieves the required context for the report.
    Returns:
        Context for the report.
    """
    return "The report must end with: And that is the way the news goes."

mistral = "mistral.mistral-large-2402-v1:0"
anthropic = "anthropic.claude-3-sonnet-20240229-v1:0"
# LLM = ChatBedrockConverse(
#         model=anthropic,
#         region_name=ZONE,
#         credentials_profile_name=PROFILE_NAME)

LLM = ChatOpenAI(
    model="gpt-4o",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
    api_key=API_KEY, 
)
class State(TypedDict):
    messages: Annotated[list, add_messages]
    feedback: str
    client_data: str
    context: str

members =["ToolCaller", "ReportMaker", "FeedbackRequester", "FINISH"]

# Architect
architect_instructions = f"""
You are a supervisor responsible for the delivery of a Sales report. Your workers comprise of {members}

The process to generate a report is as follows:

1. The ToolCaller will query the sales
2. The ToolCaller will retrieve the required context for the report
3. The Report Maker will generate a report
4. The FeedbackRequester asks the user for feedback
5. If there is feedback then call Report Maker again. 
6. The FINISH will end the task in case the user says that there is no feedback

Given the following user request, respond with the worker to act next.
If the worker is ToolCaller you must also add what it needs to do:
Example: ToolCall, to retrieve data from athena. 

For the other workers, only the name is to be generated.
Each worker will perform a task and respond with their results and status.
"""
architect_prompt = ChatPromptTemplate.from_messages([
    ("system", architect_instructions), 
    MessagesPlaceholder(variable_name="messages")])
architect_chain = architect_prompt | LLM

# Report Maker
report_instructions = """
You are responsible for generating a report on the data you receive. 
If there is feedback you must take it into account.
The data: {data}
The context: {context}
Output the following: This is a dummy report.
If there is feedback: {feedback}
Then add the following to the above: There is feedback
"""
report_prompt = ChatPromptTemplate.from_messages(
    [("system", report_instructions), MessagesPlaceholder(variable_name="messages")])
report_chain = report_prompt | LLM

tool_caller_instructions = """
You are an assitant that works together with other agents to produce a report. Your task is to use the tools
you have access to provide the data to the other agents.

You must do everything you are tasked with before ending.
"""
tool_caller_prompt = ChatPromptTemplate.from_messages(
    [("system", tool_caller_instructions), MessagesPlaceholder(variable_name="messages")])
tool_caller_chain = tool_caller_prompt | LLM.bind_tools([sales_records, context_retriever])

# Nodes
def assistant(state: State) -> State:
    print("*************In the assitant node*************")
    result = architect_chain.invoke({"messages": state["messages"]})
    state["messages"].append(result)
    return state

def tool_caller(state: State) -> State:
    print("*************In the Tool Caller node*************")
    last_message = state["messages"][-1]
    output = tool_caller_chain.invoke({"messages": [
        HumanMessage(f"Given the following messages {last_message}, use the appropriate tool you have access to.")]})
    state["messages"].append(output)
    return state

def reporter(state: State) -> State:
    print("*************In the report node*************")
    feedback = feedback = state.get("feedback", "No feedback")
    report = report_chain.invoke({"messages": [HumanMessage("Generate a report given the data, context and feedback provided")],
                                  "context": state["context"],
                                  "data": state["client_data"],
                                  "feedback": feedback})
    state["messages"].append(report)
    return state

def human_feedback_node(state: State) -> State:
    print("*************In the human feedback Node*************")
    report_message = state["messages"][-1]    
    while True:
        choice = input("Would you like to add feedback? (Yes or No)")
        if choice in ["Yes", "No"]:
            break
        if choice == "No":
            state["messages"].append(HumanMessage(content="No feedback to add. Finish."))
        print("Invalid choice, please try again (Yes or No)")

    if choice == "Yes":
        state["feedback"] = True
        while True:
                feedback = input("Please specify what feedback you would like to add: ").strip()
                if feedback:
                    state["messages"].append(HumanMessage(content=feedback))
                    state["feedback"] = feedback
                    break
                print("Request cannot be empty. Please try again.")
    else:
        state["feedback"] = False
    return state

def general_router(state: State):
    print("*************In the General router Node*************")
    last_message = state["messages"][-1]
    if "ToolCaller" in last_message.content:
        return "ToolCaller"
    elif last_message.content == "ReportMaker":
        return "ReportMaker"
    elif last_message.content == "FeedbackRequester":
        return "FeedbackRequester"
    elif last_message.content in ["There is no feedback.", "FINISH"]:
        return "__end__"

def tool_router(state: State) -> State:
    print("*************In the Tool router Node*************")
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "Tools"
    else:
        return "Architect"

def run_tool(state: State, tools: dict) -> State:
    print("*************running the tool*************")
    tool_calls = state["messages"][-1].tool_calls
    for tool_call in tool_calls:
        tool = tools[tool_call["name"]]
        result = tool.invoke(tool_call["args"])
        state["messages"].append(ToolMessage(content=str(result), 
                                             tool_call_id= tool_call["id"]))
        if tool.name == "sales_records":
            state["client_data"] = str(result)
        if tool.name == "context_retriever":
            state["context"] = result
    return state

workflow = StateGraph(State)
workflow.add_node("Architect", assistant)
workflow.add_node("ToolCaller", tool_caller)
workflow.add_node("ReportMaker", reporter)
workflow.add_node("FeedbackRequester", human_feedback_node)
workflow.add_node("Tools", lambda state: run_tool(state, tools={"sales_records":sales_records, "context_retriever": context_retriever}))

workflow.add_edge(START, "Architect")
workflow.add_conditional_edges(
    "Architect",
    general_router,
    {
        "__end__": END,
        "ToolCaller": "ToolCaller",
        "ReportMaker": "ReportMaker",
        "FeedbackRequester": "FeedbackRequester"
    }
)
workflow.add_conditional_edges(
    "ToolCaller",
    tool_router,
    {
        "Architect": "Architect",
        "Tools": "Tools",
    }
)
workflow.add_edge("Tools", "Architect")
workflow.add_edge("ReportMaker", "Architect")
workflow.add_edge("FeedbackRequester", "Architect")

graph = workflow.compile()

events = graph.invoke({"messages": HumanMessage(
    content="Give me a report for the following, start date:1998-01-10 00:00:00.000 and end_date:2005-09-13 00:00:00.000")})

The graph is as below: image

As you can see, if you choose to use ChatOpenAI, the code will execute smoothly, however, when you change to ChatBerockConverse, the invocation breaks.

When the graph is invoked, the architect successfully call the Tools worker. However, when the architect receives the tool call output, it breaks. The output was the following:

*************In the assitant node*************
*************In the General router Node*************
*************In the Tool Caller node*************
*************In the Tool router Node*************
*************running the tool*************
*************In the assitant node*************
TypeError: Additional kwargs key latencyMs already exists in left dict and value has unsupported type <class 'int'>.
tomaszbk commented 1 week ago

I have this same Issue

3coins commented 5 days ago

@DiogoPM9 Thanks for reporting this bug. I can reproduce the issue in my testing. The issue seems to be because of latencyMs attribute present in the Bedrock responses. While it's ok to for this to be present in the response, it's type int is something not supported in response_metadata when messages are merged, only dict, list or string is allowed. Updating the latencyMs value to a list should fix this issue.

File "/Users/pijain/projects/langchain-aws-dev/langchain-aws/libs/aws/langchain_aws/chat_models/bedrock_converse.py", line 682, in _messages_to_bedrock
    messages = merge_message_runs(messages)
....
File "/Users/pijain/Library/Caches/pypoetry/virtualenvs/langchain-aws-eH7P7gjZ-py3.10/lib/python3.10/site-packages/langchain_core/utils/_merge.py", line 59, in merge_dicts
    merged[right_k] = merge_dicts(merged[right_k], right_v)
  File "/Users/pijain/Library/Caches/pypoetry/virtualenvs/langchain-aws-eH7P7gjZ-py3.10/lib/python3.10/site-packages/langchain_core/utils/_merge.py", line 65, in merge_dicts
    raise TypeError(
TypeError: Additional kwargs key latencyMs already exists in left dict and value has unsupported type <class 'int'>.
3coins commented 3 days ago

Seems like we will need a change in this function, to update the latencyMs from int to list.

https://github.com/langchain-ai/langchain-aws/blob/adcfc348c235635e910b9b9bb5c661d3829b3955/libs/aws/langchain_aws/chat_models/bedrock_converse.py#L720-L731

Here is a simplified solution, however we might need to handle other keys that might cause issue while merging messages as well.

response["metrics"]["latencyMs"] = [response["metrics"]["latencyMs"]]