langchain-ai / langgraph

Build resilient language agents as graphs.
https://langchain-ai.github.io/langgraph/
MIT License
5.48k stars 867 forks source link

How to stream token of agent response in agent supervisor? #137

Open chatgptguru opened 6 months ago

chatgptguru commented 6 months ago

Checked other resources

Example Code

import os from langchain.agents import AgentExecutor, create_openai_tools_agent from langchain_core.messages import BaseMessage, HumanMessage from langchain_openai import ChatOpenAI from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder import operator from typing import Annotated, Any, Dict, List, Optional, Sequence, TypedDict import functools from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langgraph.graph import StateGraph, END from langchain_community.utilities import SerpAPIWrapper from langchain.agents import Tool from utils import toggle_case,sort_string

os.environ["OPENAI_API_KEY"] = "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" os.environ["SERPER_API_KEY"] = "c3b73653f4256d5f2b4b5cf4e6fa438d736de7a4717b0fe06d92df0f30fbd3ce"

class AgentSupervisor: def init(self, llm): self.llm = llm

def getAgentSupervisor():
    search = SerpAPIWrapper(serpapi_api_key=os.environ["SERPER_API_KEY"])
    tools = [
        Tool(
            name="Search",
            func=search.run,
            description="useful for when you need to answer questions about current events",
        ),
        Tool(
            name="Toogle_Case",
            func=lambda word: toggle_case(word),
            description="use when you want to convert the letter to uppercase or lowercase",
        ),
        Tool(
            name="Sort_String",
            func=lambda string: sort_string(string),
            description="use when you want to sort a string alphabetically",
        ),
    ]

    def create_agent(llm: ChatOpenAI, tools: list, system_prompt: str):
        # Each worker node will be given a name and some tools.
        prompt = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    system_prompt,
                ),
                MessagesPlaceholder(variable_name="messages"),
                MessagesPlaceholder(variable_name="agent_scratchpad"),
            ]
        )
        agent = create_openai_tools_agent(llm, tools, prompt)
        executor = AgentExecutor(agent=agent, tools=tools)
        return executor

    def agent_node(state, agent, name):
        result = agent.invoke(state)
        return {"messages": [HumanMessage(content=result["output"], name=name)]}

    members = ["AIAssistant", "Coder"]
    system_prompt = (
        "You are a supervisor tasked with managing a conversation between the"
        " following workers:  {members}. Given the following user request,"
        " respond with the worker to act next. Each worker will perform a"
        " task and respond with their results and status. When finished,"
        " respond with FINISH."
    )
    # Our team supervisor is an LLM node. It just picks the next agent to process
    # and decides when the work is completed
    options = ["FINISH"] + members
    # Using openai function calling can make output parsing easier for us
    function_def = {
        "name": "route",
        "description": "Select the next role.",
        "parameters": {
            "title": "routeSchema",
            "type": "object",
            "properties": {
                "next": {
                    "title": "Next",
                    "anyOf": [
                        {"enum": options},
                    ],
                }
            },
            "required": ["next"],
        },
    }
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            (
                "system",
                "Given the conversation above, who should act next?"
                " Or should we FINISH? Select one of: {options}",
            ),
        ]
    ).partial(options=str(options), members=", ".join(members))

    llm = ChatOpenAI(model="gpt-4-1106-preview", streaming=True)

    supervisor_chain = (
        prompt
        | llm.bind_functions(functions=[function_def], function_call="route")
        | JsonOutputFunctionsParser()
    )

    class AgentState(TypedDict):
        # The annotation tells the graph that new messages will always
        # be added to the current states
        messages: Annotated[Sequence[BaseMessage], operator.add]
        # The 'next' field indicates where to route to next
        next: str

    research_agent = create_agent(llm, tools, "You are a ai assistant to provide personalized answer to people.")
    research_node = functools.partial(agent_node, agent=research_agent, name="AIAssistant")

    # NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION. PROCEED WITH CAUTION
    code_agent = create_agent(
        llm,
        tools,
        "You may generate safe python code to analyze data and generate charts using matplotlib.",
    )
    code_node = functools.partial(agent_node, agent=code_agent, name="Coder")

    workflow = StateGraph(AgentState)
    workflow.add_node("AIAssistant", research_node)
    workflow.add_node("Coder", code_node)
    workflow.add_node("supervisor", supervisor_chain)

    for member in members:
        # We want our workers to ALWAYS "report back" to the supervisor when done
        workflow.add_edge(member, "supervisor")
    # The supervisor populates the "next" field in the graph state
    # which routes to a node or finishes
    conditional_map = {k: k for k in members}
    conditional_map["FINISH"] = END
    workflow.add_conditional_edges("supervisor", lambda x: x["next"], conditional_map)
    # Finally, add entrypoint
    workflow.set_entry_point("supervisor")

    graph = workflow.compile()
    return graph

agent_supervisor = AgentSupervisor.getAgentSupervisor() agent_name = '' for s in agent_supervisor.stream( { "messages": [ HumanMessage(content=question) ] }, { "recursion_limit": 100 } ): if "end" not in s: if 'supervisor' in s: agent_name = s['supervisor']['next'] if agent_name != "FINISH": await websocket.send_text(json.dumps({"token":"AgentName:"+agent_name+"\n"})) print(agent_name) if agent_name in s: content = s[agent_name]['messages'][0].content await websocket.send_text(json.dumps({"token":"Response:"+content+"\n"})) print(content) print("----")

Error Message and Stack Trace (if applicable)

No Error, it is outputing properly, but I need a way to stream tokens of agent response, it is outputing full agent response now.

Description

I am trying to stream tokens of agent response in agent super visor. Right now, it is outputing agent name and full agent response, Here I want to stream tokens of agent response.

System Info

platform: windows python version: 3.11.2 langchain version: latest version

sploithunter commented 6 months ago

It should be astream_log() but it is broken in some agents. I have an issue filed for the break.

sploithunter commented 6 months ago

try this:

graph = workflow.compile()

async def main():
   async for output in graph.astream_log(
        {
            "messages": [
                HumanMessage(content="Code hello world and print it to the terminal")
            ]
        }, include_types=["llm"]
    ):
        for op in output.ops:
            if op["path"] == "/streamed_output/-":
                # this is the output from .stream()
                ...
            elif op["path"].startswith("/logs/") and op["path"].endswith(
                "/streamed_output/-"
            ):
                # because we chose to only include LLMs, these are LLM tokens
                print(op["value"])
if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
chatgptguru commented 6 months ago

Thank you for your reply, but I am facing this issue. And also can you please advice how to add converational memory there after we fix streaming issue? image

sploithunter commented 6 months ago

That is the issue I have filed. It is not fixed yet. It appears in many graphs, but not all.

chatgptguru commented 6 months ago

Thank you for your response, have you ever implemented conversational memory in agent supervisor?

Sulomus commented 5 months ago

I have faced the same issue this week - any luck guys?

The streaming doesnt seem to work at all in the langgraph library :(

chatgptguru commented 5 months ago

Not yet. (

Message ID: @.***>

dmitryrPlanner5D commented 5 months ago

It seems I have figured it out how to fix tokens streaming.

I am not sure about your code, because it is not async, but I was using this notebook and the graph was not streaming tokens as expected. After debugging internals of langgraph, I have figured out that you need to create extra parameter for your llm calling function and pass it to llm:

# Define the function that calls the model
async def call_model(messages, config):
    response = await model.ainvoke(messages, config=config)
    # We return a list, because this will get added to the existing list
    return response

That way langgraph is able to pass callbacks to the llm to handle its stream and I got my tokens stream.

Again, I am not sure about your code, because tutorials suggest to use graph.astream_events rather than graph.stream

unfailingsalvage1448 commented 5 months ago

It seems I have figured it out how to fix tokens streaming.

I am not sure about your code, because it is not async, but I was using this notebook and the graph was not streaming tokens as expected. After debugging internals of langgraph, I have figured out that you need to create extra parameter for your llm calling function and pass it to llm:

# Define the function that calls the model
async def call_model(messages, config):
    response = await model.ainvoke(messages, config=config)
    # We return a list, because this will get added to the existing list
    return response

That way langgraph is able to pass callbacks to the llm to handle its stream and I got my tokens stream.

Again, I am not sure about your code, because tutorials suggest to use graph.astream_events rather than graph.stream

Thanks this worked for me with AzureChatOpenAI

PlebeiusGaragicus commented 4 months ago

@dmitryrPlanner5D - I spent a few hours trying to solve this and you figured it out for me! Passing the config to the LLM enables chunk streaming inside graphs!!! Wonderful!!!

If this were nostr I would zap you!

usersina commented 4 months ago

I have the same problem with this setup

# List of members participating in the conversation
members = ["MongoDBAgent"]

# Setup the nodes
supervisor_node = supervisor_factory.setup_node(members)
mongo_node = mongo_factory.setup_node(config)

# Setup the graph and add the nodes
workflow = StateGraph(AgentState)
workflow.add_node("Supervisor", supervisor_node)
workflow.add_node("MongoDBAgent", mongo_node)

# Define the edges
for member in members:
    workflow.add_edge(member, "Supervisor")

# The supervisor populates the "next" state, hence routing the conversation
# A conditional map is a dict that maps the output of the supervisor to the next node
# e.g. {'MongoDBAgent': 'MongoDBAgent', 'FINISH': END}
conditional_map = {member: member for member in members}
conditional_map["FINISH"] = END
print(conditional_map)

workflow.add_conditional_edges("Supervisor", lambda state: state["next"], conditional_map)

# Entry point
workflow.set_entry_point("Supervisor")

# Compile
graph = workflow.compile()

with the following factory method for MongoDB

def create_agent(
    llm: ChatOpenAI, tools: Sequence[BaseTool], system_prompt: str
) -> AgentExecutor:
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                system_prompt,
            ),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    agent = create_openai_tools_agent(llm, tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools)  # type: ignore https://github.com/langchain-ai/langchain/issues/13075
    return executor

def setup_agent(config: Config) -> AgentExecutor:
    llm = ChatOpenAI(model=config.model, streaming=config.streaming)
    return create_agent(llm, _get_tools(config), system_message)

def setup_node(config: Config) -> functools.partial[dict[str, list[HumanMessage]]]:
    mongo_agent = setup_agent(config)
    mongo_node = functools.partial(agent_node, agent=mongo_agent, name=NODE_NAME)

    return mongo_node

Calling my graph with

async for output in graph.astream_log(inputs, include_types=["llm"]):
    # astream_log() yields the requested logs (here LLMs) in JSONPatch format
    for op in output.ops:
        if op["path"] == "/streamed_output/-":
            # this is the output from .stream()
            ...
        elif op["path"].startswith("/logs/") and op["path"].endswith(
            "/streamed_output/-"
        ):
            # because we chose to only include LLMs, these are LLM tokens
            print(op["value"])

Gives the following error

KeyError: 'MongoMongoDBMongoDBAgent'

I can't see where I can add the config

It seems I have figured it out how to fix tokens streaming.

I am not sure about your code, because it is not async, but I was using this notebook and the graph was not streaming tokens as expected. After debugging internals of langgraph, I have figured out that you need to create extra parameter for your llm calling function and pass it to llm:

# Define the function that calls the model
async def call_model(messages, config):
    response = await model.ainvoke(messages, config=config)
    # We return a list, because this will get added to the existing list
    return response

That way langgraph is able to pass callbacks to the llm to handle its stream and I got my tokens stream.

Again, I am not sure about your code, because tutorials suggest to use graph.astream_events rather than graph.stream

dmitryrPlanner5D commented 4 months ago

Hi, @usersina Regarding your KeyError, it seems to be connected to MongoDB setup rather than LangGraph. I am not familiar with Mongo, so can not help.

And regarding config: in my solution, I suggest to add config parameter to function that you pass to workflow.add_node. In your case it is the function produced by mongo_factory.setup_node(config). So after your function building (and after all functools.partial) it should have only 2 parameters: graph_state and config. And this config you can pass to llm.invoke to enable streaming.

Also I see that you already have parameter with name config for different purposes, so be careful to not confuse it with LangChain config I suggest to use.

usersina commented 4 months ago

@dmitryrPlanner5D , thanks for the suggestion!

The MongoDB setup is just a simple tool to know the names of the collections in a mongodb database, so nothing crazy. I did however follow what you said and streaming is working now. However, I still see the KeyError executing:

inputs = {"messages": [HumanMessage(content="How many collections do we have?")]}
async for output in graph.astream_log(inputs, include_types=["llm"]):
    for op in output.ops:
        print(op)

{'op': 'replace', 'path': '', 'value': {'id': '1e88305f-e06c-4f9f-8120-20208defbb88', 'streamed_output': [], 'final_output': None, 'logs': {}, 'name': 'LangGraph', 'type': 'chain'}} {'op': 'add', 'path': '/logs/ChatOpenAI', 'value': {'id': '69000165-10d9-4828-962d-653e95bbc436', 'name': 'ChatOpenAI', 'type': 'llm', 'tags': ['seq:step:2'], 'metadata': {}, 'start_time': '2024-04-23T13:54:01.044+00:00', 'streamed_output': [], 'streamed_output_str': [], 'final_output': None, 'end_time': None}} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '', 'name': 'route'}})} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '{"', 'name': ''}})} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'next', 'name': ''}})} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '":"', 'name': ''}})} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'Mongo', 'name': ''}})} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'DB', 'name': ''}})} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'Agent', 'name': ''}})} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '"}', 'name': ''}})} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''} {'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='')} {'op': 'add', 'path': '/logs/ChatOpenAI/final_output', 'value': {'generations': [[{'text': '', 'generation_info': {'finish_reason': 'stop'}, 'type': 'ChatGenerationChunk', 'message': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '{"next":"MongoDBAgent"}', 'name': 'route'}}, response_metadata={'finish_reason': 'stop'}, id='run-69000165-10d9-4828-962d-653e95bbc436')}]], 'llm_output': None, 'run': None}} {'op': 'add', 'path': '/logs/ChatOpenAI/end_time', 'value': '2024-04-23T13:54:02.193+00:00'} {'op': 'add', 'path': '/streamed_output/-', 'value': {'Supervisor': {'next': 'MongoMongoDBMongoDBAgent'}}} {'op': 'replace', 'path': '/final_output', 'value': {'Supervisor': {'next': 'MongoMongoDBMongoDBAgent'}}}

The reason there is a KeyError is because the graph is trying to look for the next node to run, which in my case is named MongoDBAgent. But, if you take a look at the logs, it's trying to run "MongoMongoDBMongoDBAgent" which doesn't make sense.

This is most definitely because my AgentState looks like this:

class AgentState(TypedDict):
    """
    The agent state is the input to each node in the graph
    """

    messages: Annotated[Sequence[BaseMessage], operator.add]
    """
    The annotation tells the graph that new messages will always be added
    to the current state
    """

    next: str
    """
    The next node to execute
    """

The culprit, I think is the operator.add in the Annotated messages. I did see a similar issue that I will be diving into.

usersina commented 4 months ago

I found the issue and the solution.