run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Question]: How do I update my Postgres Vector database with the updated documents #15283

Open · Vigneshwar-do opened 1 month ago

Vigneshwar-do commented 1 month ago


Question

I am using Python scripts to automate data processing, chunking, and embedding into a Postgres vector database. Going forward, how do I implement logic such that if a source document is updated, the corresponding record in the Postgres vector database is updated as well, and if it is not present, it is chunked, embedded, and indexed as a new addition? My source data contains the metadata listed below:

{'article': {'id': 12345,
  'created_at': '2023-11-02T15:12:56Z',
  'updated_at': '2024-05-14T19:28:42Z',
  'name': 'Sample doc 1',
  'title': 'Sample doc 1',
  'content': 'This is a sample content'
}}

I am using SemanticSplitterNodeParser to perform chunking. Can I also perform a delete operation in the Postgres vector database if the data has been deleted in the source?

dosubot[bot] commented 1 month ago

Hey @Vigneshwar-do! I'm here to help you with any bugs, questions, or contributions. Let's work together to find a solution.

To update your Postgres vector database with updated documents, you can follow these steps:

  1. Initialize the VearchVectorStore:

    from llama_index.vector_stores.vearch.base import VearchVectorStore
    
    vector_store = VearchVectorStore(
       path_or_url='your_cluster_url',
       table_name='your_table_name',
       db_name='your_db_name'
    )
  2. Add or Update Records:

    def add_or_update_records(vector_store, nodes):
        existing_ids = set(vector_store.query_all_ids())
        new_ids = set(node.node_id for node in nodes)

        # Update existing records
        for node in nodes:
            if node.node_id in existing_ids:
                vector_store.update(node)
            else:
                vector_store.add([node])
  3. Delete Records:

    def delete_records(vector_store, ref_doc_id):
        vector_store.delete(ref_doc_id)
  4. Full Synchronization Logic:

    def sync_data(vector_store, source_data):
        # Assuming source_data is a list of nodes
        add_or_update_records(vector_store, source_data)

        # Get all current IDs in the database
        current_ids = set(vector_store.query_all_ids())
        source_ids = set(node.node_id for node in source_data)

        # Find IDs to delete
        ids_to_delete = current_ids - source_ids
        for ref_doc_id in ids_to_delete:
            delete_records(vector_store, ref_doc_id)
  5. Chunking and Embedding: Use SemanticSplitterNodeParser to perform chunking and generate embeddings for the chunks:

    from llama_index.legacy.node_parser.text.semantic_splitter import SemanticSplitterNodeParser
    from llama_index.legacy.embeddings.openai import OpenAIEmbedding
    from llama_index.legacy.schema import Document
    
    embed_model = OpenAIEmbedding()
    splitter = SemanticSplitterNodeParser(
       embed_model=embed_model,
       buffer_size=1,
       breakpoint_percentile_threshold=95
    )
    
    documents = [Document(text="Your document text here.")]
    nodes = splitter.build_semantic_nodes_from_documents(documents)
  6. Example Usage:

    source_data = [...]  # List of nodes from the source
    sync_data(vector_store, source_data)

This code will ensure that the records in your Postgres vector database are synchronized with the source data, updating existing records, adding new ones, and deleting those that are no longer present in the source [1][2].

For generating embeddings, you can use the OpenAIEmbedding model as shown above. This will help you create embeddings for the chunks created from the source document [3][4].
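
As a minimal sketch (assuming the nodes and embed_model objects from the snippet above), each chunk can be given an embedding via the embedding model's get_text_embedding method before it is written to the vector store:

    # Sketch: attach an embedding to each chunk before storing it.
    for node in nodes:
        if node.embedding is None:
            node.embedding = embed_model.get_text_embedding(node.get_content())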

To continue talking to Dosu, mention @dosu.

Vigneshwar-do commented 1 month ago

I am using PGVectorStore for my use case. Can I perform the same insert/update/delete operations? This is how I am indexing the chunked documents as of now:

def postgres_vectordb():
    """Access vector database in postgres"""
    connection_string = conf.vector_connection_string
    db_name = conf.vector_db_name
    conn = psycopg2.connect(connection_string)
    conn.autocommit = True
    with conn.cursor() as c:
        c.execute(f"""
        SELECT pg_terminate_backend(pid)
        FROM pg_stat_activity
        WHERE datname = '{db_name}' AND pid <> pg_backend_pid();
        """)
    return connection_string, db_name

def index_postgres_vectordb(connection_string, db_name, chunked_documents):
    """Embedding the chunked documents, indexing them, and storing them in the PGVectorStore."""
    embed_model = language_model.az_embedding_model(conf.azure_openai_api_key,
                                                    conf.azure_openai_endpoint)
    # Set the id_ property from the metadata for each document
    for doc in chunked_documents:
        if 'id' in doc.metadata:
            doc.id_ = str(doc.metadata['id'])  # Ensure it's a string

    url = make_url(connection_string)
    vector_store = PGVectorStore.from_params(
        database=db_name,
        host=url.host,
        password=url.password,
        port=url.port,
        user=url.username,
        table_name=conf.vector_table_name,
        embed_dim=1536,  # openai embedding dimension
    )
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    index = VectorStoreIndex(
        chunked_documents, storage_context=storage_context, show_progress=True,
        embed_model=embed_model
    )
    return index

dosubot[bot] commented 1 month ago

Yes, you can perform insert, update, and delete operations using PGVectorStore. Here's how you can modify your existing code to include these operations:

Insert or Update Logic

You can modify the index_postgres_vectordb function to check if a document already exists in the database. If it exists, update it; otherwise, insert it as a new record.

Delete Logic

You can add a function to delete documents from the database if they have been deleted in the source.

Updated Code

Here's how you can implement these changes:

import psycopg2
from sqlalchemy import make_url
from llama_index.vector_stores.postgres import PGVectorStore
from llama_index.core.storage.storage_context import StorageContext
from llama_index.core.indices.vector_store.base import VectorStoreIndex
from llama_index.core.schema import BaseNode

def postgres_vectordb():
    """Access vector database in postgres"""
    connection_string = conf.vector_connection_string
    db_name = conf.vector_db_name
    conn = psycopg2.connect(connection_string)
    conn.autocommit = True
    with conn.cursor() as c:
        c.execute(f"""
        SELECT pg_terminate_backend(pid)
        FROM pg_stat_activity
        WHERE datname = '{db_name}' AND pid <> pg_backend_pid();
        """)
    return connection_string, db_name

def add_or_update(vector_store, nodes):
    """Add or update nodes in the vector store."""
    ids = []
    with vector_store._session() as session, session.begin():
        for node in nodes:
            ids.append(node.node_id)
            item = vector_store._node_to_table_row(node)
            existing_item = session.query(vector_store._table_class).filter_by(node_id=node.node_id).first()
            if existing_item:
                existing_item.text = item.text
                existing_item.metadata_ = item.metadata_
                existing_item.embedding = item.embedding
            else:
                session.add(item)
        session.commit()
    return ids

def delete(vector_store, ref_doc_id):
    """Delete nodes from the vector store."""
    import sqlalchemy
    with vector_store._session() as session, session.begin():
        stmt = sqlalchemy.text(
            f"DELETE FROM {vector_store.schema_name}.data_{vector_store.table_name} where "
            f"(metadata_->>'doc_id')::text = '{ref_doc_id}' "
        )
        session.execute(stmt)
        session.commit()

def index_postgres_vectordb(connection_string, db_name, chunked_documents):
    """Embedding the chunked documents, indexing them, and storing them in the PGVectorStore."""
    embed_model = language_model.az_embedding_model(conf.azure_openai_api_key, conf.azure_openai_endpoint)
    for doc in chunked_documents:
        if 'id' in doc.metadata:
            doc.id_ = str(doc.metadata['id'])  # Ensure it's a string

    url = make_url(connection_string)
    vector_store = PGVectorStore.from_params(
        database=db_name,
        host=url.host,
        password=url.password,
        port=url.port,
        user=url.username,
        table_name=conf.vector_table_name,
        embed_dim=1536,  # openai embedding dimension
    )
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    index = VectorStoreIndex(
        chunked_documents, storage_context=storage_context, show_progress=True,
        embed_model=embed_model
    )

    # Add or update documents
    add_or_update(vector_store, chunked_documents)

    return index

# Example usage
connection_string, db_name = postgres_vectordb()
chunked_documents = [...]  # Your chunked documents
index = index_postgres_vectordb(connection_string, db_name, chunked_documents)

# To delete a document (note: `vector_store` must be in scope here, e.g. by
# having index_postgres_vectordb also return the PGVectorStore it creates)
ref_doc_id = "12345"  # The document ID to delete
delete(vector_store, ref_doc_id)

This code will ensure that the records in your Postgres vector database are synchronized with the source data, updating existing records, adding new ones, and deleting those that are no longer present in the source [1][2].
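
Note that PGVectorStore also implements the vector store interface's delete(ref_doc_id) method, so (as a minimal sketch, assuming the vector_store instance created in index_postgres_vectordb is accessible) the hand-written SQL helper above can be replaced with the built-in call, which also avoids interpolating the document ID into raw SQL:

# Sketch: use the store's built-in delete instead of hand-written SQL.
vector_store.delete(ref_doc_id="12345")  # removes every chunk of that source document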

To continue talking to Dosu, mention @dosu.

Vigneshwar-do commented 1 month ago

This solution doesn't seem to work. I am getting the following error: ValueError: embedding not set.

dosubot[bot] commented 1 month ago

The ValueError: embedding not set error occurs because the embedding attribute of the BaseNode object is not set before adding it to the PGVectorStore. You need to ensure that each BaseNode object has its embedding set before calling the add method.

Here's how you can modify your code to set the embedding for each BaseNode:

  1. Ensure Embeddings are Set: Before adding nodes to the PGVectorStore, make sure each node has its embedding set.

  2. Update the index_postgres_vectordb Function: Modify the function to set embeddings for each node.

Here's the updated code:

import psycopg2
from sqlalchemy import make_url
from llama_index.vector_stores.postgres import PGVectorStore
from llama_index.core.storage.storage_context import StorageContext
from llama_index.core.indices.vector_store.base import VectorStoreIndex
from llama_index.core.schema import BaseNode

def postgres_vectordb():
    """Access vector database in postgres"""
    connection_string = conf.vector_connection_string
    db_name = conf.vector_db_name
    conn = psycopg2.connect(connection_string)
    conn.autocommit = True
    with conn.cursor() as c:
        c.execute(f"""
        SELECT pg_terminate_backend(pid)
        FROM pg_stat_activity
        WHERE datname = '{db_name}' AND pid <> pg_backend_pid();
        """)
    return connection_string, db_name

def add_or_update(vector_store, nodes):
    """Add or update nodes in the vector store."""
    ids = []
    with vector_store._session() as session, session.begin():
        for node in nodes:
            ids.append(node.node_id)
            item = vector_store._node_to_table_row(node)
            existing_item = session.query(vector_store._table_class).filter_by(node_id=node.node_id).first()
            if existing_item:
                existing_item.text = item.text
                existing_item.metadata_ = item.metadata_
                existing_item.embedding = item.embedding
            else:
                session.add(item)
        session.commit()
    return ids

def delete(vector_store, ref_doc_id):
    """Delete nodes from the vector store."""
    import sqlalchemy
    with vector_store._session() as session, session.begin():
        stmt = sqlalchemy.text(
            f"DELETE FROM {vector_store.schema_name}.data_{vector_store.table_name} where "
            f"(metadata_->>'doc_id')::text = '{ref_doc_id}' "
        )
        session.execute(stmt)
        session.commit()

def index_postgres_vectordb(connection_string, db_name, chunked_documents):
    """Embedding the chunked documents, indexing them, and storing them in the PGVectorStore."""
    embed_model = language_model.az_embedding_model(conf.azure_openai_api_key, conf.azure_openai_endpoint)
    for doc in chunked_documents:
        if 'id' in doc.metadata:
            doc.id_ = str(doc.metadata['id'])  # Ensure it's a string
        if doc.embedding is None:
            doc.embedding = embed_model.get_text_embedding(doc.get_content())  # Set the embedding

    url = make_url(connection_string)
    vector_store = PGVectorStore.from_params(
        database=db_name,
        host=url.host,
        password=url.password,
        port=url.port,
        user=url.username,
        table_name=conf.vector_table_name,
        embed_dim=1536,  # openai embedding dimension
    )
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    index = VectorStoreIndex(
        chunked_documents, storage_context=storage_context, show_progress=True,
        embed_model=embed_model
    )

    # Add or update documents
    add_or_update(vector_store, chunked_documents)

    return index

# Example usage
connection_string, db_name = postgres_vectordb()
chunked_documents = [...]  # Your chunked documents
index = index_postgres_vectordb(connection_string, db_name, chunked_documents)

# To delete a document (note: `vector_store` must be in scope here, e.g. by
# having index_postgres_vectordb also return the PGVectorStore it creates)
ref_doc_id = "12345"  # The document ID to delete
delete(vector_store, ref_doc_id)

This code ensures that the embedding attribute is set for each BaseNode before adding it to the PGVectorStore, preventing the ValueError: embedding not set error [1][2].
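
Since your source metadata carries an updated_at timestamp, a further refinement is to skip re-chunking and re-embedding articles whose stored copy is already current. Below is a minimal sketch, assuming the article's id and updated_at fields are propagated into each node's metadata, and reusing the raw-SQL pattern from the delete helper above:

import sqlalchemy

def source_is_newer(vector_store, article_id, source_updated_at):
    """Sketch: compare the source article's 'updated_at' with the newest
    'updated_at' already stored in the metadata_ JSON column."""
    with vector_store._session() as session, session.begin():
        stmt = sqlalchemy.text(
            f"SELECT max(metadata_->>'updated_at') "
            f"FROM {vector_store.schema_name}.data_{vector_store.table_name} "
            f"WHERE metadata_->>'id' = :article_id"
        )
        stored = session.execute(stmt, {"article_id": str(article_id)}).scalar()
    # Not indexed yet, or the source copy is newer -> (re)process the article.
    # ISO-8601 UTC strings such as '2024-05-14T19:28:42Z' compare correctly as text.
    return stored is None or stored < source_updated_at

Articles for which this returns True can be chunked, embedded, and upserted with add_or_update as shown above; the rest can be skipped.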

To continue talking to Dosu, mention @dosu.