langchain-ai / langserve

LangServe 🦜️🏓
Other
1.95k stars 218 forks source link

AgentExecutor works strange with LangServe #314

Open IvanShah opened 11 months ago

IvanShah commented 11 months ago

I encountered difficulties when using AgentExecutor in LangServe:

My LLM settings: llm = ChatOpenAI(temperature=0.2, model="gpt-4-1106-preview", streaming=True, callbacks=[FinalStreamingStdOutCallbackHandler()]).configurable_fields( temperature=ConfigurableField( id="llm_temperature", name="LLM Temperature", description="The temperature of the LLM"))

How it looks with AgentExecutor:

Снимок экрана 2023-12-12 в 10 59 47

How it looks with chain:

Снимок экрана 2023-12-12 в 11 01 20
eyurtsev commented 11 months ago

Hi @IvanShah, could you include minimal code to reproduce?

Could you confirm that you're not using RunnableLambdas but RunnableGenerators with agents?

For example, see: https://github.com/langchain-ai/langserve/discussions/308#discussioncomment-7805035

IvanShah commented 11 months ago

@eyurtsev Yes of course here, it's an improved example (with configurable field and streaming) from https://github.com/langchain-ai/langchain/blob/c0f4b95aa9961724ab4569049b4c3bc12ebbacfc/templates/openai-functions-agent/openai_functions_agent/agent.py:

import os
from typing import List, Tuple

import uvicorn
from fastapi import FastAPI
from langchain.agents import AgentExecutor
from langchain.agents.format_scratchpad import format_to_openai_function_messages
from langchain.agents.output_parsers import OpenAIFunctionsAgentOutputParser
from langchain.callbacks import FinalStreamingStdOutCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.pydantic_v1 import BaseModel, Field
from langchain.schema.messages import AIMessage, HumanMessage
from langchain.tools.render import format_tool_to_openai_function
from langchain_community.utilities.google_serper import GoogleSerperAPIWrapper
from langchain_core.runnables import ConfigurableField
from langchain_core.tools import Tool
from langserve import add_routes

os.environ["OPENAI_API_KEY"] = ''
os.environ["SERPER_API_KEY"] = ''

# Create the tool

search = GoogleSerperAPIWrapper()
tools = [
    Tool(
        name="search",
        func=search.run,
        description=""""A search engine optimized for comprehensive, accurate, \
            and trusted results. Useful for when you need to answer questions \
            about current events or about recent information. \
            Input should be a search query. \
            If the user is asking about something that you don't know about, \
            you should probably use this tool to see if that can provide any information.""",
    )]

app = FastAPI(
    title='Example',
)

llm = ChatOpenAI(temperature=0.2,
                 model="gpt-4-1106-preview",
                 streaming=True,
                 callbacks=[FinalStreamingStdOutCallbackHandler()]).configurable_fields(
    temperature=ConfigurableField(
        id="llm_temperature",
        name="LLM Temperature",
        description="The temperature of the LLM"))
assistant_system_message = """You are a helpful assistant. \
Use tools (only if necessary) to best answer the users questions."""
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", assistant_system_message),
        MessagesPlaceholder(variable_name="chat_history"),
        ("user", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)

llm_with_tools = llm.bind(functions=[format_tool_to_openai_function(t) for t in tools])

def _format_chat_history(chat_history: List[Tuple[str, str]]):
    buffer = []
    for human, ai in chat_history:
        buffer.append(HumanMessage(content=human))
        buffer.append(AIMessage(content=ai))
    return buffer

agent = (
    {
        "input": lambda x: x["input"],
        "chat_history": lambda x: _format_chat_history(x["chat_history"]),
        "agent_scratchpad": lambda x: format_to_openai_function_messages(
            x["intermediate_steps"]
        ),
    }
    | prompt
    | llm_with_tools
    | OpenAIFunctionsAgentOutputParser()
)

class AgentInput(BaseModel):
    input: str
    chat_history: List[Tuple[str, str]] = Field(
        ..., extra={"widget": {"type": "chat", "input": "input", "output": "output"}}
    )

agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True).with_types(
    input_type=AgentInput
)

add_routes(
    app,
    agent_executor,
    path="/assistant",
)

if __name__ == '__main__':
    uvicorn.run('example:app')
IvanShah commented 11 months ago

In console streaming working well, but in API and playground - doesn't @eyurtsev

IvanShah commented 11 months ago

I have tried to add RunnablePassthrough() - it doesn't help

add_routes(
    app,
    agent_executor | RunnablePassthrough(),
    path="/assistant",
)

After, I have tried this:

def _transform(input_stream):
    for chunk in input_stream:
        yield chunk['output']

add_routes(
    app,
    agent_executor | RunnableGenerator(_transform),
    path="/assistant",
)

With this code I have an error "atransform not implemented" Please still need help with this @eyurtsev

eyurtsev commented 11 months ago

With this code I have an error "atransform not implemented"

Likely a bug in RunnableGenerator. Issue here: https://github.com/langchain-ai/langserve/issues/344. I'll try to re-recreate that issue and make a patch to langchain.

However, that is likely not root problem with streaming that you're encountering.

In console streaming working well, but in API and playground - doesn't @eyurtsev

Could you confirm that you're getting separate chunks separated by a new line with this code:

for chunk in chain.stream(...): # fill in with whatever input is appropriate for the agent
    print()
    print(chunk.content, end='', flush=True)
eyurtsev commented 11 months ago

@IvanShah for the Runnable Generator, you need to provide an async function definition:

    async def gen(input: Iterator[Any]) -> Iterator[int]:
        async for x in input:
            yield x['output']
IvanShah commented 11 months ago

@eyurtsev I have tried Runnable Generator - no changes, maybe I use it wrong? yield x['output'] called only once for full output if I use it like:

 async def gen(input: Iterator[Any]) -> Iterator[int]:
        async for x in input:
            yield x['output']

add_routes(
    app,
    agent_executor | RunnableGenerator(gen),
    path="/assistant",
)
IvanShah commented 11 months ago

@eyurtsev About this piece of code:

for chunk in chain.stream(...): # fill in with whatever input is appropriate for the agent
    print()
    print(chunk.content, end='', flush=True)

In debug it comes here with full output once not token by token, in console it prints still token by token...

IvanShah commented 11 months ago

I have tried to remove agent executor. With simple LCEL chain streaming is working as well as temperature field from llm is displayed in langserve. But I need agent to use a multiple tools, use memory and streaming. Without AgentExecutor it's not gone a work:( @eyurtsev Maybe I understand something wrong, or maybe you have a working example for this? Or I should create an issue in langchain repo?

IvanShah commented 11 months ago

And another update:) If I wrote my own callback handler like this and use in llm

class MyCallbackHandler(BaseCallbackHandler):
  def on_llm_new_token(self, token, **kwargs) -> None:
    print(f"#{token}#")

llm = ChatOpenAI(temperature=0.2,
                 model="gpt-4-1106-preview",
                 streaming=True,
                 callbacks=[MyCallbackHandler()]
                 )

with AgentExecutor in route:

add_routes(
    app,
    agent_executor | RunnableGenerator(gen),
    path="/assistant",
)

Each token comes in new line so the streaming working itself, just stream endpoint - doesn't. I think this relates to https://github.com/langchain-ai/langchain/discussions/12699 @eyurtsev Please have a look

IvanShah commented 10 months ago

Could you please have a look on updates above? @eyurtsev And second question in this topic was about missing temperature field if I use AgenExecutor. Could you please comment about this? @eyurtsev

eyurtsev commented 10 months ago

Thanks for pinging! Taking a look now

eyurtsev commented 10 months ago

@IvanShah Could you confirm that this is what you're seeing in the console:

https://github.com/langchain-ai/langserve/blob/69c6a76b193fb53474b204f8eec048bac21ee52e/examples/agent/client.ipynb

The agent is streaming here actions, but LLM tokens are not streamed one by one. Do you expect to see the llm tokens to appear one at a time or is step by step streaming OK?

IvanShah commented 10 months ago

@eyurtsev Yes I see the output action by action. I expect see it token by token. Do you know how to do this? If I use just LCEL without executor it streams token by token.

eyurtsev commented 10 months ago

The current agent executor was designed to work with action by action streaming. If this is blocking, you can implement a custom runnable with a cusotm .astream() implementation that implements the agent executor.

I'll investigate if we're able to add support to astream log rather than stream to surface individual llm tokens.


Also what output type do you expect to see from .stream() for an agent? Are you OK with the astream log message format?

eyurtsev commented 10 months ago

@IvanShah Looks like I was wrong! You can get llm tokens to stream by setting the llm itself to stream and using astream log:

https://python.langchain.com/docs/modules/agents/how_to/streaming#stream-tokens

IvanShah commented 10 months ago

Actually I expect to see the same output as chain. Token by token for final output. I have used FinalStreamingStdOutCallbackHandler for that. And it's actual not only for me but for others too like here https://github.com/langchain-ai/langchain/discussions/14573 and here https://github.com/langchain-ai/langchain/discussions/12699, https://stackoverflow.com/questions/77690231/how-to-stream-the-output-of-an-agentexecutor-in-langchain-to-my-final-applicatio and some others links too. If I need to do it by myself maybe you have some example for that (for that custom runnable that implements agent executor)?

IvanShah commented 10 months ago

@eyurtsev Ok I saw this but how can I make it work for langserve?

eyurtsev commented 10 months ago

Take a look at this:

https://github.com/langchain-ai/langserve/tree/911a351f014dd2266eb49827016f786f09f0b3dd/examples/agent

server: https://github.com/langchain-ai/langserve/blob/911a351f014dd2266eb49827016f786f09f0b3dd/examples/agent/server.py#L52

and the client in stream_log will stream all the individual tokens: https://github.com/langchain-ai/langserve/blob/911a351f014dd2266eb49827016f786f09f0b3dd/examples/agent/client.ipynb


There are still 2 bugs that will block you: (1) we need to propagate configuration information to AgentExecutor and (2) fix rendering on the playground for the agent executor output

IvanShah commented 10 months ago

Ok, I see, could you please prioritise these bugs for fix in next updates?

eyurtsev commented 10 months ago

Here's an example custom agent executor that's a work around for configuration until we get that fixed in langchain:

https://github.com/langchain-ai/langserve/blob/main/examples/configurable_agent_executor/server.py#L77

Feel free to customize further based on your use case depending on what you care about in your streaming response.

Keep in mind: 1) Playground shows response of astream_log not astream 2) But astream_log just uses astream under the hood + heavy usage of callbacks which it uses to surface data from intermediate steps

IvanShah commented 10 months ago

Thank you! Should we now close the issue or wait for a fixes?

eyurtsev commented 10 months ago

What do you expect to see streamed in the playground for an agent executor?

An agent loops through:

  1. LLM invocation -- output can be streamed
  2. Tool invocation -- output usually cannot be streamed (but some tools may be streamable) (Though I don't think agent executor supports streaming tools)
  3. Tool result

And then at some point the agent yields AgentFinish and the cycle ends.

What should be shown on the playground in your opinion?

IvanShah commented 10 months ago

@eyurtsev I think the most important part is streaming LLM invocation as its setuped in LLM callback option for example (and for my case) Final Answer if we use FinalStreamingStdOutCallbackHandler() in LLM. Also it can be useful to stream Tool result.

eyurtsev commented 10 months ago

The playground can only render output from astream_log, so it won't work with custom callbacks. But wee could have the playground do something similar for showing the final answer (without developer providing a callback) -- which I think accommodates your use case :)


In the meantime:

See this comment if you want to filter astream_log on the client side yourself (e.g., with streamlit):

https://github.com/langchain-ai/langchain/discussions/15755#discussioncomment-8071748


@IvanShah Can I ask for your use case it sounds like you'd be OK we also showed intermediate tool invocations together with their results?

IvanShah commented 10 months ago

@eyurtsev Thank you for your help! I think it's absolutely OK to stream intermediate results. Actually my case is to check that all my agents actually stream correctly with different LLMs and settings:)

effusive-ai commented 9 months ago

@eyurtsev Any idea when you might get time to work on this bug? I couldn't get your custom AgentExec to work with a RunnableWithMessageHistory. Everything prints to the terminal window but nothing streams back to the JS client.

eyurtsev commented 9 months ago

We introduced a new API to help with streaming: https://python.langchain.com/docs/modules/agents/how_to/streaming#custom-streaming-with-events

It's not integrated with playground right now, so the playground will be showing the output from astream. But it will work client side. RemoteRunnable in js still doesn't have the new endpoint.

I'll try to add examples in a bit.


@effusive-ai if you are are seeing things printed out to the terminal, I am guessing that the code is relying on callbacks. Callbacks are harder to get to work, since you'll need to set up a queue between two tasks running on the backend

effusive-ai commented 9 months ago

Thanks! Yes examples would be great.

@eyurtsev I'm not using callbacks. This is the meat of what I'm doing that prints to the console but doesn't stream anything out. I assumed that was because of this bug, but maybe I'm missing an output parser somewhere? In my other chains that don't use tools, I used StrOutputParser() at the end to get the output to be sent back to the client. But that doesn't work with an agent. At least that I could get to work.

agent_llm = ChatOpenAI(
    temperature=0,
    streaming=True,
    model_name=open_ai_model
)

llm_with_tools = agent_llm.bind(tools=[convert_to_openai_tool(tool) for tool in tools])

agent = (
    RunnablePassthrough.assign(
        agent_scratchpad=lambda x: format_to_openai_tool_messages(
            x["intermediate_steps"]
        )
    )
    | agentPrompt
    | llm_with_tools
    | OpenAIToolsAgentOutputParser()
)

agent_executor = AgentExecutor(agent=agent,
                               tools=tools,
                               verbose=True,
                               handle_parsing_errors=True,
                               max_iterations=5)

class Input(BaseModel):
    human_input: str

class Output(BaseModel):
    output: Any

chain_with_history = RunnableWithMessageHistory(
    agent_executor,
    get_chat_history,
    input_messages_key="human_input",
    history_messages_key="chat_history",
).with_types(input_type=Input, output_type=Output)

add_routes(app, chain_with_history, path="/chat")
eyurtsev commented 9 months ago

I updated the client in the following two examples to show how to use event stream (checkout the client notebooks that show how to use event stream)

Both of these agents will show token by token output together with tool calls etc.

Here's an example that shows how to completely customize agent streaming by using a Runnable Lambda on top of an agent executor:


Here is sample client code together with output.

image


Playground experience is still pretty bad for agents. We'll try to prioritize this month.

eyurtsev commented 9 months ago

@eyurtsev Any idea when you might get time to work on this bug? I couldn't get your custom AgentExec to work with a RunnableWithMessageHistory. Everything prints to the terminal window but nothing streams back to the JS client.

Which of the bugs are you referring to? Are you trying to get a configurable agent?

effusive-ai commented 9 months ago

Which of the bugs are you referring to? Are you trying to get a configurable agent?

@eyurtsev Trying to get an agent to stream while using RunnableWithMessageHistory. Your agent example uses client side history. I have no problem streaming with just a chain on RunnableWithMessageHistory, but can't get an agent to stream when using server side history. Which I thought was part of the problem in this issue. Would be great if you could add an example of an agent streaming with RunnableWithMessageHistory. As for me it never returns anything.

eyurtsev commented 9 months ago

Sure! I'll add an issue to add an example! Also making sure -- are you using the astream events API?

effusive-ai commented 9 months ago

Sure! I'll add an issue to add an example! Also making sure -- are you using the astream events API?

@eyurtsev Oh, maybe that's the problem. I'm using the JS RemoteRunnable which doesn't look like it has a stream events method. Is that something planned to be added?

averypfeiffer commented 6 months ago

@eyurtsev I'm running into a related issue. When implementing your example in "agent_with_custom_streaming", everything works as expected, however I'm unable to get the Agent to invoke tools. I can see the Agent attempting to invoke a tool via its output, but in your example, if the content is blank, the output just goes into the void.

How can I take that output, invoke the tool, and give the tools output back to the agent for final output generation?

This is using the astream_events API.

FYI here is the code snippet specifically that I'm stuck on:


async for event in agent_executor.astream_events(
        {
            "input": input["content"],
            "chat_history": input["chat_history"],
        },
        version="v1",
    ):
        kind = event["event"]
        if kind == "on_chain_start":
            if (
                event["name"] == "agent"
            ):  # matches `.with_config({"run_name": "Agent"})` in agent_executor
                if STREAM_DEBUG_LOG:
                    yield "\n"
                    yield (
                        f"Starting agent: {event['name']} "
                        f"with input: {event['data'].get('input')}"
                    )
                    yield "\n"
        elif kind == "on_chain_end":
            if (
                event["name"] == "agent"
            ):  # matches `.with_config({"run_name": "Agent"})` in agent_executor
               if STREAM_DEBUG_LOG:
                    yield "\n"
                    yield (
                        f"Done agent: {event['name']} "
                        f"with output: {event['data'].get('output')['output']}"
                    )
                    yield "\n"  
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                # Empty content in the context of OpenAI means
                # that the model is asking for a tool to be invoked.
                # So we only print non-empty content
                yield content
            else:
                # need to understand how to invoke the tool, and return the output back to the agent for final output generation HERE
                print("Empty content")```