Closed RobinQu closed 1 month ago
langgraph examples: https://github.com/langchain-ai/langgraph/tree/main/examples
import json

def human_approval(tool_invocations: list) -> list:
    tool_strs = "\n\n".join(
        json.dumps(tool_call, indent=2) for tool_call in tool_invocations
    )
    msg = (
        f"Do you approve of the following tool invocations\n\n{tool_strs}\n\n"
        "Anything except 'Y'/'Yes' (case-insensitive) will be treated as a no."
    )
    resp = input(msg)
    if resp.lower() not in ("yes", "y"):
        raise ValueError(f"Tool invocations not approved:\n\n{tool_strs}")
    # Pass the approved invocations through unchanged to the next chain step
    return tool_invocations
chain = model | JsonOutputToolsParser() | human_approval | call_tool_list
chain.invoke("how many emails did i get in the last 5 days?")
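The approval gate above blocks on `input()`, which makes it awkward to test. A minimal sketch, assuming a hypothetical `ask` parameter and a hypothetical `NotApproved` exception (neither appears in the example above), shows how the same check could be exercised without a terminal:

```python
import json
from typing import Callable, List


class NotApproved(Exception):
    """Hypothetical exception raised when the reviewer rejects the tool calls."""


def human_approval(
    tool_invocations: List[dict], ask: Callable[[str], str] = input
) -> List[dict]:
    # Pretty-print every pending tool call so the reviewer can inspect it.
    tool_strs = "\n\n".join(json.dumps(call, indent=2) for call in tool_invocations)
    prompt = (
        f"Do you approve of the following tool invocations\n\n{tool_strs}\n\n"
        "Anything except 'Y'/'Yes' (case-insensitive) will be treated as a no. "
    )
    # `ask` is an assumed hook: input() by default, a stub in tests.
    answer = ask(prompt).strip().lower()
    if answer not in ("yes", "y"):
        raise NotApproved(f"Tool invocations not approved:\n\n{tool_strs}")
    return tool_invocations


# Illustrative tool call; the name and arguments are made up for this sketch.
calls = [{"type": "count_emails", "args": {"last_n_days": 5}}]
approved = human_approval(calls, ask=lambda _: "Y")
```

Raising a dedicated exception type rather than `ValueError` lets the caller distinguish a rejection from a malformed tool call.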
Interrupt before the "action" node in the graph:
https://github.com/langchain-ai/langgraph/blob/main/examples/human-in-the-loop.ipynb
# initialize
app = workflow.compile(checkpointer=memory, interrupt_before=["action"])
...
# to resume
app.stream(None, thread)
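The pause-and-resume behaviour can be pictured with a toy model. This is a sketch of the semantics only, not LangGraph's implementation; the `run` function, the `checkpoints` dict and the handler names are all hypothetical:

```python
from typing import Callable, Dict, List, Optional

# Hypothetical in-memory checkpoint store keyed by thread id.
checkpoints: Dict[str, dict] = {}


def run(
    nodes: List[str],
    handlers: Dict[str, Callable[[dict], dict]],
    thread_id: str,
    state: Optional[dict],
    interrupt_before: List[str],
) -> dict:
    """Run nodes in order, pausing before any node listed in interrupt_before.

    Passing state=None resumes from the checkpoint saved for thread_id.
    """
    if state is None:
        saved = checkpoints[thread_id]
        state, start, resumed = saved["state"], saved["next"], True
    else:
        start, resumed = 0, False
    for i in range(start, len(nodes)):
        name = nodes[i]
        # Pause here unless we are resuming exactly at this node.
        if name in interrupt_before and not (resumed and i == start):
            checkpoints[thread_id] = {"state": state, "next": i}
            return {**state, "paused_before": name}
        state = handlers[name](state)
    return state


handlers = {
    "agent": lambda s: {**s, "plan": "call search tool"},
    "action": lambda s: {**s, "result": "tool output"},
}
first = run(["agent", "action"], handlers, "t1", {"input": "hi"}, ["action"])
second = run(["agent", "action"], handlers, "t1", None, ["action"])
```

The first call stops before "action" and saves a checkpoint; the second call passes `None` as input, mirroring `app.stream(None, thread)`, and continues from where the first one stopped.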
https://arxiv.org/abs/2308.03188 https://github.com/langchain-ai/langgraph/blob/main/examples/reflection/reflection.ipynb?ref=blog.langchain.dev
Reflection is the process of prompting an LLM to observe its past steps (along with potential observations from tools/the environment) to assess the quality of the chosen actions.
This is then used downstream for things like re-planning, search, or evaluation.
Generator prompt:
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are an essay assistant tasked with writing excellent 5-paragraph essays."
            " Generate the best essay possible for the user's request."
            " If the user provides critique, respond with a revised version of your previous attempts.",
        ),
        MessagesPlaceholder(variable_name="messages"),
    ]
)
Grader prompt:
reflection_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a teacher grading an essay submission. Generate critique and recommendations for the user's submission."
            " Provide detailed recommendations, including requests for length, depth, style, etc.",
        ),
        MessagesPlaceholder(variable_name="messages"),
    ]
)
from typing import List, Sequence

from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langgraph.graph import END, MessageGraph


async def generation_node(state: Sequence[BaseMessage]):
    return await generate.ainvoke({"messages": state})


async def reflection_node(messages: Sequence[BaseMessage]) -> HumanMessage:
    # Swap roles for the other messages so the reflector sees the essays as user input
    cls_map = {"ai": HumanMessage, "human": AIMessage}
    # First message is the original user request. We hold it the same for all nodes
    translated = [messages[0]] + [
        cls_map[msg.type](content=msg.content) for msg in messages[1:]
    ]
    res = await reflect.ainvoke({"messages": translated})
    # We treat the output of this as human feedback for the generator
    return HumanMessage(content=res.content)


builder = MessageGraph()
builder.add_node("generate", generation_node)
builder.add_node("reflect", reflection_node)
builder.set_entry_point("generate")


def should_continue(state: List[BaseMessage]):
    if len(state) > 6:
        # End after 3 iterations
        return END
    return "reflect"


builder.add_conditional_edges("generate", should_continue)
builder.add_edge("reflect", "generate")
graph = builder.compile()
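The two ideas that carry the reflection loop are the role swap (the reflector sees the generator's drafts as if a user had submitted them) and the length-based stop condition. A library-free sketch, assuming plain `(role, content)` tuples as a stand-in for `BaseMessage` and the string `"END"` as a stand-in for LangGraph's `END`:

```python
from typing import List, Tuple

Message = Tuple[str, str]  # (role, content); a stand-in for BaseMessage


def swap_roles(messages: List[Message]) -> List[Message]:
    """Keep the first message (the user request); flip ai <-> human for the rest."""
    flip = {"ai": "human", "human": "ai"}
    return [messages[0]] + [(flip[role], text) for role, text in messages[1:]]


def should_continue(messages: List[Message], max_messages: int = 6) -> str:
    # Stop once the history has grown past max_messages entries.
    return "END" if len(messages) > max_messages else "reflect"


history = [
    ("human", "Write an essay"),
    ("ai", "Draft 1"),
    ("human", "Critique 1"),
    ("ai", "Draft 2"),
]
swapped = swap_roles(history)
```

Because the critique is returned to the generator as a human message, the generator prompt needs no special handling for feedback: it simply sees another user turn.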
https://arxiv.org/abs/2303.11366 https://github.com/langchain-ai/langgraph/blob/main/examples/reflexion/reflexion.ipynb
The agent explicitly critiques its responses for tasks to generate a higher quality final response, at the expense of longer execution time.
The main component of Reflexion is the "actor", which is an agent that reflects on its response and re-executes to improve based on self-critique.
Actor prompt:
actor_prompt_template = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """You are expert researcher.
Current time: {time}
1. {first_instruction}
2. Reflect and critique your answer. Be severe to maximize improvement.
3. Recommend search queries to research information and improve your answer.""",
        ),
        MessagesPlaceholder(variable_name="messages"),
        ("system", "Answer the user's question above using the required format."),
    ]
).partial(
    time=lambda: datetime.datetime.now().isoformat(),
)
Revisor prompt
revise_instructions = """Revise your previous answer using the new information.
- You should use the previous critique to add important information to your answer.
- You MUST include numerical citations in your revised answer to ensure it can be verified.
- Add a "References" section to the bottom of your answer (which does not count towards the word limit). In form of:
- [1] https://example.com
- [2] https://example.com
- You should use the previous critique to remove superfluous information from your answer and make SURE it is not more than 250 words.
"""
Limitations:
- This agent trades off execution time for quality. It explicitly forces the agent to critique and revise the output over several steps, which usually (not always) increases the response quality but takes much longer to return a final answer.
- The 'reflections' can be paired with additional external feedback (such as validators), to further guide the actor.
- In the paper, 1 environment (AlfWorld) uses external memory. It does this by storing summaries of the reflections to an external store and using them in subsequent trials/invocations.
https://arxiv.org/abs/2305.04091 https://github.com/langchain-ai/langgraph/blob/main/examples/plan-and-execute/plan-and-execute.ipynb?ref=blog.langchain.dev
This compares to a typical ReAct style agent where you think one step at a time. The advantages of this "plan-and-execute" style agent are:
- Explicit long term planning (which even really strong LLMs can struggle with)
- Ability to use smaller/weaker models for the execution step, only using larger/better models for the planning step
Components
Planner prompt
planner_prompt = ChatPromptTemplate.from_template(
"""For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.
{objective}"""
)
Replanner prompt
replanner_prompt = ChatPromptTemplate.from_template(
"""For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.
Your objective was this:
{input}
Your original plan was this:
{plan}
You have currently done the follow steps:
{past_steps}
Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan."""
)
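The planner and replanner prompts imply a simple control loop: plan once, execute the first pending step, then ask the replanner for either the remaining plan or a final answer. A minimal sketch, assuming stub callables in place of the LLM-backed planner, executor and replanner (all names here are hypothetical):

```python
from typing import Callable, List, Tuple, Union

Plan = List[str]
PastSteps = List[Tuple[str, str]]


def plan_and_execute(
    objective: str,
    planner: Callable[[str], Plan],
    executor: Callable[[str], str],
    replanner: Callable[[str, Plan, PastSteps], Union[Plan, str]],
    max_steps: int = 10,
) -> str:
    plan = planner(objective)
    past_steps: PastSteps = []
    for _ in range(max_steps):
        if not plan:
            break
        step = plan[0]
        past_steps.append((step, executor(step)))
        # The replanner returns either the remaining plan or a final answer.
        outcome = replanner(objective, plan, past_steps)
        if isinstance(outcome, str):
            return outcome
        plan = outcome
    # Fall back to the last step result if no final answer was produced.
    return past_steps[-1][1] if past_steps else ""


# Stubs standing in for LLM calls.
def stub_planner(objective: str) -> Plan:
    return ["look up A", "look up B"]


def stub_executor(step: str) -> str:
    return f"done: {step}"


def stub_replanner(objective: str, plan: Plan, past_steps: PastSteps):
    remaining = plan[1:]
    if remaining:
        return remaining
    return "final: " + "; ".join(result for _, result in past_steps)


answer = plan_and_execute("demo", stub_planner, stub_executor, stub_replanner)
```

The `max_steps` cap is an assumption added for safety; it bounds the loop if the replanner never returns a final answer.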
One known limitation of the above design is that each task is still executed in sequence, meaning embarrassingly parallel operations all add to the total execution time. You could improve on this by having each task represented as a DAG (similar to LLMCompiler), rather than a regular list.
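To illustrate the DAG idea, the sketch below groups tasks into batches that could run in parallel, using the standard-library `graphlib.TopologicalSorter`. The task names and the `parallel_batches` helper are hypothetical and only show the scheduling order, not any actual execution:

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def parallel_batches(dependencies: Dict[str, Set[str]]) -> List[List[str]]:
    """Group tasks into batches; every task in a batch has all its deps met."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the plan is cyclic
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


# Each key maps a task to the set of tasks it depends on.
deps = {
    "search_a": set(),
    "search_b": set(),
    "compare": {"search_a", "search_b"},
    "answer": {"compare"},
}
batches = parallel_batches(deps)
```

With a plain list the four tasks would take four sequential steps; as a DAG the two searches share one batch, so the critical path is three steps.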
https://arxiv.org/abs/2312.04511 https://github.com/langchain-ai/langgraph/blob/main/examples/llm-compiler/LLMCompiler.ipynb?ref=blog.langchain.dev
LLMCompiler is designed to speed up the execution of agentic tasks by eagerly executing tasks within a DAG.
Components
Planner prompt
================================ System Message ================================
Given a user query, create a plan to solve it with the utmost parallelizability. Each plan should comprise an action from the following {num_tools} types:
{tool_descriptions}
{num_tools}. join(): Collects and combines results from prior actions.
- An LLM agent is called upon invoking join() to either finalize the user query or wait until the plans are executed.
- join should always be the last action in the plan, and will be called in two scenarios:
(a) if the answer can be determined by gathering the outputs from tasks to generate the final response.
(b) if the answer cannot be determined in the planning phase before you execute the plans.
Guidelines:
- Each action described above contains input/output types and description.
- You must strictly adhere to the input and output types for each action.
- The action descriptions contain the guidelines. You MUST strictly follow those guidelines when you use the actions.
- Each action in the plan should strictly be one of the above types. Follow the Python conventions for each action.
- Each action MUST have a unique ID, which is strictly increasing.
- Inputs for actions can either be constants or outputs from preceding actions. In the latter case, use the format $id to denote the ID of the previous action whose output will be the input.
- Always call join as the last action in the plan. Say '<END_OF_PLAN>' after you call join
- Ensure the plan maximizes parallelizability.
- Only use the provided action types. If a query cannot be addressed using these, invoke the join action for the next steps.
- Never introduce new actions other than the ones provided.
============================= Messages Placeholder =============================
{messages}
================================ System Message ================================
Remember, ONLY respond with the task list in the correct format! E.g.:
idx. tool(arg_name=args)
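The task-list format requested by the prompt (`idx. tool(arg_name=args)`, with `$id` referring to earlier outputs) can be parsed with a couple of regular expressions. This is a simplified sketch and not the notebook's output parser; it only extracts explicit `$id` references and keeps the argument string unparsed:

```python
import re
from typing import Dict, List

TASK_RE = re.compile(r"^\s*(\d+)\.\s*(\w+)\((.*)\)")
REF_RE = re.compile(r"\$\{?(\d+)\}?")  # matches $1 as well as ${1}


def parse_plan(text: str) -> List[Dict]:
    tasks = []
    for line in text.splitlines():
        match = TASK_RE.match(line)
        if not match:
            continue  # skip "Thought: ..." lines and blanks
        idx, tool, args = match.groups()
        tasks.append(
            {
                "idx": int(idx),
                "tool": tool,
                "args": args,
                # IDs of earlier tasks whose output this task consumes.
                "dependencies": sorted({int(ref) for ref in REF_RE.findall(args)}),
            }
        )
    return tasks


# Illustrative plan; the tool names are made up for this sketch.
plan = """1. search(query="population of Paris")
2. search(query="population of Rome")
Thought: compare the two numbers
3. math(problem="$1 - $2")
4. join()<END_OF_PLAN>"""
tasks = parse_plan(plan)
```

A production parser would also need to parse the argument list itself, which is where the format becomes fragile for tools with several arguments.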
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Any, Dict, List

# as_runnable, Task, SchedulerInput, FunctionMessage, _execute_task and
# _get_observations are assumed to be defined as in the linked notebook.


@as_runnable
def schedule_task(task_inputs, config):
    task: Task = task_inputs["task"]
    observations: Dict[int, Any] = task_inputs["observations"]
    try:
        observation = _execute_task(task, observations, config)
    except Exception:
        # Record the traceback as the observation instead of failing the run
        observation = traceback.format_exc()
    observations[task["idx"]] = observation


def schedule_pending_task(
    task: Task, observations: Dict[int, Any], retry_after: float = 0.2
):
    while True:
        deps = task["dependencies"]
        if deps and (any([dep not in observations for dep in deps])):
            # Dependencies not yet satisfied
            time.sleep(retry_after)
            continue
        schedule_task.invoke({"task": task, "observations": observations})
        break


@as_runnable
def schedule_tasks(scheduler_input: SchedulerInput) -> List[FunctionMessage]:
    """Group the tasks into a DAG schedule."""
    # For streaming, we are making a few simplifying assumptions:
    # 1. The LLM does not create cyclic dependencies
    # 2. That the LLM will not generate tasks with future deps
    # If this ceases to be a good assumption, you can either
    # adjust to do a proper topological sort (not-stream)
    # or use a more complicated data structure
    tasks = scheduler_input["tasks"]
    messages = scheduler_input["messages"]
    # If we are re-planning, we may have calls that depend on previous
    # plans. Start with those.
    observations = _get_observations(messages)
    task_names = {}
    originals = set(observations)
    # ^^ We assume each task inserts a different key above to
    # avoid race conditions...
    futures = []
    retry_after = 0.25  # Retry every quarter second
    with ThreadPoolExecutor() as executor:
        for task in tasks:
            deps = task["dependencies"]
            task_names[task["idx"]] = (
                task["tool"] if isinstance(task["tool"], str) else task["tool"].name
            )
            if (
                # Depends on other tasks
                deps
                and (any([dep not in observations for dep in deps]))
            ):
                futures.append(
                    executor.submit(
                        schedule_pending_task, task, observations, retry_after
                    )
                )
            else:
                # No deps or all deps satisfied
                # can schedule now
                schedule_task.invoke(dict(task=task, observations=observations))
                # futures.append(executor.submit(schedule_task.invoke dict(task=task, observations=observations)))
        # All tasks have been submitted or enqueued
        # Wait for them to complete
        wait(futures)
    # Convert observations to new tool messages to add to the state
    new_observations = {
        k: (task_names[k], observations[k])
        for k in sorted(observations.keys() - originals)
    }
    tool_messages = [
        FunctionMessage(name=name, content=str(obs), additional_kwargs={"idx": k})
        for k, (name, obs) in new_observations.items()
    ]
    return tool_messages
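The scheduler above polls a shared `observations` dict until dependencies appear. An alternative sketch, not the notebook's implementation, lets each task block on the futures of its dependencies instead. It assumes tasks only depend on tasks listed earlier (the same assumption the scheduler makes), and the `run_dag` helper and tool names are hypothetical:

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, List


def run_dag(tasks: List[dict], tools: Dict[str, Callable[..., Any]]) -> Dict[int, Any]:
    """Run tasks concurrently; each one waits on the futures of its dependencies.

    Assumes dependencies only point to tasks that appear earlier in the list.
    """
    futures: Dict[int, Future] = {}

    def run(task: dict) -> Any:
        # Block until every dependency has produced a result.
        inputs = [futures[dep].result() for dep in task["dependencies"]]
        return tools[task["tool"]](*inputs)

    with ThreadPoolExecutor() as executor:
        for task in tasks:
            futures[task["idx"]] = executor.submit(run, task)
    return {idx: future.result() for idx, future in futures.items()}


tools = {
    "const_two": lambda: 2,
    "const_three": lambda: 3,
    "add": lambda a, b: a + b,
}
tasks = [
    {"idx": 1, "tool": "const_two", "dependencies": []},
    {"idx": 2, "tool": "const_three", "dependencies": []},
    {"idx": 3, "tool": "add", "dependencies": [1, 2]},
]
observations = run_dag(tasks, tools)
```

Waiting on futures removes the fixed retry interval, at the cost of occupying a worker thread while a task waits for its inputs.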
Limitations
- The planner output parsing format is fragile if your function requires more than 1 or 2 arguments. We could make it more robust by using streaming tool calling.
- Variable substitution is fragile in the example above. It could be made more robust by using a fine-tuned model and a more robust syntax (using e.g., Lark or a tool calling schema).
- The state can grow quite long if you require multiple re-planning runs. To handle this, you could add a message compressor once you go above a certain token limit.
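One way to picture the message compressor mentioned above is a simple trimming rule: always keep the first message (the user request) and then as many of the most recent messages as fit a token budget. The sketch below assumes a crude word-count tokenizer and hypothetical helper names; a real implementation would use the model's tokenizer and might summarize instead of dropping messages:

```python
from typing import List


def approx_tokens(text: str) -> int:
    # Crude assumption: roughly one token per whitespace-separated word.
    return len(text.split())


def trim_history(messages: List[str], max_tokens: int) -> List[str]:
    """Keep the first message plus the most recent ones that fit the budget."""
    if not messages:
        return []
    head, tail = messages[0], messages[1:]
    budget = max_tokens - approx_tokens(head)
    kept: List[str] = []
    for message in reversed(tail):
        cost = approx_tokens(message)
        if cost > budget:
            break  # everything older than this is dropped as well
        kept.append(message)
        budget -= cost
    return [head] + kept[::-1]


history = [
    "user question here",
    "old plan with many words in it",
    "observation one",
    "latest replan note",
]
trimmed = trim_history(history, max_tokens=9)
```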
https://github.com/OpenBMB/XAgent?tab=readme-ov-file
XAgent is composed of three parts:
- 🤖 Dispatcher is responsible for dynamically instantiating and dispatching tasks to different agents. It allows us to add new agents and improve the agents' abilities.
- 🧐 Planner is responsible for generating and rectifying plans for tasks. It divides tasks into subtasks and generates milestones for them, allowing agents to solve tasks step by step.
- 🦾 Actor is responsible for conducting actions to achieve goals and finish subtasks. The actor utilizes various tools to solve subtasks, and it can also collaborate with humans to solve tasks.
This is similar to LLMCompiler and Plan&Execute.
Highlights:
https://arxiv.org/abs/2305.18323 https://github.com/langchain-ai/langgraph/blob/main/examples/rewoo/rewoo.ipynb?ref=blog.langchain.dev
ReWOO is designed to improve on the ReAct-style agent architecture in the following ways:
- Reduce token consumption and execution time by generating the full chain of tools used in a single pass.
- Simplify the fine-tuning process. Since the planning data doesn't depend on the outputs of the tool, models can be fine-tuned without actually invoking the tools (in theory).
Components
Task arguments may contain special variables (#E followed by digits, e.g. #E1) that are used for variable substitution from other task results.
Planner prompt:
prompt = """For the following task, make plans that can solve the problem step by step. For each plan, indicate \
which external tool together with tool input to retrieve evidence. You can store the evidence into a \
variable #E that can be called by later tools. (Plan, #E1, Plan, #E2, Plan, ...)
Tools can be one of the following:
(1) Google[input]: Worker that searches results from Google. Useful when you need to find short
and succinct answers about a specific topic. The input should be a search query.
(2) LLM[input]: A pretrained LLM like yourself. Useful when you need to act with general
world knowledge and common sense. Prioritize it when you are confident in solving the problem
yourself. Input can be any instruction.
For example,
Task: Thomas, Toby, and Rebecca worked a total of 157 hours in one week. Thomas worked x
hours. Toby worked 10 hours less than twice what Thomas worked, and Rebecca worked 8 hours
less than Toby. How many hours did Rebecca work?
Plan: Given Thomas worked x hours, translate the problem into algebraic expressions and solve
with Wolfram Alpha. #E1 = WolframAlpha[Solve x + (2x − 10) + ((2x − 10) − 8) = 157]
Plan: Find out the number of hours Thomas worked. #E2 = LLM[What is x, given #E1]
Plan: Calculate the number of hours Rebecca worked. #E3 = Calculator[(2 ∗ #E2 − 10) − 8]
Begin!
Describe your plans with rich details. Each Plan should be followed by only one #E.
Task: {task}"""
Special step parsing:
import re

# Regex to match expressions of the form #E... = ...[...]
regex_pattern = r"Plan:\s*(.+)\s*(#E\d+)\s*=\s*(\w+)\s*\[([^\]]+)\]"
prompt_template = ChatPromptTemplate.from_messages([("user", prompt)])
planner = prompt_template | model


def get_plan(state: ReWOO):
    task = state["task"]
    result = planner.invoke({"task": task})
    # Find all matches in the planner output
    matches = re.findall(regex_pattern, result.content)
    return {"steps": matches, "plan_string": result.content}
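To see what the step regex extracts, and how `#E` variables might later be filled in, here is a small standalone sketch. The plan text is made up, and the `substitute` helper is hypothetical; it illustrates the idea of variable substitution rather than reproducing the notebook's worker code:

```python
import re
from typing import Dict

regex_pattern = r"Plan:\s*(.+)\s*(#E\d+)\s*=\s*(\w+)\s*\[([^\]]+)\]"


def substitute(tool_input: str, results: Dict[str, str]) -> str:
    """Replace each #E<n> placeholder with the stored result, if there is one."""
    return re.sub(
        r"#E\d+", lambda m: results.get(m.group(0), m.group(0)), tool_input
    )


# Illustrative planner output with two steps; the second refers to the first.
plan_text = """Plan: Search for background. #E1 = Google[history of the bicycle]
Plan: Summarize the findings. #E2 = LLM[Summarize #E1]"""

# Each match is (plan description, variable name, tool name, tool input).
steps = re.findall(regex_pattern, plan_text)
resolved = substitute(steps[1][3], {"#E1": "search result text"})
```

Matching whole `#E\d+` tokens with `re.sub` avoids the pitfall of a plain string replace, where `#E1` would also rewrite the prefix of `#E10`.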
Solver prompt
solve_prompt = """Solve the following task or problem. To solve the problem, we have made step-by-step Plan and \
retrieved corresponding Evidence to each Plan. Use them with caution since long evidence might \
contain irrelevant information.
{plan}
Now solve the question or task according to provided Evidence above. Respond with the answer
directly with no extra words.
Task: {task}
Response:"""
Limitations
- If little context of the environment is available, the planner will be ineffective in its tool use. This can typically be ameliorated through few-shot prompting and/or fine-tuning.
- The tasks are still executed in sequence, meaning the total execution time is impacted by every tool call, not just the longest-running in a given step.
Categories:
- Self-reflection: Reflection, Reflexion
- Planner enhanced: LLMCompiler, Plan&Execute
- Tool agent: ReAct, ReWOO
Internals
Agent architecture | Plan and execute | LLM Compiler | XAgent |
---|---|---|---|
Flow graph | planner -> execution_agent -> replan -> [ END, execution_agent ] | planner -> scheduler -> execution -> joiner -> [planner(as re-planner), END ] | planner -> execution_agent -> [planner (as replanner), END] |
Initial Planning Prompt | For the given objective, come up with a simple step by step plan.<br>This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps.<br>The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.<br><br>{objective} | Given a user query, create a plan to solve it with the utmost parallelizability. Each plan should comprise an action from the following {num_tools} types:<br>{tool_descriptions}<br>{num_tools}. join(): Collects and combines results from prior actions.<br> - An LLM agent is called upon invoking join() to either finalize the user query or wait until the plans are executed.<br> - join should always be the last action in the plan, and will be called in two scenarios:<br> (a) if the answer can be determined by gathering the outputs from tasks to generate the final response.<br> (b) if the answer cannot be determined in the planning phase before you execute the plans.<br>Guidelines:<br> - Each action described above contains input/output types and description.<br> - You must strictly adhere to the input and output types for each action.<br> - The action descriptions contain the guidelines. You MUST strictly follow those guidelines when you use the actions.<br> - Each action in the plan should strictly be one of the above types. Follow the Python conventions for each action.<br> - Each action MUST have a unique ID, which is strictly increasing.<br> - Inputs for actions can either be constants or outputs from preceding actions. In the latter case, use the format $id to denote the ID of the previous action whose output will be the input.<br> - Always call join as the last action in the plan. Say '<END_OF_PLAN>' after you call join<br> - Ensure the plan maximizes parallelizability.<br> - Only use the provided action types. If a query cannot be addressed using these, invoke the join action for the next steps.<br> - Never introduce new actions other than the ones provided.<br><br>{messages}<br><br>Remember, ONLY respond with the task list in the correct format! E.g.:<br>idx. tool(arg_name=args) | |
Initial Planning Output | List[str] | List[str]<br>1. tool_1(arg1="arg1", arg2=3.5, ...)<br>Thought: I then want to find out Y by using tool_2<br>2. tool_2(arg1="", arg2="${1}")<br>3. join()<END_OF_PLAN><br><br>i.e., tasks with their dependencies | |
Execution agent | Tool agent | Task scheduler + Tool agent | Tool agent with self-reflection |
Execution agent output | tool result, str | | |
Re-Planning or Planning iteration | For the given objective, come up with a simple step by step plan.<br>This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps.<br>The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.<br><br>Your objective was this:<br>{input}<br><br>Your original plan was this:<br>{plan}<br><br>You have currently done the follow steps:<br>{past_steps}<br><br>Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan. | Prompt in replan:<br> - You are given "Previous Plan" which is the plan that the previous agent created along with the execution results (given as Observation) of each plan and a general thought (given as Thought) about the executed results. You MUST use these information to create the next plan under "Current Plan".<br> - When starting the Current Plan, you should start with "Thought" that outlines the strategy for the next plan.<br> - In the Current Plan, you should NEVER repeat the actions that are already executed in the Previous Plan.<br> - You must continue the task index from the end of the previous one. Do not repeat task indices.<br><br>Joiner prompt:<br>Solve a question answering task. Here are some guidelines:<br> - In the Assistant Scratchpad, you will be given results of a plan you have executed to answer the user's question.<br> - Thought needs to reason about the question based on the Observations in 1-2 sentences.<br> - Ignore irrelevant action results.<br> - If the required information is present, give a concise but complete and helpful answer to the user's question.<br> - If you are unable to give a satisfactory finishing answer, replan to get the required information. Respond in the following format:<br><br>Thought: <reason about the task results and whether you have sufficient information to answer the question><br>Action: <action to take><br>Available actions:<br> (1) Finish(the final answer to return to the user): returns the answer and finishes the task.<br> (2) Replan(the reasoning and other information that will help you plan again. Can be a line of any length): instructs why we must replan<br><br>{{messages}}<br><br>Using the above previous actions, decide whether to replan or finish. If all the required information is present. You may finish. If you have made many attempts to find the information without success, admit so and respond with whatever information you have gathered so the user can work well with you. | |
Re-Planning Output | final response: str OR plan: List[str] | replan feedback: str, final response: str | |
Progress update:
https://docs.llamaindex.ai/en/stable/module_guides/deploying/agents/agent_runner/
from llama_index.core.agent import AgentRunner
from llama_index.agent.openai import OpenAIAgentWorker

# construct OpenAIAgent from tools
openai_step_engine = OpenAIAgentWorker.from_tools(tools, llm=llm, verbose=True)
agent = AgentRunner(openai_step_engine)
# create task
task = agent.create_task("What is (121 * 3) + 42?")
# execute step (run_step takes the task id)
step_output = agent.run_step(task.task_id)
# if step_output is done, finalize response
if step_output.is_last:
    response = agent.finalize_response(task.task_id)
    print(str(response))
# list tasks (these methods live on the agent runner, not on the task)
agent.list_tasks()
# get completed steps
agent.get_completed_steps(task.task_id)
Background research
Readings
https://lilianweng.github.io/posts/2023-06-23-agent/
Existing open-source solutions
langchain
https://python.langchain.com/docs/modules/agents/quick_start
AgentExecutor
auto-gpt
https://github.com/Significant-Gravitas/AutoGPT
Agent categories
General agents: like auto-gpt
Vertical agents: data-interpreter, code-interpreter, meta-gpt, agents built by coze.
Implementation details
Anatomy of an agent in Lilian Weng's blog.
Components
API
High-level API: Assistants API in OpenAI.
Low-level API: Agent Protocol by autogpt.
Digging deep
AgentExecutor in langchain. Bonus: zhihu article.