langchain-ai / langchain-postgres

LangChain abstractions backed by Postgres Backend
MIT License

Async Retriever Error with SQLAlchemy - "cannot insert multiple commands into a prepared statement" #101

Closed alexanderflorean closed 2 months ago

alexanderflorean commented 3 months ago

Description

When I run the langchain-postgres retriever as an asynchronous instance, I get the following error during SQL statement execution. The message indicates that the statement contains multiple SQL commands, which a prepared statement does not seem to support.

Error Message

sqlalchemy.exc.ProgrammingError: (sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError) <class 'asyncpg.exceptions.PostgresSyntaxError'>: cannot insert multiple commands into a prepared statement
[SQL: SELECT pg_advisory_xact_lock(1573678846307946496);CREATE EXTENSION IF NOT EXISTS vector;]
(Background on this error at: https://sqlalche.me/e/20/f405)

My thoughts

The error appears to be caused by a single SQL statement that combines two commands: acquiring an advisory lock and creating an extension. They are separated by ";" in the SQL text but executed in one call. This does not seem to be supported in asynchronous execution with asyncpg, which treats it as a syntax error in a prepared statement.
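
As a rough illustration of this kind of restriction, the sketch below uses Python's built-in sqlite3 module as a stand-in. It is not asyncpg or Postgres, and the helper name `try_execute` is made up for this example. sqlite3 likewise refuses more than one statement per `execute()` call, so the same pattern shows up: the combined string is rejected, while the two statements run fine when sent separately.

```python
import sqlite3

# In-memory database, used only as a stand-in for illustration.
conn = sqlite3.connect(":memory:")


def try_execute(sql: str) -> str:
    """Run one execute() call and report whether the driver accepted it."""
    try:
        conn.execute(sql)
        return "ok"
    except (sqlite3.Warning, sqlite3.ProgrammingError) as exc:
        # Older Python versions raise sqlite3.Warning here, newer ones
        # raise sqlite3.ProgrammingError, so both are caught.
        return f"rejected: {type(exc).__name__}"


# Two commands in a single call: rejected by the driver.
combined = try_execute("SELECT 1; SELECT 2;")

# The same two commands, one per call: accepted.
separate = [try_execute("SELECT 1;"), try_execute("SELECT 2;")]

print(combined)
print(separate)
```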

Proposed Solution

I found that defining the SQL commands as separate statements avoids combining multiple commands into a single execution call.

The affected code, in vectorstore.py:236:

def _create_vector_extension(conn: Connection) -> None:
    statement = sqlalchemy.text(
        "SELECT pg_advisory_xact_lock(1573678846307946496);"
        "CREATE EXTENSION IF NOT EXISTS vector;"
    )
    conn.execute(statement)
    conn.commit()

Here is the revised function implementation that fixed the issue for me:

def _create_vector_extension(conn: Connection) -> None:
    select_lock_statement = sqlalchemy.text("SELECT pg_advisory_xact_lock(1573678846307946496);")
    create_extension_statement = sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS vector;")

    conn.execute(select_lock_statement)
    conn.execute(create_extension_statement)

    conn.commit()

Separating the statements into individual execution calls avoids the error, and the functionality remains intact. I haven't tried it on a synchronous instance, though.
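
The same idea can be sketched for a connection whose `execute()` and `commit()` are awaitable, as they are on SQLAlchemy's `AsyncConnection`. This is a minimal sketch, not the library's implementation: `execute_separately` and `RecordingConnection` are hypothetical names, and the fake connection only records calls so the example runs without a database. In real use each string would presumably be wrapped in `sqlalchemy.text(...)`.

```python
import asyncio
from typing import Any, Sequence


async def execute_separately(conn: Any, statements: Sequence[Any]) -> None:
    """Run each statement in its own execute() call, then commit once."""
    for statement in statements:
        await conn.execute(statement)
    await conn.commit()


class RecordingConnection:
    """Hypothetical stand-in for an async connection; it only records calls."""

    def __init__(self) -> None:
        self.calls = []

    async def execute(self, statement: str) -> None:
        self.calls.append(statement)

    async def commit(self) -> None:
        self.calls.append("COMMIT")


conn = RecordingConnection()
asyncio.run(
    execute_separately(
        conn,
        [
            "SELECT pg_advisory_xact_lock(1573678846307946496);",
            "CREATE EXTENSION IF NOT EXISTS vector;",
        ],
    )
)
print(conn.calls)
```

Because `pg_advisory_xact_lock` is held until the end of the current transaction, the lock should still cover the `CREATE EXTENSION` call as long as both calls run on the same connection before the commit. This assumes the connection is not in autocommit mode.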

Question

I would love to get some feedback, as I do not often post issues. Should I create a PR with this fix?

alexanderflorean commented 3 months ago

Also, after posting this issue I found a similar issue, #86, where it was recommended to use the psycopg3 engine, whereas I'm using asyncpg.

However, after switching to psycopg3, I received the following error:

sqlalchemy.exc.InterfaceError: (psycopg.InterfaceError) Psycopg cannot use the 'ProactorEventLoop' to run in async mode. Please use a compatible event loop, for instance by setting 'asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())' (Background on this error at: https://sqlalche.me/e/20/rvf5)

I tried to fix that error using the following, as suggested in the error message:

    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

This did not fix the issue when using the psycopg3 engine.
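
One thing that may be worth checking here is ordering. This is an assumption, since the surrounding code is not shown in the thread. An event loop policy only affects loops created after it is set, so the call has to run before `asyncio.run()` or anything else that creates the loop. A minimal sketch with hypothetical helper names:

```python
import asyncio
import sys


def configure_event_loop_policy() -> bool:
    """Switch to the selector event loop policy on Windows.

    Returns True if the policy was changed, False on other platforms.
    """
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        return True
    return False


async def main() -> str:
    # Report which loop class is actually running.
    return type(asyncio.get_running_loop()).__name__


if __name__ == "__main__":
    # The policy must be set before asyncio.run() creates the loop.
    configure_event_loop_policy()
    print(asyncio.run(main()))
```

If a framework or notebook has already started an event loop before this code runs, setting the policy afterwards would not change the loop that is already in use.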

eyurtsev commented 3 months ago

Could you provide the stack trace?

eyurtsev commented 2 months ago

Closing as issue appears stale