langchain-ai / langgraph-studio

Desktop app for prototyping and debugging LangGraph applications locally.
https://studio.langchain.com

How to implement human in the loop with LangGraph Studio? #20

Open Nachoeigu opened 3 months ago

Nachoeigu commented 3 months ago

Hi! I was testing out the new feature.

I don't know if it is possible yet, but I would like to know how to integrate human-in-the-loop directly in LangGraph Studio.

The node with the human-in-the-loop logic:

def read_human_feedback(state: MessagesState):
    if state['messages'][-1].tool_calls == []:
        print("AI: \n" + state['messages'][-1].content)
        user_msg = input("Reply: ")
        return {'messages': [HumanMessage(content=user_msg)]}
    else:
        pass

def should_continue(state: MessagesState) -> Literal["agent", "end"]:
    messages = state['messages']
    last_message = messages[-1]
    if isinstance(last_message, HumanMessage):
        return "agent"
    return "end"

workflow.add_conditional_edges(
    "human_feedback",
    should_continue,
    {"agent": 'agent', "end": END}
)

It works if I run my Python file, but in LangGraph Studio it raises an EOFError because of the input() function. I think that is because the UI doesn't support interacting with the code that way.
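For context, Studio runs the graph behind the LangGraph API server, so no interactive stdin is attached and input() hits end-of-file immediately. A hypothetical guard (not from this thread, just to make the failure mode explicit) could look like this:

import sys

from langchain_core.messages import HumanMessage

def read_human_feedback(state):
    # Under the LangGraph API server there is no terminal attached to stdin,
    # so input() raises EOFError as soon as it tries to read a line.
    if not sys.stdin.isatty():
        raise RuntimeError(
            "No interactive stdin; use an interrupt + state update instead of input()."
        )
    user_msg = input("Reply: ")
    return {"messages": [HumanMessage(content=user_msg)]}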

Thank you!

HasnainKhanNiazi commented 3 months ago

I am also working on a similar problem. I have an agent that asks a follow-up question to the user via a tool; on the CLI it works fine, but in Studio I get this error: "EOFError: EOF when reading a line".

Nachoeigu commented 3 months ago

@vbarda Thank you for your suggestion. I also tried that approach, but it is inefficient.

I mean, interrupts are mainly a debugging feature (like the VS Code debugger: you see how it works step by step).

If I need to remove my human_feedback node in order to get the end user's feedback, I think it would be inefficient, because I would have to adjust my code only for the visualization in the IDE.

I think the human-in-the-loop feature is not available in LangGraph Studio yet, but if someone knows otherwise, let me know :)

vbarda commented 3 months ago

@Nachoeigu you would still need to modify the node implementation, because input() cannot be used with the LangGraph API server that backs LangGraph Studio

however, you can actually achieve the same behavior in the following way:

(1) update your code to something like this

# this is basically a no-op node
def human_feedback(state):
    pass

def should_continue(state):
    messages = state['messages']
    last_message = messages[-1]
    if isinstance(last_message, HumanMessage):
        return "agent"
    return "end"

workflow.set_entry_point("agent")
workflow.add_node("agent", call_model)
workflow.add_node("human", human_feedback)
workflow.add_edge("agent", "human")
workflow.add_conditional_edges(
    "human",
    should_continue,
    {
        "agent": "agent",
        "end": END,
    },
)

(2) add an interrupt to the human node in the Studio UI

when the graph stops, head over to the Inputs section on the bottom left and submit the HumanMessage. This would update the state in a similar way to what you were doing before

[image: Studio UI showing the interrupt on the human node and the Inputs section]

hope this helps! we're also going to add a video on this to the README
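For anyone who wants to verify this outside Studio, the same interrupt-and-resume flow can be reproduced with the library directly. A minimal sketch, assuming the workflow from the snippet above (with call_model and MessagesState defined as in the original post); the thread id and messages are illustrative:

from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver

# Compile with a checkpointer and pause before the no-op "human" node;
# toggling the interrupt in the Studio UI does the equivalent server-side.
graph = workflow.compile(checkpointer=MemorySaver(), interrupt_before=["human"])

config = {"configurable": {"thread_id": "hitl-demo"}}

# Run until the graph stops at the interrupt (after "agent", before "human").
graph.invoke({"messages": [HumanMessage(content="Hi there")]}, config)

# Inject the user's reply, just like submitting a HumanMessage in the Inputs section.
graph.update_state(config, {"messages": [HumanMessage(content="Sounds good, go ahead")]})

# Resume from the saved checkpoint; passing None continues instead of restarting.
for event in graph.stream(None, config, stream_mode="values"):
    event["messages"][-1].pretty_print()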

Nachoeigu commented 3 months ago

@vbarda It doesn't seem to work if I just pass the no-op function, so I tried this instead:

def read_human_feedback(state: MessagesState):
    # if state['messages'][-1].tool_calls == []:
    #     logger.info("AI: \n"+ state['messages'][-1].content)
    #     user_msg = input("Reply: ")
    #     return {'messages': [HumanMessage(content = user_msg)]}
    # else:
    #     pass
    return {'messages':[HumanMessage(content='')]}

But it also didn't work with the LangGraph API. I recorded my screen for clarification: it creates a fork, but in the fork I cannot continue, even though the relationships between the nodes should allow the flow to continue:

https://github.com/user-attachments/assets/7cc17c29-ced6-4539-bcad-417301bf29f8

These are the relationships:

workflow = StateGraph(MessagesState)

workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
workflow.add_node("human_feedback", read_human_feedback)

workflow.set_entry_point("agent")

workflow.add_conditional_edges(
    "agent",
    should_continue,
    {"human_feedback": 'human_feedback',
     "tools": "tools"}
)

workflow.add_conditional_edges(
    "human_feedback",
    should_continue_with_feedback,
    {"agent": 'agent',
     "end": END}
)

workflow.add_edge("tools", 'agent')
workflow.add_edge("agent", 'human_feedback')

checkpointer = MemorySaver()

app = workflow.compile(checkpointer=checkpointer)

hwchase17 commented 3 months ago

Why do you have workflow.add_edge("agent", 'human_feedback')? Is it needed? Don't you already have the conditional edge that goes from agent to either human_feedback or tools?

Nachoeigu commented 3 months ago

Why do you have workflow.add_edge("agent", 'human_feedback')? Is it needed? Don't you already have the conditional edge that goes from agent to either human_feedback or tools?

Yes, adjusted. It wasn't needed. :)

Regarding what I mentioned: I think it is either not implemented yet or a bug. If you fork a message, you cannot continue the flow, as shown in the video (or maybe I'm wrong, I don't know).

You see, it should go to the agent, as highlighted in the main branch:

[image: main branch view highlighting the next node (agent)]

But if I fork it, I cannot continue:

[image: forked branch view where execution does not continue]

For better clarity, here is the agent.py file that feeds the UI:


import os
from dotenv import load_dotenv
import sys

load_dotenv()
WORKDIR=os.getenv("WORKDIR")
os.chdir(WORKDIR)
sys.path.append(WORKDIR)

from typing import Annotated, Literal, TypedDict
import json
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langchain_google_genai.chat_models import ChatGoogleGenerativeAI
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph, MessagesState
from langgraph.prebuilt import ToolNode
from typing import TypedDict, Annotated, List, Dict
from langchain_core.messages import AnyMessage, HumanMessage, AIMessage, ToolMessage
import operator
from src.vector_database.utils import PineconeManagment
import logging
import logging_config

logger = logging.getLogger(__name__)

#This is for the RAG phase of the app
def format_retrieved_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

pinecone_conn = PineconeManagment()
pinecone_conn.loading_vdb(index_name = 'ovidedentalclinic')

retriever = pinecone_conn.vdb.as_retriever(search_type="similarity", 
                                    search_kwargs={"k": 2})

rag_chain = retriever | format_retrieved_docs

class MessagesState(TypedDict):
    messages: Annotated[List[AnyMessage], operator.add]

#All the tools to consider
@tool
def check_availability(desired_date:str, specialization:str):
    """Checking the database if the doctor has availability"""
    return True

@tool
def reschedule_appointment(old_date:str, new_date:str, dni_number:int, doctor_name:str):
    """Rescheduling an appointment"""
    return True

@tool
def cancel_appointment(date:str, dni_number:int, doctor_name:str):
    """Canceling an appointment"""
    return True

@tool
def get_catalog_specialists():
    """Obtain information about the doctors and specializations/services we provide"""
    with open(f"{WORKDIR}/data/catalog.json","r") as file:
        file = json.loads(file.read())

    return file

@tool
def set_appointment(date:str, dni_number:int, specialization:str):
    """Set appointment with the doctor"""
    return True

@tool
def check_results(dni_number:int):
    """Check if the result of the pacient is available"""
    return True

@tool
def reminder_appointment(dni_number:int):
    """Returns when the pacient has its appointment with the doctor"""
    return "You have for next monday at 7 am"

@tool
def retrieve_faq_info(question:str):
    """Retrieve documents from general questions about the medical clinic"""
    return rag_chain.invoke(question)

tools = [cancel_appointment, get_catalog_specialists, retrieve_faq_info, set_appointment, reminder_appointment, check_availability, check_results, reschedule_appointment]

tool_node = ToolNode(tools)

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
#model = ChatGoogleGenerativeAI(model = 'gemini-1.5-pro-exp-0801', temperature = 0)
model = model.bind_tools(tools = tools)

from datetime import datetime

def should_continue(state: MessagesState) -> Literal["tools", "human_feedback"]:
    messages = state['messages']
    last_message = messages[-1]
    if last_message.tool_calls:
        return "tools"
    return "human_feedback"

def should_continue_with_feedback(state: MessagesState) -> Literal["agent", "end"]:
    messages = state['messages']
    last_message = messages[-1]
    if isinstance(last_message, HumanMessage):
        return "agent"
    return "end"

def call_model(state: MessagesState):
    messages = state['messages']

    response = model.invoke(messages)
    return {"messages": [response]}

#The commented part is because it breaks the UI with the input function
def read_human_feedback(state: MessagesState):
    # if state['messages'][-1].tool_calls == []:
    #     logger.info("AI: \n"+ state['messages'][-1].content)
    #     user_msg = input("Reply: ")
    #     return {'messages': [HumanMessage(content = user_msg)]}
    # else:
    #     pass
    return {'messages':[HumanMessage(content='')]}

workflow = StateGraph(MessagesState)

workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
workflow.add_node("human_feedback", read_human_feedback)

workflow.set_entry_point("agent")

workflow.add_conditional_edges(
    "agent",
    should_continue,
    {"human_feedback": 'human_feedback',
     "tools": "tools"}
)

workflow.add_conditional_edges(
    "human_feedback",
    should_continue_with_feedback,
    {"agent": 'agent',
     "end": END}
)

workflow.add_edge("tools", 'agent')

checkpointer = MemorySaver()

app = workflow.compile(checkpointer=checkpointer)
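For reference, the fork-and-resume behavior discussed above can also be exercised programmatically against this graph. A hedged sketch: the interrupt mirrors the Studio setup, the thread id, messages, and checkpoint index are illustrative, and this is not necessarily how Studio forks internally:

from langchain_core.messages import HumanMessage

# Recompile with an interrupt before the feedback node, mirroring the Studio configuration.
app = workflow.compile(checkpointer=checkpointer, interrupt_before=["human_feedback"])

config = {"configurable": {"thread_id": "fork-demo"}}

# First run: the graph pauses before human_feedback.
app.invoke({"messages": [HumanMessage(content="Do you have availability on Friday?")]}, config)

# Inspect the checkpoint history (newest first) and pick a past checkpoint to fork from.
history = list(app.get_state_history(config))
past = history[-2]  # illustrative choice; inspect .next and .values to pick the right one

# Writing to a past checkpoint creates a new branch and returns that branch's config.
forked_config = app.update_state(
    past.config,
    {"messages": [HumanMessage(content="Actually, make it Monday instead")]},
)

# Resume the forked branch from that checkpoint onward.
for event in app.stream(None, forked_config, stream_mode="values"):
    event["messages"][-1].pretty_print()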

Thank you @hwchase17 for the amazing job you and the team are doing for the community. Keep it going!

hwchase17 commented 3 months ago

hmmm @Nachoeigu how did you fork it? would you be able to record a video?

Nachoeigu commented 3 months ago

hmmm @Nachoeigu how did you fork it? would you be able to record a video?

@hwchase17 Here it is:

https://github.com/user-attachments/assets/7c8479fa-4010-4464-8c77-4a7ed0c371eb

PeterP22 commented 3 weeks ago

any update on fixing human in the loop for langgraph cloud?

HockeyTrafalgar commented 3 weeks ago

Actually, I have a similar problem. If in LangGraph Studio I interrupt the flow before or after the HumanFeedback node and then add a HumanMessage in the Input section in Studio, the state does not get updated and the message I've just entered gets lost. The only workaround I have found so far is to set the interrupt AFTER the wait_for_client node and edit the message it returned, creating a fork. That way the agent gets the feedback and continues execution.

At the same time, if I run the graph from Python code without Studio and add messages to the input of graph.stream, it works just fine:

from react_agent.graph import graph, workflow
import asyncio
from langchain_core.messages import HumanMessage
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

async def run_graph():
    sqliteConn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
    memory = SqliteSaver(sqliteConn)

    graph = workflow.compile(
        checkpointer=memory,
        interrupt_before=['wait_for_client'],
    )

    # Save image
    with open("graph.png", "wb") as f:
        f.write(graph.get_graph().draw_mermaid_png())

    config = {"configurable": {"thread_id": "28"}}

    # print history
    state = graph.get_state(config=config)
    if (state.values):
        messages = state.values["messages"]
        for message in messages:
            message.pretty_print()

    while True:
        state = graph.get_state(config=config)
        print(f"Next node: {state.next}")

        user_input = input("Next message: ")
        if (user_input):
            state_update = {"messages": [HumanMessage(user_input)]}
        else:
            state_update = None

        events = graph.stream(input=state_update,config=config, stream_mode="values",debug=False)

        for event in events:
            event["messages"][-1].pretty_print()

asyncio.run(run_graph())

[image: graph diagram generated by graph.get_graph().draw_mermaid_png()]

Graph code:

"""Define a custom Reasoning and Action agent.

Works with a chat model with tool calling support.
"""

from datetime import datetime, timezone
from typing import Dict, List, Literal, cast

from langchain_core.messages import AIMessage, SystemMessage,HumanMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph
from langgraph.prebuilt import ToolNode

from react_agent.configuration import Configuration
from react_agent.state import InputState, State
from react_agent.tools import TOOLS
from react_agent.utils import load_chat_model

import logging
logger = logging.getLogger(__name__)

def call_model(
    state: State, config: RunnableConfig
) -> Dict[str, List[AIMessage]]:
    """Call the LLM powering our "agent".

    This function prepares the prompt, initializes the model, and processes the response.

    Args:
        state (State): The current state of the conversation.
        config (RunnableConfig): Configuration for the model run.

    Returns:
        dict: A dictionary containing the model's response message.
    """

    configuration = Configuration.from_runnable_config(config)

    # Create a prompt template. Customize this to change the agent's behavior.
    prompt = ChatPromptTemplate.from_messages(
        [("system", configuration.system_prompt), ("placeholder", "{messages}")]
    )

    # Initialize the model with tool binding. Change the model or add more tools here.
    model = load_chat_model(configuration.model).bind_tools(TOOLS)

    # Prepare the input for the model, including the current system time
    message_value = prompt.invoke(
        {
            "messages": state.messages,
            "system_time": datetime.now(tz=timezone.utc).isoformat(),
        },
        config,
    )

    # Get the model's response
    response = cast(AIMessage, model.invoke(message_value, config))

    # Handle the case when it's the last step and the model still wants to use a tool
    if state.is_last_step and response.tool_calls:
        return {
            "messages": [
                AIMessage(
                    id=response.id,
                    content="Sorry, I could not find an answer to your question in the specified number of steps.",
                )
            ]
        }

    # Return the model's response as a list to be added to existing messages
    return {"messages": [response]}

def initialize_agent(
    state: State, config: RunnableConfig
) -> Dict[str, List[AIMessage]]:
    return {"messages": [HumanMessage(content="-")]}

def wait_for_client(
    state: State, config: RunnableConfig
):
   print(f"Waiting for client input: {state}")
   return {"messages": [HumanMessage(content="ENTER_YOUR_RESPONSE_HERE")]}

# Define a new graph

workflow = StateGraph(State, input=InputState, config_schema=Configuration)

# Define the two nodes we will cycle between
workflow.add_node(call_model)
workflow.add_node(initialize_agent)
workflow.add_node(wait_for_client)
workflow.add_node("tools", ToolNode(TOOLS))

# Set the entrypoint as `call_model`
# This means that this node is the first one called
workflow.add_edge("__start__", "initialize_agent")
workflow.add_edge("initialize_agent", "call_model")
workflow.add_edge("wait_for_client", "call_model")

def route_model_output(state: State) -> Literal["__end__", "tools"]:
    """Determine the next node based on the model's output.

    This function checks if the model's last message contains tool calls.

    Args:
        state (State): The current state of the conversation.

    Returns:
        str: The name of the next node to call ("__end__" or "tools").
    """
    last_message = state.messages[-1]
    if not isinstance(last_message, AIMessage):
        raise ValueError(
            f"Expected AIMessage in output edges, but got {type(last_message).__name__}"
        )
    # If there is no tool call, then we finish
    if not last_message.tool_calls:
        return "__end__"
    # Otherwise we execute the requested actions

    return "tools"

def route_tool_output(state: State) -> Literal["call_model", "wait_for_client"]:

    last_message = state.messages[-1]

    if (last_message.content == "waiting_for_client"):
        return "wait_for_client"
    else:
        return "call_model"

# Add a conditional edge to determine the next step after `call_model`
workflow.add_conditional_edges(
    "call_model",
    # After call_model finishes running, the next node(s) are scheduled
    # based on the output from route_model_output
    route_model_output,
)

workflow.add_conditional_edges(
    "tools",
    route_tool_output,
)

# Compile the workflow into an executable graph
# You can customize this by adding interrupt points for state updates
graph = workflow.compile(
    interrupt_before=[],  # Add node names here to update state before they're called
    interrupt_after=['wait_for_client'],  # Add node names here to update state after they're called
)
graph.name = "Agent"  # This customizes the name in LangSmith

Tool definitions:

from typing import Any, Callable, List, Optional, cast
from langchain_core.messages import ToolMessage
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import InjectedToolArg
from typing_extensions import Annotated

from react_agent.configuration import Configuration

from langchain_community.tools.file_management import (
    ReadFileTool,
    WriteFileTool,
    ListDirectoryTool,
)
import langchain_experimental
from langchain_community.tools.shell import ShellTool

def search(
    query: str, *, config: Annotated[RunnableConfig, InjectedToolArg]
) -> Optional[list[dict[str, Any]]]:
    """Search for general web results.

    This function performs a search using the Tavily search engine, which is designed
    to provide comprehensive, accurate, and trusted results. It's particularly useful
    for answering questions about current events.
    """
    configuration = Configuration.from_runnable_config(config)
    wrapped = TavilySearchResults(max_results=configuration.max_search_results)
    result = wrapped.invoke({"query": query})
    return cast(list[dict[str, Any]], result)

def ask_client(
    query: str, *, config: Annotated[RunnableConfig, InjectedToolArg]
) -> str:
    """Ask THE CLIENT for input when additional information is required"""
    print("ask_client")
    return "waiting_for_client"

TOOLS: List[Callable[..., Any]] = [
    search,
    ReadFileTool(),
    WriteFileTool(),
    ListDirectoryTool(),
    ShellTool(),
    ask_client
]