langchain-ai / langchain

🦜🔗 Build context-aware reasoning applications
https://python.langchain.com
MIT License
94.9k stars 15.37k forks source link

Bug Report: astream_events fails when tool returns a dictionary #26763

Open sharrajesh opened 1 month ago

sharrajesh commented 1 month ago

Checked other resources

Description

When using astream_events with an agent that has multiple tools, if one of the tools returns a dictionary containing an answer and source documents, the streaming process fails. However, the same setup works correctly in non-streaming mode.

Steps to Reproduce

  1. Set up an agent with multiple tools, including one that returns a dictionary.
  2. Use astream_events to stream the agent's output.
  3. Observe the error when the tool returning a dictionary is called.

Code to Reproduce

import asyncio
import json
import os
import random
import warnings
from typing import Type, Dict, Any

import boto3
from dotenv import find_dotenv, load_dotenv
from langchain.agents import create_react_agent, AgentExecutor
from langchain.prompts import PromptTemplate
from langchain.tools import BaseTool
from langchain_aws import ChatBedrock
from langsmith import Client
from pydantic import BaseModel, Field

# Suppress warnings
warnings.filterwarnings("ignore")

load_dotenv(find_dotenv())

reproduce_problem = True

boto3_session = boto3.Session(
    aws_access_key_id=os.environ["BEDROCK_AWS_ACCESS_KEY"],
    aws_secret_access_key=os.environ["BEDROCK_AWS_ACCESS_SECRET"],
    region_name=os.environ["BEDROCK_AWS_REGION"],
)
model = ChatBedrock(
    model_id="anthropic.claude-3-5-sonnet-20240620-v1:0",
    client=boto3_session.client("bedrock-runtime"),
    streaming=True,
    model_kwargs={
        "temperature": 0,
    },
    tags=["agent"],
    metadata={"streaming": True},
    callbacks=None,
)

class CatHidingInput(BaseModel):
    query: str = Field(default="", description="Query about where the cat is hiding")

class CatHidingTool(BaseTool):
    name = "where_cat_is_hiding"
    description = "Use this tool to find out where the cat is hiding right now."
    args_schema: Type[BaseModel] = CatHidingInput

    async def _arun(self, query: str) -> str:
        return random.choice(["under the bed", "on the shelf"])

    def _run(self, query: str) -> str:
        return random.choice(["under the bed", "on the shelf"])

class GetItemsInput(BaseModel):
    place: str = Field(..., description="The place to look for items")

class GetItemsTool(BaseTool):
    name = "get_items"
    description = "Use this tool to look up which items are in the given place."
    args_schema: Type[BaseModel] = GetItemsInput

    async def _arun(self, place: str) -> Dict[str, Any]:
        return self._get_items(place)

    def _run(self, place: str) -> Dict[str, Any]:
        return self._get_items(place)

    def _get_items(self, place: str):
        items = ""
        if "bed" in place:
            items = "socks, shoes and dust bunnies"
        elif "shelf" in place:
            items = "books, pencils and pictures"
        else:
            items = "cat snacks"
        answer = f"The items in the {place} are: {items}"
        source_documents = [
            {
                "page_content": f"Items found in {place}: {items}",
                "metadata": {"source": "GetItemsTool", "place": place},
            }
        ]
        # xxxx: this is the issue
        # it doesn't work in streaming if i return a dictionary which works in non-streaming mode in my application
        if reproduce_problem:
            return {"answer": answer, "source_documents": source_documents}
        else:
            # it works if i return one string
            return json.dumps({"answer": answer, "source_documents": source_documents})

client = Client()

REACT_PROMPT = PromptTemplate.from_template(
    """You are an AI assistant helping to find a cat and items in its location.
Human: {input}
AI: To solve this task, I have access to the following tools:

{tools}

The available tool names are: {tool_names}

Let's approach this step-by-step:

Always use the following format:

Thought: Consider what to do next
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

{agent_scratchpad}"""
)

async def run_agent_streaming():
    tools = [GetItemsTool(), CatHidingTool()]
    agent = create_react_agent(model, tools, REACT_PROMPT)
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        return_intermediate_steps=True,
        return_source_documents=True,
        handle_parsing_errors=True,
    ).with_config({"run_name": "Agent"})
    async for event in agent_executor.astream_events(
        {"input": "where is the cat hiding? what items are in that location?"},
        version="v2",
    ):
        kind = event["event"]
        if kind == "on_chain_start":
            if event["name"] == "Agent":
                print(
                    f"Starting agent: {event['name']} with input: {event['data'].get('input')}"
                )
        elif kind == "on_chain_end":
            if event["name"] == "Agent":
                print()
                print("--")
                print(
                    f"Done agent: {event['name']} with output: {event['data'].get('output')['output']}"
                )
        elif kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                print(content, end="|")
        elif kind == "on_tool_start":
            print("--")
            print(
                f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
            )
        elif kind == "on_tool_end":
            print(f"Done tool: {event['name']}")
            print(f"Tool output was: {event['data'].get('output')}")
            print("--")

async def run_agent_non_streaming():
    tools = [GetItemsTool(), CatHidingTool()]
    agent = create_react_agent(model, tools, REACT_PROMPT)
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        return_intermediate_steps=True,
        return_source_documents=True,
        handle_parsing_errors=True,
    ).with_config({"run_name": "Agent"})
    result = await agent_executor.ainvoke(
        {"input": "where is the cat hiding? what items are in that location?"}
    )
    print(f"Agent output: {result['output']}")
    print("Intermediate steps:")
    for step in result.get("intermediate_steps", []):
        print(f"- Action: {step[0].tool}")
        print(f"  Input: {step[0].tool_input}")
        print(f"  Output: {step[1]}")

if __name__ == "__main__":
    print("Running streaming version:")
    asyncio.run(run_agent_streaming())
    print("\nRunning non-streaming version:")
    asyncio.run(run_agent_non_streaming())

Error Message

pydantic.v1.error_wrappers.ValidationError: 2 validation errors for HumanMessage
content
  str type expected (type=type_error.str)
content
  value is not a valid list (type=type_error.list)

Expected Behavior

The astream_events method should handle tool outputs that return dictionaries, just as it does in non-streaming mode.

Actual Behavior

The streaming process fails with a ValidationError when a tool returns a dictionary.

Workaround

Returning a JSON string instead of a dictionary from the tool allows the streaming to work:

return json.dumps({"answer": answer, "source_documents": source_documents})

Environment

Model:

Operating System: (Please specify your OS)

Additional Context

This issue only occurs in streaming mode when using astream_events. The same code works correctly in non-streaming mode. It appears that the astream_events method is not properly handling dictionary outputs from tools, possibly due to an issue in the event conversion process.

The problem is reproducible with AWS Bedrock using the Claude 3 Sonnet model, but it may also affect other LLM providers and models.

sharrajesh commented 1 month ago

To add more context

This gist shows working streaming and non streaming code for my langchain agent with multiple tools returning dict when using create_openai_tools_agent https://gist.github.com/sharrajesh/1080af5a95ae9d7b83a8da46597b68e1

This gist show non working streaming code for my langchain agent with multiple tools returning dict when using create_react_agent https://gist.github.com/sharrajesh/765c0b6edfe991363675f45d467e3c93