langchain-ai / langchain-postgres

LangChain abstractions backed by Postgres Backend
MIT License
66 stars 22 forks

Add async mode for pgvector #32

Closed pprados closed 3 weeks ago

pprados commented 2 months ago

This PR adds the async approach for pgvector.

Some remarks:

In a RAG architecture, document chunks must be imported into a vector store.

To keep track of the links between chunks and documents, we can use the index() API, which relies on an SQL-backed record manager.

In the classic setup, SQLRecordManager plus a separate vector database, the consistency of the import cannot be guaranteed.

If a crash occurs during the import, the SQL database and the vector database are left inconsistent with each other.

PGVector solves this problem.

With PGVector, a single database can hold both the records and the vectors, so there is no need for a two-phase commit across two technologies (which would also require both of them to support it). For this to work, the operations of SQLRecordManager and of PGVector must run in the same transaction.

That requires being able to replace the session_maker.

This is why we propose to make this attribute public. When SQLRecordManager and PGVector share the same session_maker, all operations are guaranteed to run in a single transaction.

This is, moreover, the only solution we know of to guarantee the consistency of the import of chunks into a vector database. It works only if the outer session is bound to the connection.

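from langchain.indexes import SQLRecordManager, index
from langchain_community.document_loaders import CSVLoader
from langchain_core.vectorstores import VectorStore
from langchain_postgres import PGVector
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# NOTE: FakeEmbeddings is assumed to be a test Embeddings implementation;
# its import is not shown in the original snippet.


def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector: VectorStore = PGVector(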
        embeddings=embeddings,
        connection=engine,
    )

    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()

    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        # NOTE: Update session_factories
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)
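
The consistency argument can be illustrated without Postgres. The following is a minimal sketch using Python's standard sqlite3 module, not the PR's code: the `records` and `vectors` tables and the `import_chunks` helper are hypothetical stand-ins for what SQLRecordManager and PGVector would write. It only shows that when both writes go through one connection and one transaction, a crash between them leaves neither table modified.

```python
import sqlite3


def import_chunks(conn: sqlite3.Connection, crash: bool) -> None:
    """Write one record row and one vector row inside a single transaction."""
    with conn:  # commits on success, rolls back if an exception escapes
        conn.execute("INSERT INTO records (doc_key) VALUES (?)", ("faq-1",))
        if crash:
            raise RuntimeError("simulated crash between the two writes")
        conn.execute(
            "INSERT INTO vectors (doc_key, embedding) VALUES (?, ?)",
            ("faq-1", "[0.1, 0.2]"),
        )


def row_counts(conn: sqlite3.Connection) -> tuple:
    """Return (number of record rows, number of vector rows)."""
    records = conn.execute("SELECT COUNT(*) FROM records").fetchone()[0]
    vectors = conn.execute("SELECT COUNT(*) FROM vectors").fetchone()[0]
    return records, vectors


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (doc_key TEXT)")
conn.execute("CREATE TABLE vectors (doc_key TEXT, embedding TEXT)")
conn.commit()

try:
    import_chunks(conn, crash=True)
except RuntimeError:
    pass
after_crash = row_counts(conn)  # (0, 0): the record row was rolled back too

import_chunks(conn, crash=False)
after_success = row_counts(conn)  # (1, 1): both rows committed together
```

With two separate databases, the first insert would already be committed when the crash happens, which is the inconsistency described above.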

The same thing is possible asynchronously, but a bug in sql_record_manager.py in _amake_session() must first be fixed (See PR ).

    async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Create a session and close it after use."""

        # FIXME: remove the old check:
        #   if not isinstance(self.session_factory, async_sessionmaker):
        if not isinstance(self.engine, AsyncEngine):
            raise AssertionError("This method is not supported for sync engines.")

        async with self.session_factory() as session:
            yield session

Then, it is possible to do the same thing asynchronously:

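import asyncio
from asyncio import current_task

from langchain.indexes import SQLRecordManager, aindex
from langchain_community.document_loaders import CSVLoader
from langchain_core.vectorstores import VectorStore
from langchain_postgres import PGVector
from sqlalchemy.ext.asyncio import (
    async_scoped_session,
    async_sessionmaker,
    create_async_engine,
)

# NOTE: FakeEmbeddings is assumed to be a test Embeddings implementation;
# its import is not shown in the original snippet.


async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector: VectorStore = PGVector(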
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task)
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)

asyncio.run(main())

pprados commented 2 months ago

I am on holiday now. I will respond next week.

On Mon, Apr 29, 2024 at 20:24, Eugene Yurtsev @.***> wrote:

@.**** commented on this pull request.

In langchain_postgres/vectorstores.py https://github.com/langchain-ai/langchain-postgres/pull/32#discussion_r1583519327 :

     """
     def __init__(
         self,
         embeddings: Embeddings,
         *,
-        connection: Optional[Connection] = None,
+        connection: Union[None, DBConnection, Engine, AsyncEngine, str] = None,

Could we separate out the async and sync connections?

    connection: Union[None, DBConnection, Engine, str] = None,
    connection_async: Union[AsyncEngine, str] = None

And remove async_mode as a parameter
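
One way to read this suggestion is sketched below. This is a minimal illustration under stated assumptions, not the langchain-postgres API: `SketchVectorStore` and its return values are hypothetical, and plain strings stand in for the engines. With one parameter per mode, no `async_mode` flag is needed, because each method only checks whether the connection it requires was supplied.

```python
import asyncio
from typing import Any, Optional


class SketchVectorStore:
    """Hypothetical stand-in; NOT the real PGVector class."""

    def __init__(
        self,
        *,
        connection: Optional[Any] = None,        # sync engine or URL
        connection_async: Optional[Any] = None,  # async engine or URL
    ) -> None:
        if connection is None and connection_async is None:
            raise ValueError("Provide connection, connection_async, or both.")
        self._engine = connection
        self._async_engine = connection_async

    def delete_collection(self) -> str:
        if self._engine is None:
            raise RuntimeError("delete_collection() needs a sync connection.")
        return "deleted (sync)"

    async def adelete_collection(self) -> str:
        if self._async_engine is None:
            raise RuntimeError("adelete_collection() needs connection_async.")
        return "deleted (async)"


# Placeholder strings stand in for real engines in this sketch.
sync_only = SketchVectorStore(connection="sync-engine-placeholder")
both = SketchVectorStore(
    connection="sync-engine-placeholder",
    connection_async="async-engine-placeholder",
)

sync_result = sync_only.delete_collection()
async_result = asyncio.run(both.adelete_collection())
```

Calling `adelete_collection()` on `sync_only` raises a `RuntimeError` immediately instead of silently switching modes.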

In langchain_postgres/vectorstores.py https://github.com/langchain-ai/langchain-postgres/pull/32#discussion_r1583531739 :

         collection = self.get_collection(session)
         if not collection:
             self.logger.warning("Collection not found")
             return
         session.delete(collection)
         session.commit()

+    async def adelete_collection(self) -> None:
+        assert self._async_engine, "This method must be called with async_mode"
+        await self.__apost_init__()  # Lazy async init

I would love to remove the post init behavior from all methods, and eventually from init. Instead we can just raise an exception here.

The reasons are:

  1. Because of this pattern, one cannot manage the schema without initializing the vector store.
  2. It leads to bad patterns in unit-test code, where resetting the state of the database becomes awkward.
  3. User code becomes unaware of migrations, which makes it difficult to write migration logic for schema changes.

What do you think?
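
The alternative described here might look like the following minimal sketch. It uses Python's standard sqlite3 module so that it runs anywhere; `create_schema`, `drop_schema`, and `SketchStore` are hypothetical names, not part of langchain-postgres. Schema management lives in standalone functions, the constructor has no side effects, and a method that finds the schema missing raises instead of creating it lazily.

```python
import sqlite3


def create_schema(conn: sqlite3.Connection) -> None:
    """Explicit schema setup, usable without constructing a store."""
    conn.execute("CREATE TABLE IF NOT EXISTS collection (name TEXT PRIMARY KEY)")
    conn.commit()


def drop_schema(conn: sqlite3.Connection) -> None:
    """Explicit teardown, e.g. to reset state between unit tests."""
    conn.execute("DROP TABLE IF EXISTS collection")
    conn.commit()


class SketchStore:
    """Hypothetical store whose constructor never touches the schema."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn

    def add_collection(self, name: str) -> None:
        try:
            with self._conn:  # one transaction per call
                self._conn.execute(
                    "INSERT INTO collection (name) VALUES (?)", (name,)
                )
        except sqlite3.OperationalError as exc:
            # Raise instead of lazily creating the table.
            raise RuntimeError("Schema missing; run create_schema() first.") from exc


conn = sqlite3.connect(":memory:")
store = SketchStore(conn)

try:
    store.add_collection("faq")
    failed_without_schema = False
except RuntimeError:
    failed_without_schema = True

create_schema(conn)
store.add_collection("faq")
stored = conn.execute("SELECT COUNT(*) FROM collection").fetchone()[0]
```

Because setup and teardown are explicit, test code can reset the database with `drop_schema()` and `create_schema()` without going through the store, and migration logic has an obvious place to live.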

pprados commented 1 month ago

@eyurtsev Maybe you could also consider my other PR, 20735?

pprados commented 1 month ago

@eyurtsev

The promise of the constructor, with the create_extension parameter, is to guarantee that the extension is added before the APIs are used. Since this promise cannot be kept in an async scenario, there is an alternative:

Can you launch the workflow?

pprados commented 1 month ago

@eyurtsev, can you approve the workflow? Then I can check whether all the code passes the CI.

pprados commented 1 month ago

Hello @eyurtsev

I've aligned the code with your similar requests for SQLChatMessageHistory.

Currently, SQLChatMessageHistory cannot be reviewed because of bugs in the base sources that show up when linting docs/scripts/arxiv_references.py and some *.ipynb files. I'm waiting for the master sources to be updated.

Can you review this code? It would let me settle the resilience question with LangChain for good.

ssifood commented 1 month ago

I am waiting for this PR!!

async langchain_postgres

pprados commented 1 month ago

@eyurtsev I'm blocked by "1 change requested", but I can't find where the change request is. Can you help me?

pprados commented 3 weeks ago

@eyurtsev I made the mistake of rebasing while a review was in progress. This seems to block the process. There's a ‘1 change requested’ item that I can't resolve. I propose opening another, identical PR to work around this problem.