langchain-ai / langgraph

Build resilient language agents as graphs.
https://langchain-ai.github.io/langgraph/
MIT License
6.96k stars 1.12k forks source link

Intermittent InvalidSqlStatementName: prepared statement "_pg3_4" does not exist Error with AsyncPostgresSaver in LangGraph #2576

Open · Zolastic opened this issue 3 days ago

Zolastic commented 3 days ago

Checked other resources

Example Code

import os
from dotenv import load_dotenv

from pydantic import BaseModel, ConfigDict, Field
from typing import Annotated, Optional, Dict, Any, List
from typing_extensions import TypedDict
import traceback

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import BaseTool

from langgraph.graph.message import AnyMessage
from langgraph.graph import StateGraph, END
from langgraph.graph.graph import CompiledGraph
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

import json
from uuid import uuid4

# Load environment variables from a .env file (DATABASE_URL is read in
# ProductAssistant.invoke). override=True lets values from .env replace
# variables that are already set in the process environment.
load_dotenv(override=True)

class State(TypedDict):
    """Graph state shared by every node of ``ProductAssistant``.

    No defaults are declared: TypedDict does not support default values
    (PEP 589), so initialisers here would be silently ignored at runtime.
    Keys that the initial state in ``ProductAssistant.invoke`` does not set
    (e.g. ``intent_response``) may be absent; read them with ``state.get(...)``.

    NOTE(review): the ``Annotated`` metadata below is a plain description
    string, not a reducer function, so presumably LangGraph replaces each key
    wholesale on update (nodes return the full ``messages`` list) - confirm.
    """

    query: Annotated[str, "The user's query"]
    messages: Annotated[list[AnyMessage], "The messages exchanged with the assistant"]
    intent_response: Annotated[Optional[Dict[str, Any]], "The response from the intent assistant"]
    response_sent: Annotated[bool, "Flag to indicate if the response has been sent."]

class ProductAssistant(BaseModel):
    """Product assistant implemented as a LangGraph state graph.

    The graph topology is built once in ``__init__``; ``invoke`` compiles it
    against a Postgres checkpointer on every call and streams the final answer
    as JSON strings.
    """

    name: str
    # Uncompiled graph; excluded from serialization.
    builder: Optional[StateGraph] = Field(default=None, exclude=True)
    # Graph compiled by the most recent ``invoke`` call. Its checkpointer is
    # scoped to that call's ``async with`` block, so presumably it should not
    # be reused once ``invoke`` has finished.
    graph: Optional[CompiledGraph] = None

    model_config = ConfigDict(arbitrary_types_allowed=True)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.builder = StateGraph(State)

        # Adding nodes to the graph
        self.builder.add_node("main_assistant", self.main_assistant)
        self.builder.add_node("intent_assistant", self.intent_assistant)
        self.builder.add_node("generic_reply_assistant", self.generic_reply_assistant)
        self.builder.add_node("single_product_assistant", self.single_product_assistant)
        self.builder.add_node("multiple_products_assistant", self.multiple_products_assistant)
        self.builder.add_node("call_tool", self.call_tool)
        self.builder.add_node("clean_and_respond", self.clean_and_respond)

        self.builder.set_entry_point("main_assistant")

        # should_call_tool (not shown) must return "continue" or "end":
        # keep calling tools while the model requests them, otherwise hand
        # the conversation to intent classification.
        self.builder.add_conditional_edges(
            "main_assistant",
            self.should_call_tool,
            {
                "continue": "call_tool",
                "end": "intent_assistant"
            }
        )

        # After a tool runs, either run another tool or return to the main
        # assistant so it can use the tool output.
        self.builder.add_conditional_edges(
            "call_tool",
            self.should_call_tool,
            {
                "continue": "call_tool",
                "end": "main_assistant",
            }
        )

        # assistant_to_call (not shown) returns the name of the next node.
        self.builder.add_conditional_edges(
            "intent_assistant",
            self.assistant_to_call,
        )

        self.builder.add_edge("single_product_assistant", "clean_and_respond")
        self.builder.add_edge("multiple_products_assistant", "clean_and_respond")

        self.builder.add_edge("clean_and_respond", END)

    def main_assistant(self, state: State, config):
        """Run the tool-calling model over the conversation so far.

        Returns a partial state update. When the model requests tool calls, its
        response is appended to ``messages``; otherwise the existing messages
        are returned unchanged.
        """
        # Work on a copy: appending to state["messages"] in place would mutate
        # the state object LangGraph passed in, behind its back.
        messages = list(state["messages"])

        # Build the prompt (implementation omitted for brevity)
        prompt_builder: ChatPromptTemplate = ...  # Your prompt setup here
        prompt = prompt_builder.invoke(state)

        model = ChatOpenAI(model="gpt-4o-mini")

        tools = self.create_tools()

        model = model.bind_tools(tools, parallel_tool_calls=False)

        response = model.invoke(prompt)

        if getattr(response, "tool_calls", None):
            messages.append(response)

            tool_names = [tool_call["name"] for tool_call in response.tool_calls]
            print(f"Tool detected and appended: {tool_names}")
        else:
            print("No tool detected.")

        return {"messages": messages}

    # ... [Other methods omitted for brevity] ...

    def create_tools(
        self,
    ) -> List[BaseTool]:
        """Return the tools the main assistant may call."""
        # Tools are created here (implementation omitted)
        return [fetch_products, fetch_product_details, fetch_terms_and_conditions]

    async def invoke(
        self,
        query: str,
        user_metadata: Optional[Dict[str, Any]] = None,
        search_config: Optional[Dict[str, Any]] = None,
    ):
        """Stream the assistant's final answer for *query* as JSON strings.

        Each run uses a fresh thread id, so no conversation history is shared
        between calls. Errors raised while streaming are printed, not
        re-raised: the caller just sees the stream end.
        """
        thread_id = uuid4().hex

        config = {
            "configurable": {"thread_id": thread_id},
            "user_metadata": user_metadata or None,
            "search_config": search_config or None,
        }

        initial_state: State = {
            "messages": [HumanMessage(content=query)],
            "query": query,
            "response_sent": False,
        }

        async with AsyncPostgresSaver.from_conn_string(os.getenv("DATABASE_URL", "")) as async_memory:
            # Fix for `prepared statement "_pg3_N" does not exist`: setting
            # psycopg's prepare_threshold to None disables server-side
            # prepared statements on this connection. Behind a
            # transaction-mode pooler (PgBouncer, e.g. Supabase's pooler),
            # consecutive statements can run on different server backends, so
            # a statement prepared on one may not exist on the next.
            # NOTE(review): presumably DATABASE_URL points at such a pooler -
            # confirm. Done before setup() because setup also runs statements.
            conn = getattr(async_memory, "conn", None)
            if conn is not None and hasattr(conn, "prepare_threshold"):
                conn.prepare_threshold = None

            await async_memory.setup()
            # Stream from a local reference so that a concurrent invoke() on
            # the same instance reassigning self.graph cannot affect this run.
            graph = self.builder.compile(checkpointer=async_memory)
            self.graph = graph

            try:
                async for event in graph.astream_events(initial_state, config, version="v2"):
                    if event.get("event") != "on_chain_stream":
                        continue

                    chunk = (event.get("data") or {}).get("chunk", {})
                    # Only node-update dicts carry the keys read below; skip
                    # any other chunk instead of failing on .get().
                    if not isinstance(chunk, dict):
                        continue

                    messages = chunk.get("messages", [])
                    if not messages or not chunk.get("response_sent", False):
                        continue

                    last_ai_message = next(
                        (msg.content for msg in reversed(messages) if isinstance(msg, AIMessage)), None
                    )
                    # Message content is not always a str; only forward
                    # non-empty plain-text content.
                    if not last_ai_message or not isinstance(last_ai_message, str):
                        continue

                    try:
                        structured_response_dict = json.loads(last_ai_message)
                    except json.JSONDecodeError:
                        # NOTE(review): json.dumps already escapes double
                        # quotes, so this replacement alters the message text -
                        # confirm it is intended.
                        last_ai_message = last_ai_message.replace('"', "'")
                        yield json.dumps({"message": last_ai_message})
                    else:
                        yield json.dumps(structured_response_dict)

            except Exception as e:
                # Best-effort boundary: report and end the stream.
                print(f"Error in invoking the assistant: {str(e)}")
                traceback.print_exc()

Error Message and Stack Trace (if applicable)

Error in invoking the assistant: prepared statement "_pg3_4" does not exist
Traceback (most recent call last):
  File "/path/to/product_assistant.py", line 442, in invoke
    async for event in self.graph.astream_events(initial_state, config, version="v2"):
  File "/path/to/langchain_core/runnables/base.py", line 1388, in astream_events
    async for event in event_stream:
  File "/path/to/langchain_core/tracers/event_stream.py", line 1012, in _astream_events_implementation_v2
    await task
  File "/path/to/langchain_core/tracers/event_stream.py", line 967, in consume_astream
    async for _ in event_streamer.tap_output_aiter(run_id, stream):
  File "/path/to/langchain_core/tracers/event_stream.py", line 203, in tap_output_aiter
    async for chunk in output:
  File "/path/to/langgraph/pregel/__init__.py", line 1823, in astream
    async with AsyncPregelLoop(
               ^^^^^^^^^^^^^^^^
  File "/path/to/langgraph/pregel/loop.py", line 1011, in __aexit__
    return await asyncio.shield(
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/contextlib.py", line 754, in __aexit__
    raise exc_details[1]
  File "/usr/local/lib/python3.12/contextlib.py", line 737, in __aexit__
    cb_suppress = await cb(*exc_details)
                  ^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/langgraph/pregel/executor.py", line 191, in __aexit__
    raise exc
  File "/path/to/langgraph/checkpoint/postgres/aio.py", line 320, in aput_writes
    async with self._cursor(pipeline=True) as cur:
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/contextlib.py", line 217, in __aexit__
    await anext(self.gen)
  File "/path/to/langgraph/checkpoint/postgres/aio.py", line 349, in _cursor
    async with self.lock, conn.pipeline(), conn.cursor(
                          ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/contextlib.py", line 217, in __aexit__
    await anext(self.gen)
  File "/path/to/psycopg/connection_async.py", line 398, in pipeline
    async with pipeline:
               ^^^^^^^^
  File "/path/to/psycopg/_pipeline.py", line 266, in __aexit__
    raise exc2.with_traceback(None)
psycopg.errors.InvalidSqlStatementName: prepared statement "_pg3_4" does not exist

Description

I'm trying to use LangGraph with Supabase PostgreSQL as my database. I have an assistant implemented in product_assistant.py that uses AsyncPostgresSaver for asynchronous database operations. My assistant works fine most of the time, but sometimes it throws the following error:

psycopg.errors.InvalidSqlStatementName: prepared statement "_pg3_4" does not exist

This error seems to occur intermittently, and I haven't been able to identify a consistent pattern.

Additional Information:

Question:

How can I resolve this intermittent InvalidSqlStatementName error? Is there a recommended way to handle database operations in tools when using LangGraph, especially when both the assistant and the tools make database calls? Should I refactor my tools to use asynchronous database sessions? Any guidance on how to solve this issue would be greatly appreciated.

System Info

Zolastic commented 3 days ago

After further investigation, I've found that the issue arises when one of my tools makes a database call. Specifically, when the assistant invokes a tool that queries the database through SQLAlchemy (using the same DATABASE_URL as the checkpointer), I intermittently encounter the following error:

psycopg.errors.InvalidSqlStatementName: prepared statement "_pg3_4" does not exist

Relevant Code:

Here are the key snippets of my tools:

# tools.py

from langchain_core.tools import tool
from dxs.gerica.repository.sql import ProductSQLRepository
from dxs.gerica.utils.sql_utils import get_database_session
from .utils import Citation, Product as ProductDetails

@tool(
    "fetch_products",
    response_format="content_and_artifact"
)
def fetch_products():
    """Fetch all available products with their id, name, summary and description."""
    # The docstring above doubles as the tool description sent to the model,
    # so implementation notes are kept in comments.
    #
    # Acquire the session before ``try``: if this call fails inside the try,
    # ``db_session.close()`` in ``finally`` would raise UnboundLocalError and
    # hide the real error.
    db_session = get_database_session()
    try:
        product_repository = ProductSQLRepository(session=db_session)
        products = product_repository.fetch_all()

        if not products:
            # response_format="content_and_artifact" requires a
            # (content, artifact) pair on every return path.
            return "No products found.", {"citations": []}

        product_entries = "".join(
            f"""
            <product>
                <id>{product.id}</id>
                <name>{product.name}</name>
                <summary>{product.summary or 'N/A'}</summary>
                <description>{product.description or 'N/A'}</description>
            </product>
            """
            for product in products
        )
        context_str = "<products>\n" + product_entries + "</products>"

        citations = [
            Citation(
                id="",
                type="URL",
                title="Example Products Page",
                url="https://www.example.com/products",
                checksum="",
            )
        ]

        return context_str, {"citations": citations}
    finally:
        db_session.close()

@tool(
    "fetch_product_details",
    response_format="content_and_artifact"
)
def fetch_product_details(product_ids: list):
    """Fetch id, name, summary and description for the products with the given IDs."""
    # The docstring above doubles as the tool description sent to the model;
    # the ``list`` annotation tells the model product_ids is an array.
    #
    # Acquire the session before ``try`` so a failure here is not masked by an
    # UnboundLocalError from ``db_session.close()`` in ``finally``.
    db_session = get_database_session()
    try:
        product_repository = ProductSQLRepository(session=db_session)

        # NOTE(review): assumes find_one returns None for an unknown ID - such
        # IDs are skipped instead of crashing on ``product.id`` below.
        products = [
            product
            for product in (product_repository.find_one(pid) for pid in product_ids)
            if product is not None
        ]
        if not products:
            # response_format="content_and_artifact" requires a
            # (content, artifact) pair on every return path.
            return "No products found for the given IDs.", {"product_details": [], "citations": []}

        product_details = []
        product_entries = []
        for product in products:
            product_entries.append(f"""
            <product>
                <id>{product.id}</id>
                <name>{product.name}</name>
                <summary>{product.summary}</summary>
                <description>{product.description}</description>
            </product>
            """)
            product_details.append(ProductDetails(id=product.id, name=product.name))

        context_str = "<products>\n" + "".join(product_entries) + "</products>"

        citations = [
            Citation(id=product.id, type="URL", title=product.name, url=product.product_page_url or "")
            for product in products
        ]

        return context_str, {"product_details": product_details, "citations": citations}
    finally:
        db_session.close()
# sql_utils.py

import os
import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from dotenv import load_dotenv

load_dotenv()

# One SQLAlchemy engine per database URL, created lazily and reused. Each
# engine owns a connection pool, so building a new engine for every session
# (i.e. on every tool call) leaves pools behind that are never disposed.
_ENGINES = {}


def get_database_session():
    """Return a new ORM session bound to the engine for ``$DATABASE_URL``.

    The caller owns the returned session and must ``close()`` it.
    """
    db_url = os.getenv("DATABASE_URL")
    engine = _ENGINES.get(db_url)
    if engine is None:
        # If two threads race here, setdefault keeps whichever engine was
        # stored first. create_engine does not open connections until first
        # use, so a discarded engine holds none.
        engine = _ENGINES.setdefault(db_url, sa.create_engine(db_url))
    return sessionmaker(bind=engine)()


Base = declarative_base()

Observation:


Request for Assistance:

Would refactoring my tools to use asynchronous database sessions (e.g., SQLAlchemy AsyncIO) resolve this issue? Are there other steps I should take to prevent this conflict between synchronous and asynchronous database interactions?

Any guidance or recommendations would be greatly appreciated! Thank you.