langchain-ai / langgraph

Build resilient language agents as graphs.
https://langchain-ai.github.io/langgraph/
MIT License
6.01k stars 946 forks source link

persistence_postgres error #2030

Open naturesh opened 1 day ago

naturesh commented 1 day ago

Checked other resources

Example Code

https://langchain-ai.github.io/langgraph/how-tos/persistence_postgres/ 

My code is the same as in the tutorial above; the only difference is that I used a tool-calling agent built with langgraph primitives rather than create_react_agent.

Error Message and Stack Trace (if applicable)

---> 26 for event in graph.stream(
     27     *request,stream_mode="values",
     28 ):
     29     event["messages"][-1].pretty_print()
     31 pool.close()

File /Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/langgraph/pregel/__init__.py:1248, in Pregel.stream(self, input, config, stream_mode, output_keys, interrupt_before, interrupt_after, debug, subgraphs)
   1244 if "custom" in stream_modes:
   1245     config[CONF][CONFIG_KEY_STREAM_WRITER] = lambda c: stream.put(
   1246         ((), "custom", c)
   1247     )
-> 1248 with SyncPregelLoop(
   1249     input,
   1250     stream=StreamProtocol(stream.put, stream_modes),
   1251     config=config,
   1252     store=store,
   1253     checkpointer=checkpointer,
   1254     nodes=self.nodes,
   1255     specs=self.channels,
   1256     output_keys=output_keys,
   1257     stream_keys=self.stream_channels_asis,
   1258     debug=debug,
   1259 ) as loop:
   1260     # create runner
   1261     runner = PregelRunner(
   1262         submit=loop.submit,
   1263         put_writes=loop.put_writes,
   1264     )
   1265     # enable subgraph streaming

File /Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/langgraph/pregel/loop.py:727, in SyncPregelLoop.__enter__(self)
    725         raise CheckpointNotLatest
    726 elif self.checkpointer:
--> 727     saved = self.checkpointer.get_tuple(self.checkpoint_config)
    728 else:
    729     saved = None

File /Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/langgraph/checkpoint/postgres/__init__.py:228, in PostgresSaver.get_tuple(self, config)
    225     where = "WHERE thread_id = %s AND checkpoint_ns = %s ORDER BY checkpoint_id DESC LIMIT 1"
    227 with self._cursor() as cur:
--> 228     cur.execute(
    229         self.SELECT_SQL + where,
    230         args,
    231         binary=True,
    232     )
    234     for value in cur:
    235         return CheckpointTuple(
    236             {
    237                 "configurable": {
   (...)
    260             self._load_writes(value["pending_writes"]),
    261         )

File /Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/psycopg/cursor.py:97, in Cursor.execute(self, query, params, prepare, binary)
     93         self._conn.wait(
     94             self._execute_gen(query, params, prepare=prepare, binary=binary)
     95         )
     96 except e._NO_TRACEBACK as ex:
---> 97     raise ex.with_traceback(None)
     98 return self

DuplicatePreparedStatement: prepared statement "_pg3_0" already exists

Description

https://langchain-ai.github.io/langgraph/how-tos/persistence_postgres/

kwargs={ "autocommit": True, "prepare_threshold": 0, }

With the kwargs above, when prepare_threshold is a large number the plain LLM agent responds correctly, but errors occur after a tool is used.

If prepare_threshold is 0, the LLM agent produces no output and the error above occurs.

System Info

langgraph==0.2.34 langgraph-checkpoint==2.0.0 langgraph-checkpoint-postgres==2.0.0

I am using Postgres in supabase project

vbarda commented 1 day ago

@naturesh could you please provide a full reproducible code snippet? what is tool_calling_agent?

naturesh commented 12 hours ago
from typing import Annotated, List

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig
from langgraph.graph.message import AnyMessage, add_messages
from typing_extensions import TypedDict

class State(TypedDict):
    """Graph state: a single channel holding the chat message history."""

    # add_messages is a reducer: node updates are appended/merged into the
    # existing list rather than replacing it wholesale.
    messages: Annotated[list[AnyMessage], add_messages]

class Assistant:
    """Graph node wrapping an LLM runnable; re-prompts until the reply is usable."""

    def __init__(self, runnable: Runnable):
        """
        Store the runnable that will be invoked on every call.

        Args:
            runnable (Runnable): The runnable instance to invoke.
        """
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        """
        Invoke the LLM and keep re-prompting while the reply is neither a
        tool call nor meaningful text.

        Args:
            state (State): The current state containing messages.
            config (RunnableConfig): The configuration for the runnable.

        Returns:
            dict: State update carrying the model's final message.
        """
        while True:
            reply = self.runnable.invoke(state)  # Invoke the LLM
            # A reply counts as having content when it is non-empty and,
            # if structured as a list, its first chunk holds actual text.
            has_content = bool(reply.content) and not (
                isinstance(reply.content, list)
                and not reply.content[0].get("text")
            )
            if reply.tool_calls or has_content:
                return {"messages": reply}
            # Empty answer: nudge the model and try again.
            nudged = state["messages"] + [("user", "Respond with a real output.")]
            state = {**state, "messages": nudged}

# System instructions for the primary assistant.
_SYSTEM_TEXT = (
    "You are a helpful assistant tasked with answering user questions. "
    "You have access to two tools: retrieve_documents and web_search. "
    "For any user questions about LLM agents, use the retrieve_documents tool to get information for a vectorstore. "
    "For any other questions, such as questions about current events, use the web_search tool to get information from the web. "
)

# Create the primary assistant prompt template: system instructions
# followed by the running message history.
primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", _SYSTEM_TEXT),
        ("placeholder", "{messages}"),
    ]
)

# Prompt our LLM and bind the tools so the model can emit tool calls.
assistant_runnable = primary_assistant_prompt | llm.bind_tools(tools)

from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda
from langgraph.prebuilt import ToolNode

def create_tool_node_with_fallback(tools: list) -> Runnable:
    """
    Build a ToolNode whose tool exceptions are reported back to the model
    instead of crashing the graph.

    Args:
        tools (list): The tools to expose through the node.

    Returns:
        Runnable: The ToolNode wrapped with a fallback that converts any
        raised exception into ToolMessages via handle_tool_error.
        (The original annotation said ``dict``, but ``with_fallbacks``
        returns a Runnable — fixed here.)
    """
    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)], exception_key="error"
    )

def handle_tool_error(state: State) -> dict:
    """
    Fallback for the tool node: surface the raised exception to the model.

    Args:
        state (State): Graph state; ``with_fallbacks`` places the caught
            exception under the "error" key.

    Returns:
        dict: A "messages" update with one ToolMessage per pending tool
        call, so the model sees the failure and can retry.
    """
    error = state.get("error")
    # The last message is the AI message whose tool calls just failed.
    pending_calls = state["messages"][-1].tool_calls
    replies = []
    for call in pending_calls:
        replies.append(
            ToolMessage(
                content=f"Error: {repr(error)}\n please fix your mistakes.",
                tool_call_id=call["id"],
            )
        )
    return {"messages": replies}

from IPython.display import Image, display
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.prebuilt import tools_condition

# Graph: two nodes (assistant, tools) cycling until no tool call remains.
builder = StateGraph(State)

# Define nodes: these do the work
builder.add_node("assistant", Assistant(assistant_runnable))
builder.add_node("tools", create_tool_node_with_fallback(tools))

# Define edges: these determine how the control flow moves
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
    "assistant",
    # If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
    # If the latest message (result) from assistant is not a tool call -> tools_condition routes to END
    tools_condition,
)
# After the tools run, hand control back to the assistant.
builder.add_edge("tools", "assistant")

# The checkpointer lets the graph persist its state.
# NOTE(review): this snippet compiles with the in-memory MemorySaver; the
# reported error occurs with PostgresSaver — presumably the poster swapped
# it in when reproducing. Confirm which checkpointer was actually used.
memory = MemorySaver()
react_graph = builder.compile(checkpointer=memory)

# Show a rendered diagram of the compiled graph (notebook-only).
display(Image(react_graph.get_graph(xray=True).draw_mermaid_png()))

this is my code ( langgraph tutorial example )

vbarda commented 6 hours ago

@naturesh i cannot reproduce the issue when invoking the graph from your example with postgres checkpointer, e.g.

config = {"configurable": {"thread_id": "2"}}
DB_URI = "postgres://postgres:postgres@localhost:5442/postgres?sslmode=disable"

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    react_graph = builder.compile(checkpointer=checkpointer)
    res = react_graph.invoke({"messages": [("user", "hi")]}, config)

have you called PostgresSaver.setup()? also, which postgres version are you running locally? we recommend postgres >= 16

also, i would recommend rerunning a notebook in a fresh virtual environment and see if the error is still there. let me know if you're still running into issues after that