langchain-ai / langchain-postgres

LangChain abstractions backed by Postgres Backend
MIT License
116 stars 44 forks source link

AsyncEngine error: using pgvector to embedding -- cannot insert multiple commands into a prepared statement [SQL: SELECT pg_advisory_xact_lock(1573678846307946496);CREATE EXTENSION IF NOT EXISTS vector #86

Closed jack-yu-matrix closed 2 months ago

jack-yu-matrix commented 3 months ago

my test code like below: -----test code-------

async def main(): """load origin documents""" docs = [ Document( page_content="there are cats in the pond", metadata={"id": 1, "location": "pond", "topic": "animals"}, ), Document( page_content="ducks are also found in the pond", metadata={"id": 2, "location": "pond", "topic": "animals"}, ), Document( page_content="fresh apples are available at the market", metadata={"id": 3, "location": "market", "topic": "food"}, ), Document( page_content="the market also sells fresh oranges", metadata={"id": 4, "location": "market", "topic": "food"}, ), Document( page_content="the new art exhibit is fascinating", metadata={"id": 5, "location": "museum", "topic": "art"}, ), Document( page_content="a sculpture exhibit is also at the museum", metadata={"id": 6, "location": "museum", "topic": "art"}, ), Document( page_content="a new coffee shop opened on Main Street", metadata={"id": 7, "location": "Main Street", "topic": "food"}, ), Document( page_content="the book club meets at the library", metadata={"id": 8, "location": "library", "topic": "reading"}, ), Document( page_content="the library hosts a weekly story time for kids", metadata={"id": 9, "location": "library", "topic": "reading"}, ), Document( page_content="a cooking class for beginners is offered at the community center", metadata={"id": 10, "location": "community center", "topic": "classes"}, ), ]

"""split origin documents into chunks"""
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)

"""embedding"""
model_name = "BAAI/bge-small-en"
model_kwargs = {"device": "cpu"}
encode_kwargs = {"normalize_embeddings": True}
embeddings = HuggingFaceBgeEmbeddings(
    model_name=model_name, model_kwargs=model_kwargs, encode_kwargs=encode_kwargs
)

vectorstore = PGVector(
    connection=async_engine,
    embeddings=embeddings,
    collection_name="my_doc",
    use_jsonb=True,
    async_mode=True,
)
"""store the vector data into vector database"""
await vectorstore.aadd_documents(docs, ids=[doc.metadata["id"] for doc in docs])

"""retrieve documents"""
vectorstore.as_retriever()

if name == "main": asyncio.run(main())

----raised error like below------ 2024-07-08 17:01:30,124 INFO sqlalchemy.engine.Engine select pg_catalog.version() 2024-07-08 17:01:30,124 INFO sqlalchemy.engine.Engine [raw sql] () 2024-07-08 17:01:30,127 INFO sqlalchemy.engine.Engine select current_schema() 2024-07-08 17:01:30,127 INFO sqlalchemy.engine.Engine [raw sql] () 2024-07-08 17:01:30,130 INFO sqlalchemy.engine.Engine show standard_conforming_strings 2024-07-08 17:01:30,130 INFO sqlalchemy.engine.Engine [raw sql] () 2024-07-08 17:01:30,132 INFO sqlalchemy.engine.Engine BEGIN (implicit) 2024-07-08 17:01:30,132 INFO sqlalchemy.engine.Engine SELECT pg_advisory_xact_lock(1573678846307946496);CREATE EXTENSION IF NOT EXISTS vector; 2024-07-08 17:01:30,132 INFO sqlalchemy.engine.Engine [generated in 0.00009s] () 2024-07-08 17:01:30,135 INFO sqlalchemy.engine.Engine ROLLBACK Traceback (most recent call last): File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 514, in _prepare_and_execute prepared_stmt, attributes = await adapt_connection._prepare( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 760, in _prepare prepared_stmt = await self._connection.prepare( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/asyncpg/connection.py", line 636, in prepare return await self._prepare( ^^^^^^^^^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/asyncpg/connection.py", line 654, in _prepare stmt = await self._get_statement( ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/asyncpg/connection.py", line 433, in _get_statement statement = await self._protocol.prepare( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "asyncpg/protocol/protocol.pyx", line 166, in prepare asyncpg.exceptions.PostgresSyntaxError: cannot insert multiple commands into a prepared statement

The above exception was the direct cause of the following exception:

Traceback (most recent call last): File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context self.dialect.do_execute( File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 924, in do_execute cursor.execute(statement, parameters) File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 572, in execute self._adaptconnection.await( File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only return current.parent.switch(awaitable) # type: ignore[no-any-return,attr-defined] # noqa: E501 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn value = await result ^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 550, in _prepare_and_execute self._handle_exception(error) File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 501, in _handle_exception self._adapt_connection._handle_exception(error) File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 784, in _handle_exception raise translated_error from error sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.ProgrammingError: <class 'asyncpg.exceptions.PostgresSyntaxError'>: cannot insert multiple commands into a prepared statement

The above exception was the direct cause of the following exception:

Traceback (most recent call last): File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/src/rag_pipeline/knowledge_base/service/indexing.py", line 81, in asyncio.run(main()) File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/runners.py", line 194, in run return runner.run(main) ^^^^^^^^^^^^^^^^ File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/runners.py", line 118, in run return self._loop.run_until_complete(task) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete return future.result() ^^^^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/src/rag_pipeline/knowledge_base/service/indexing.py", line 75, in main await vectorstore.aadd_documents(docs, ids=[doc.metadata["id"] for doc in docs]) File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/langchain_core/vectorstores.py", line 218, in aadd_documents return await self.aadd_texts(texts, metadatas, *kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/langchain_postgres/vectorstores.py", line 871, in aadd_texts await self.apost_init() # Lazy async init ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/langchain_postgres/vectorstores.py", line 462, in apost_init await self.acreate_vector_extension() File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/langchain_postgres/vectorstores.py", line 483, in acreate_vector_extension await conn.run_sync(_create_vector_extension) File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/engine.py", line 886, in run_sync return await greenlet_spawn( ^^^^^^^^^^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 201, in greenlet_spawn result = context.throw(sys.exc_info()) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/langchain_postgres/vectorstores.py", line 241, in _create_vector_extension conn.execute(statement) File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1418, in execute return meth( ^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection return connection._execute_clauseelement( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement ret = self._execute_context( ^^^^^^^^^^^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context return self._exec_single_context( ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context self._handle_dbapi_exception( File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2353, in _handle_dbapi_exception raise sqlalchemy_exception.with_traceback(exc_info[2]) from e File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context self.dialect.do_execute( File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 924, in do_execute cursor.execute(statement, parameters) File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 572, in execute self._adaptconnection.await( File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only return current.parent.switch(awaitable) # type: ignore[no-any-return,attr-defined] # noqa: E501 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn value = await result ^^^^^^^^^^^^ File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 550, in _prepare_and_execute self._handle_exception(error) File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 501, in _handle_exception self._adapt_connection._handle_exception(error) File "/Users/yuxinlei/Documents/workspace/coe_workspace/raise-cn-platform/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 784, in _handle_exception raise translated_error from error sqlalchemy.exc.ProgrammingError: (sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError) <class 'asyncpg.exceptions.PostgresSyntaxError'>: cannot insert multiple commands into a prepared statement [SQL: SELECT pg_advisory_xact_lock(1573678846307946496);CREATE EXTENSION IF NOT EXISTS vector;] (Background on this error at: https://sqlalche.me/e/20/f405)

Process finished with exit code 1

jack-yu-matrix commented 3 months ago

I guess the error comes from below lines:

---------below------- def _create_vector_extension(conn: Connection) -> None: statement = sqlalchemy.text( "SELECT pg_advisory_xact_lock(1573678846307946496);" "CREATE EXTENSION IF NOT EXISTS vector;" ) conn.execute(statement) conn.commit()

pprados commented 2 months ago

What is the initialization of the async_engine ? Do you use the psycogp3 driver ? The package currently only supports the psycogp3 driver.

jack-yu-matrix commented 2 months ago

okk, I see the reason