langchain-ai / langchain


FunctionMessage doesn't work with astream_events api #24007

Open mantrakp04 opened 1 month ago

mantrakp04 commented 1 month ago

Example Code

import os
from typing import *

from langchain_anthropic import ChatAnthropic
from langchain_mongodb.chat_message_histories import MongoDBChatMessageHistory
from langchain.agents import create_tool_calling_agent
from langchain_core.prompts import MessagesPlaceholder
from langchain.memory import ConversationBufferWindowMemory
from langchain.agents import AgentExecutor
from langchain.tools import tool
from langchain_core.prompts import ChatPromptTemplate
from langchain.tools import Tool
from langchain_community.utilities import GoogleSerperAPIWrapper
from uuid import uuid4
chat_id = "b894e0c7-acb1-4907-9bsbc-bb98f5a970dc"

def google_search_tool(iso: str="us"):
    google_search = GoogleSerperAPIWrapper(gl=iso)
    google_image_search = GoogleSerperAPIWrapper(gl=iso, type="images")
    google_news_search = GoogleSerperAPIWrapper(gl=iso, type="news")
    google_places_search = GoogleSerperAPIWrapper(gl=iso, type="places")

    return [
        Tool(
            name="google_search",
            func=google_search.run,
            description="Search Google for information."
        ),
        Tool(
            name="google_image_search",
            func=google_image_search.run,
            description="Search Google for images."
        ),
        Tool(
            name="google_news_search",
            func=google_news_search.run,
            description="Search Google for news."
        ),
        Tool(
            name="google_places_search",
            func=google_places_search.run,
            description="Search Google for places."
        )
    ]

workspace_id = "test"
request_id = str(uuid4())
system_template = "You are a helpful AI agent. Always use the tools at your disposal"
prompt = ""
tools = google_search_tool("in")
llm_kwargs = {}
llm = ChatAnthropic(
    model="claude-3-5-sonnet-20240620",
    streaming=True,
    api_key="yurrrrrrrrrrrrr",
)

base_template = ChatPromptTemplate.from_messages([
    ("system", system_template),
    MessagesPlaceholder(variable_name="chat_history") if chat_id else None,
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad")
])

agent = create_tool_calling_agent(llm=llm, tools=tools, prompt=base_template)

chat_message_history = MongoDBChatMessageHistory(
    session_id=chat_id,
    connection_string=os.getenv('MONGO_URI'),
    database_name=os.getenv('MONGO_DBNAME'), # "api"
    collection_name="chat_histories",
)

conversational_memory = ConversationBufferWindowMemory(
    chat_memory=chat_message_history,
    memory_key='chat_history',
    return_messages=True,
    output_key="output",
    input_key="input",
)

agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    memory=conversational_memory,
    return_intermediate_steps=True,
    handle_parsing_errors=True
).with_config({"run_name": "Agent"})

response = []
run = agent_executor.astream_events(input={"input": "what is glg stock"}, version="v2")
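# Consume the v2 event stream, collecting every event for later post-processing.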
async for event in run:
    response.append(event)
    kind = event["event"]
    if kind == "on_chain_start":
        if (
            event["name"] == "Agent"
        ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
            print(
                f"Starting agent: {event['name']} with input: {event['data'].get('input')}"
            )
    elif kind == "on_chain_end":
        if (
            event["name"] == "Agent"
        ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
            print()
            print("--")
            print(
                f"Done agent: {event['name']} with output: {event['data'].get('output')['output']}"
            )
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")
    elif kind == "on_tool_start":
        print("--")
        print(
            f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
        )
    elif kind == "on_tool_end":
        print(f"Done tool: {event['name']}")
        print(f"Tool output was: {event['data'].get('output')}")
        print("--")

from langchain_core.messages import FunctionMessage
import json

messages = chat_message_history.messages

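# Re-insert each tool result into the persisted history as a FunctionMessage,
# placing it just before the final AI answer (insert at index -1).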
for resp in response:
    if resp['event'] == "on_tool_end":
        tool_msg = FunctionMessage(content=json.dumps(resp['data']), id=resp['run_id'], name=resp['name'])
        messages.insert(-1, tool_msg)

chat_message_history.clear()
chat_message_history.add_messages(messages)
chat_message_history.messages

Error Message and Stack Trace (if applicable)

{
    "name": "KeyError",
    "message": "'function'",
    "stack": "---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In[14], line 3
      1 response = []
      2 run = agent_executor.astream_events(input = {\"input\": \"what is glg stock\"}, version=\"v2\")
----> 3 async for event in run:
      4     response.append(event)
      5     kind = event[\"event\"]

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:4788, in RunnableBindingBase.astream_events(self, input, config, **kwargs)
   4782 async def astream_events(
   4783     self,
   4784     input: Input,
   4785     config: Optional[RunnableConfig] = None,
   4786     **kwargs: Optional[Any],
   4787 ) -> AsyncIterator[StreamEvent]:
-> 4788     async for item in self.bound.astream_events(
   4789         input, self._merge_configs(config), **{**self.kwargs, **kwargs}
   4790     ):
   4791         yield item

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:1146, in Runnable.astream_events(self, input, config, version, include_names, include_types, include_tags, exclude_names, exclude_types, exclude_tags, **kwargs)
   1141     raise NotImplementedError(
   1142         'Only versions \"v1\" and \"v2\" of the schema is currently supported.'
   1143     )
   1145 async with aclosing(event_stream):
-> 1146     async for event in event_stream:
   1147         yield event

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/tracers/event_stream.py:947, in _astream_events_implementation_v2(runnable, input, config, include_names, include_types, include_tags, exclude_names, exclude_types, exclude_tags, **kwargs)
    945 # Await it anyway, to run any cleanup code, and propagate any exceptions
    946 try:
--> 947     await task
    948 except asyncio.CancelledError:
    949     pass

File /usr/lib/python3.10/asyncio/futures.py:288, in Future.__await__(self)
    286 if not self.done():
    287     raise RuntimeError(\"await wasn't used with future\")
--> 288 return self.result()

File /usr/lib/python3.10/asyncio/futures.py:201, in Future.result(self)
    199 self.__log_traceback = False
    200 if self._exception is not None:
--> 201     raise self._exception.with_traceback(self._exception_tb)
    202 return self._result

File /usr/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
    228 try:
    229     if exc is None:
    230         # We use the `send` method directly, because coroutines
    231         # don't have `__iter__` and `__next__` methods.
--> 232         result = coro.send(None)
    233     else:
    234         result = coro.throw(exc)

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/tracers/event_stream.py:907, in _astream_events_implementation_v2.<locals>.consume_astream()
    904 try:
    905     # if astream also calls tap_output_aiter this will be a no-op
    906     async with aclosing(runnable.astream(input, config, **kwargs)) as stream:
--> 907         async for _ in event_streamer.tap_output_aiter(run_id, stream):
    908             # All the content will be picked up
    909             pass
    910 finally:

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/tracers/event_stream.py:153, in _AstreamEventsCallbackHandler.tap_output_aiter(self, run_id, output)
    151 tap = self.is_tapped.setdefault(run_id, sentinel)
    152 # wait for first chunk
--> 153 first = await py_anext(output, default=sentinel)
    154 if first is sentinel:
    155     return

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/utils/aiter.py:65, in py_anext.<locals>.anext_impl()
     58 async def anext_impl() -> Union[T, Any]:
     59     try:
     60         # The C code is way more low-level than this, as it implements
     61         # all methods of the iterator protocol. In this implementation
     62         # we're relying on higher-level coroutine concepts, but that's
     63         # exactly what we want -- crosstest pure-Python high-level
     64         # implementation and low-level C anext() iterators.
---> 65         return await __anext__(iterator)
     66     except StopAsyncIteration:
     67         return default

File ~/v3/.dev/lib/python3.10/site-packages/langchain/agents/agent.py:1595, in AgentExecutor.astream(self, input, config, **kwargs)
   1583 config = ensure_config(config)
   1584 iterator = AgentExecutorIterator(
   1585     self,
   1586     input,
   (...)
   1593     **kwargs,
   1594 )
-> 1595 async for step in iterator:
   1596     yield step

File ~/v3/.dev/lib/python3.10/site-packages/langchain/agents/agent_iterator.py:246, in AgentExecutorIterator.__aiter__(self)
    240 while self.agent_executor._should_continue(
    241     self.iterations, self.time_elapsed
    242 ):
    243     # take the next step: this plans next action, executes it,
    244     # yielding action and observation as they are generated
    245     next_step_seq: NextStepOutput = []
--> 246     async for chunk in self.agent_executor._aiter_next_step(
    247         self.name_to_tool_map,
    248         self.color_mapping,
    249         self.inputs,
    250         self.intermediate_steps,
    251         run_manager,
    252     ):
    253         next_step_seq.append(chunk)
    254         # if we're yielding actions, yield them as they come
    255         # do not yield AgentFinish, which will be handled below

File ~/v3/.dev/lib/python3.10/site-packages/langchain/agents/agent.py:1304, in AgentExecutor._aiter_next_step(self, name_to_tool_map, color_mapping, inputs, intermediate_steps, run_manager)
   1301     intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)
   1303     # Call the LLM to see what to do.
-> 1304     output = await self.agent.aplan(
   1305         intermediate_steps,
   1306         callbacks=run_manager.get_child() if run_manager else None,
   1307         **inputs,
   1308     )
   1309 except OutputParserException as e:
   1310     if isinstance(self.handle_parsing_errors, bool):

File ~/v3/.dev/lib/python3.10/site-packages/langchain/agents/agent.py:554, in RunnableMultiActionAgent.aplan(self, intermediate_steps, callbacks, **kwargs)
    546 final_output: Any = None
    547 if self.stream_runnable:
    548     # Use streaming to make sure that the underlying LLM is invoked in a
    549     # streaming
   (...)
    552     # Because the response from the plan is not a generator, we need to
    553     # accumulate the output into final output and return that.
--> 554     async for chunk in self.runnable.astream(
    555         inputs, config={\"callbacks\": callbacks}
    556     ):
    557         if final_output is None:
    558             final_output = chunk

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:2910, in RunnableSequence.astream(self, input, config, **kwargs)
   2907 async def input_aiter() -> AsyncIterator[Input]:
   2908     yield input
-> 2910 async for chunk in self.atransform(input_aiter(), config, **kwargs):
   2911     yield chunk

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:2893, in RunnableSequence.atransform(self, input, config, **kwargs)
   2887 async def atransform(
   2888     self,
   2889     input: AsyncIterator[Input],
   2890     config: Optional[RunnableConfig] = None,
   2891     **kwargs: Optional[Any],
   2892 ) -> AsyncIterator[Output]:
-> 2893     async for chunk in self._atransform_stream_with_config(
   2894         input,
   2895         self._atransform,
   2896         patch_config(config, run_name=(config or {}).get(\"run_name\") or self.name),
   2897         **kwargs,
   2898     ):
   2899         yield chunk

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:1981, in Runnable._atransform_stream_with_config(self, input, transformer, config, run_type, **kwargs)
   1976     chunk: Output = await asyncio.create_task(  # type: ignore[call-arg]
   1977         py_anext(iterator),  # type: ignore[arg-type]
   1978         context=context,
   1979     )
   1980 else:
-> 1981     chunk = cast(Output, await py_anext(iterator))
   1982 yield chunk
   1983 if final_output_supported:

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/tracers/event_stream.py:153, in _AstreamEventsCallbackHandler.tap_output_aiter(self, run_id, output)
    151 tap = self.is_tapped.setdefault(run_id, sentinel)
    152 # wait for first chunk
--> 153 first = await py_anext(output, default=sentinel)
    154 if first is sentinel:
    155     return

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/utils/aiter.py:65, in py_anext.<locals>.anext_impl()
     58 async def anext_impl() -> Union[T, Any]:
     59     try:
     60         # The C code is way more low-level than this, as it implements
     61         # all methods of the iterator protocol. In this implementation
     62         # we're relying on higher-level coroutine concepts, but that's
     63         # exactly what we want -- crosstest pure-Python high-level
     64         # implementation and low-level C anext() iterators.
---> 65         return await __anext__(iterator)
     66     except StopAsyncIteration:
     67         return default

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:2863, in RunnableSequence._atransform(self, input, run_manager, config, **kwargs)
   2861     else:
   2862         final_pipeline = step.atransform(final_pipeline, config)
-> 2863 async for output in final_pipeline:
   2864     yield output

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:1197, in Runnable.atransform(self, input, config, **kwargs)
   1194 final: Input
   1195 got_first_val = False
-> 1197 async for ichunk in input:
   1198     # The default implementation of transform is to buffer input and
   1199     # then call stream.
   1200     # It'll attempt to gather all input into a single chunk using
   1201     # the `+` operator.
   1202     # If the input is not addable, then we'll assume that we can
   1203     # only operate on the last chunk,
   1204     # and we'll iterate until we get to the last chunk.
   1205     if not got_first_val:
   1206         final = ichunk

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:4811, in RunnableBindingBase.atransform(self, input, config, **kwargs)
   4805 async def atransform(
   4806     self,
   4807     input: AsyncIterator[Input],
   4808     config: Optional[RunnableConfig] = None,
   4809     **kwargs: Any,
   4810 ) -> AsyncIterator[Output]:
-> 4811     async for item in self.bound.atransform(
   4812         input,
   4813         self._merge_configs(config),
   4814         **{**self.kwargs, **kwargs},
   4815     ):
   4816         yield item

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:1215, in Runnable.atransform(self, input, config, **kwargs)
   1212             final = ichunk
   1214 if got_first_val:
-> 1215     async for output in self.astream(final, config, **kwargs):
   1216         yield output

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/language_models/chat_models.py:417, in BaseChatModel.astream(self, input, config, stop, **kwargs)
    412 except BaseException as e:
    413     await run_manager.on_llm_error(
    414         e,
    415         response=LLMResult(generations=[[generation]] if generation else []),
    416     )
--> 417     raise e
    418 else:
    419     await run_manager.on_llm_end(
    420         LLMResult(generations=[[generation]]),
    421     )

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/language_models/chat_models.py:395, in BaseChatModel.astream(self, input, config, stop, **kwargs)
    393 generation: Optional[ChatGenerationChunk] = None
    394 try:
--> 395     async for chunk in self._astream(
    396         messages,
    397         stop=stop,
    398         **kwargs,
    399     ):
    400         if chunk.message.id is None:
    401             chunk.message.id = f\"run-{run_manager.run_id}\"

File ~/v3/.dev/lib/python3.10/site-packages/langchain_anthropic/chat_models.py:701, in ChatAnthropic._astream(self, messages, stop, run_manager, stream_usage, **kwargs)
    699     stream_usage = self.stream_usage
    700 kwargs[\"stream\"] = True
--> 701 payload = self._get_request_payload(messages, stop=stop, **kwargs)
    702 stream = await self._async_client.messages.create(**payload)
    703 coerce_content_to_string = not _tools_in_params(payload)

File ~/v3/.dev/lib/python3.10/site-packages/langchain_anthropic/chat_models.py:647, in ChatAnthropic._get_request_payload(self, input_, stop, **kwargs)
    639 def _get_request_payload(
    640     self,
    641     input_: LanguageModelInput,
   (...)
    644     **kwargs: Dict,
    645 ) -> Dict:
    646     messages = self._convert_input(input_).to_messages()
--> 647     system, formatted_messages = _format_messages(messages)
    648     payload = {
    649         \"model\": self.model,
    650         \"max_tokens\": self.max_tokens,
   (...)
    658         **kwargs,
    659     }
    660     return {k: v for k, v in payload.items() if v is not None}

File ~/v3/.dev/lib/python3.10/site-packages/langchain_anthropic/chat_models.py:170, in _format_messages(messages)
    167     system = message.content
    168     continue
--> 170 role = _message_type_lookups[message.type]
    171 content: Union[str, List]
    173 if not isinstance(message.content, str):
    174     # parse as dict

KeyError: 'function'"
}

Description

The above error occurs when I add a FunctionMessage to the chat history and run the agent again.

ex:

1st run) input: what is apple stock rn - runs perfectly
2nd run) input: what is google stock rn - gives the above error
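
For context, the traceback bottoms out in _format_messages in langchain_anthropic, at role = _message_type_lookups[message.type]: FunctionMessage has type "function", and that lookup table has no Anthropic role for it, hence KeyError: 'function'. A ToolMessage (type "tool") is a message class that mapping does handle, so one possible workaround is to rebuild the history with ToolMessage instead. A minimal sketch of the history-rebuilding loop above, with the caveat that Anthropic expects tool_call_id to match the tool_use id from the preceding AI message, so reusing the event's run_id for it (as done here) is an assumption:

from langchain_core.messages import ToolMessage
import json

messages = chat_message_history.messages

for resp in response:
    if resp["event"] == "on_tool_end":
        tool_msg = ToolMessage(
            content=json.dumps(resp["data"], default=str),  # default=str guards non-serializable payloads
            tool_call_id=resp["run_id"],  # assumption: run_id stands in for the tool_use id
            name=resp["name"],
        )
        # Insert before the final AI answer, mirroring the original loop.
        messages.insert(-1, tool_msg)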

System Info

System Information
------------------
> OS:  Linux
> OS Version:  #1 SMP Fri Mar 29 23:14:13 UTC 2024
> Python Version:  3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]

Package Information
-------------------
> langchain_core: 0.2.10
> langchain: 0.2.6
> langchain_community: 0.2.6
> langsmith: 0.1.84
> langchain_anthropic: 0.1.19
> langchain_groq: 0.1.6
> langchain_mongodb: 0.1.6
> langchain_openai: 0.1.13
> langchain_text_splitters: 0.2.2
> langchainhub: 0.1.20
> langserve: 0.2.2

Packages not installed (Not Necessarily a Problem)
--------------------------------------------------
The following packages were not found:

> langgraph
eyurtsev commented 1 month ago

From the stack trace it seems like it's not from the astream_events API, but from something in the ChatAnthropic implementation.

Would you mind checking what happens if you generate a list of BaseMessage objects corresponding to the chat history you have, and then invoke the model with async for chunk in model.astream(messages) and with list(model.stream(messages))?

I suspect it'll trigger the error as well and help produce a minimal reproducible example.
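
A minimal sketch of that check, assuming an Anthropic key in the environment; the model name comes from the issue and the message contents are placeholders:

from langchain_anthropic import ChatAnthropic
from langchain_core.messages import AIMessage, FunctionMessage, HumanMessage

model = ChatAnthropic(model="claude-3-5-sonnet-20240620")

# A history shaped like the one rebuilt above, including a FunctionMessage
# (type "function"), which is what the KeyError points at.
messages = [
    HumanMessage(content="what is glg stock"),
    FunctionMessage(name="google_search", content='{"output": "..."}'),
    AIMessage(content="..."),
]

# Sync path:
chunks = list(model.stream(messages))

# Async path:
async def check() -> None:
    async for chunk in model.astream(messages):
        print(chunk.content, end="|")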

mantrakp04 commented 1 month ago

ChatAnthropic is weird; I suspected that as well. It returns a list of JSON blocks as the response, and it doesn't run even if I convert the JSON to a str and replace message.content with that str.
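
For reference, when tool calling is involved, ChatAnthropic returns AIMessage.content as a list of content blocks rather than a plain string, roughly this shape (the ids and inputs below are invented):

# Illustrative only: not real values.
content = [
    {"type": "text", "text": "Let me look that up."},
    {"type": "tool_use", "id": "toolu_01...", "name": "google_search", "input": {"query": "glg stock"}},
]

so stringifying it loses the tool_use structure the provider expects to see back in the history.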
