Hi, I was trying to implement human in loop concept but unable to implement. I follow the documentation but not getting desired output. I have attaching code also for review. please guide me how to implement human in loop such a way that graph flow will stop desired node then human will see the output of that node, it will update if required and then send information to next node. Next node will use that information and do remaining code,
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from IPython.display import Image, display
class State(TypedDict):
input: str
def step_1(state):
print("---Step 1---")
state['input'] += " from step 1"
return state
def step_2(state):
print("---Step 2---")
state['input'] += " from step 2"
return state
def step_3(state):
print("---Step 3---")
state['input'] += " from step 3"
return state
current_state = initial_input
for event in graph.stream(current_state, thread, stream_mode="values"):
print(event)
current_state = event # Update the current state with the event output
Capture the output of step_2
state_after_step_2 = current_state
try:
user_edit = input(f"Edit the state after Step 2: {state_after_step_2['input']}\nYour edit: ")
except:
user_edit = state_after_step_2['input']
Update the state with the user's edit
state_after_step_2['input'] = user_edit
Continue the graph execution with the updated state
for event in graph.stream(state_after_step_2, thread, stream_mode="values"):
print(event)
current_state = event # Update the current state with the event output
for event in graph.stream(None, thread, stream_mode="values"):
print(event)
current_state = event # Update the current state with the event output
Idea or request for content:
Please provide me doc, solution or blog where i can solve my issues.
Issue with current documentation:
Hi, I was trying to implement human in loop concept but unable to implement. I follow the documentation but not getting desired output. I have attaching code also for review. please guide me how to implement human in loop such a way that graph flow will stop desired node then human will see the output of that node, it will update if required and then send information to next node. Next node will use that information and do remaining code,
from typing import TypedDict from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import MemorySaver from IPython.display import Image, display
class State(TypedDict): input: str
def step_1(state): print("---Step 1---") state['input'] += " from step 1" return state
def step_2(state): print("---Step 2---") state['input'] += " from step 2" return state
def step_3(state): print("---Step 3---") state['input'] += " from step 3" return state
builder = StateGraph(State) builder.add_node("step_1", step_1) builder.add_node("step_2", step_2) builder.add_node("step_3", step_3) builder.add_edge(START, "step_1") builder.add_edge("step_1", "step_2") builder.add_edge("step_2", "step_3") builder.add_edge("step_3", END)
Set up memory
memory = MemorySaver()
Add
graph = builder.compile(checkpointer=memory, interrupt_before=["step_3"])
View
display(Image(graph.get_graph().draw_mermaid_png()))
Input
initial_input = {"input": "hello world"}
Thread
thread = {"configurable": {"thread_id": "1"}}
Run the graph until the first interruption
current_state = initial_input for event in graph.stream(current_state, thread, stream_mode="values"): print(event) current_state = event # Update the current state with the event output
Capture the output of step_2
state_after_step_2 = current_state
try: user_edit = input(f"Edit the state after Step 2: {state_after_step_2['input']}\nYour edit: ") except: user_edit = state_after_step_2['input']
Update the state with the user's edit
state_after_step_2['input'] = user_edit
Continue the graph execution with the updated state
for event in graph.stream(state_after_step_2, thread, stream_mode="values"): print(event) current_state = event # Update the current state with the event output
for event in graph.stream(None, thread, stream_mode="values"):
print(event)
current_state = event # Update the current state with the event output
Idea or request for content:
Please provide me doc, solution or blog where i can solve my issues.