run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Question]: How to insert/delete document to/from VectorStoreIndex when using IngestionPipeline? #14756

Closed deval-aiml closed 3 weeks ago

deval-aiml commented 4 months ago

Question

I am confused about how insertion and deletion work with VectorStoreIndex together with the Qdrant vector DB. @logan-markewich

Method definitions used in the code

# Imports assumed by the snippets below (llama-index >= 0.10 package layout)
from llama_index.core import Settings, SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.ingestion import DocstoreStrategy, IngestionCache, IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.storage.docstore.redis import RedisDocumentStore
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache
from llama_index.vector_stores.qdrant import QdrantVectorStore

# Vector store init
self.vector_store = QdrantVectorStore(
    collection_name=collection_name,
    # client=self._client,
    aclient=self._aclient,
    enable_hybrid=True,
    batch_size=self.vectordb_config.batch_size,
    hybrid_fusion_fn=self.relative_score_fusion,
)

# Index init
self.index = VectorStoreIndex.from_vector_store(
    self.vector_store,
    Settings.embed_model,
    store_nodes_override=True,
)

# Function to insert nodes into the index
async def add_nodes_to_index_async(self, nodes):
    await self.index._async_add_nodes_to_index(
        self.index.index_struct,
        nodes,
        show_progress=True,
    )

# Snippet to init the ingestion pipeline
docstore = RedisDocumentStore.from_host_and_port(
    host=config.redis_config.host,
    port=config.redis_config.port,
    namespace="llamasearch",
)
parser = SentenceSplitter()
print("Creating new ingestion pipeline")
ingestion = IngestionPipeline(
    transformations=[parser],
    docstore=docstore,
    vector_store=qdrant_search.vector_store,
    cache=IngestionCache(
        cache=RedisCache.from_host_and_port(
            host=config.redis_config.host,
            port=config.redis_config.port,
        ),
        collection="redis_cache",
    ),
    docstore_strategy=DocstoreStrategy.UPSERTS,
)

# Snippet to load a document from a file path and insert it into the index
documents = SimpleDirectoryReader(input_files=[file_path], filename_as_id=True).load_data()
for doc in documents:
    print(f"Processing Document ID: {doc.id_}")
    docs.append(doc.id_)
nodes = await ingestion.arun(documents=documents)
print(f"Ingesting {len(nodes)} nodes for {len(ingestion.docstore.docs)} chunks")
await self.add_nodes_to_index_async(nodes)

This code works, but I am unsure whether it creates the proper parent-child relationships with the document ID and whether it stays in sync with the vector DB (Qdrant).

Also, how do I delete a document?

self.index.delete_ref_doc(doc_id, delete_from_docstore=True)

Query: This command deletes from the vector DB but not from the docstore, even though I pass delete_from_docstore=True. What could be the reason?

Query: Is the doc_id passed the document ID (in my case I am using filename_as_id) or a document chunk ID? Does it delete a single chunk (and its related nodes) or all nodes associated with that document?

Query: What is the standard way of maintaining CRUD updates for documents so that the docstore and the vector DB stay consistent?

dosubot[bot] commented 4 months ago

Hey @deval-aiml! I'm here to help you with any coding bugs, questions, or to guide you on becoming a contributor. Let's tackle this together while we wait for a human maintainer.

To insert and delete documents in a VectorStoreIndex using an IngestionPipeline with a Qdrant vector database, while keeping the vector database and the docstore in sync, follow these steps:

Inserting Documents

To insert documents, use the add method of the QdrantVectorStore class. This method takes a list of BaseNode objects, builds the points, and uploads them to the Qdrant collection.

def add(self, nodes: List[BaseNode], **add_kwargs: Any) -> List[str]:
    if len(nodes) > 0 and not self._collection_initialized:
        self._create_collection(
            collection_name=self.collection_name,
            vector_size=len(nodes[0].get_embedding()),
        )

    points, ids = self._build_points(nodes)
    self._client.upload_points(
        collection_name=self.collection_name,
        points=points,
        batch_size=self.batch_size,
        parallel=self.parallel,
        max_retries=self.max_retries,
        wait=True,
    )

    return ids

Deleting Documents

To delete documents, use the delete method of the QdrantVectorStore class. This method takes a ref_doc_id and deletes the corresponding points from the Qdrant collection.

def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
    from qdrant_client.http import models as rest

    self._client.delete(
        collection_name=self.collection_name,
        points_selector=rest.Filter(
            must=[
                rest.FieldCondition(
                    key="doc_id", match=rest.MatchValue(value=ref_doc_id)
                )
            ]
        ),
    )
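
To make the filter above concrete, here is a toy in-memory model (not the Qdrant client) of deleting by the `doc_id` payload key. Every point whose payload `doc_id` equals the given `ref_doc_id` is dropped, so all chunks of that document go at once rather than a single chunk. The `Point` class and the `delete_by_ref_doc_id` helper are hypothetical names used only for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Point:
    """Hypothetical stand-in for a stored vector point (one chunk)."""
    point_id: str
    payload: dict = field(default_factory=dict)


def delete_by_ref_doc_id(points: list[Point], ref_doc_id: str) -> list[Point]:
    """Return the points that survive a delete filtered on payload['doc_id']."""
    return [p for p in points if p.payload.get("doc_id") != ref_doc_id]


points = [
    Point("chunk-1", {"doc_id": "report.pdf"}),
    Point("chunk-2", {"doc_id": "report.pdf"}),
    Point("chunk-3", {"doc_id": "notes.txt"}),
]
remaining = delete_by_ref_doc_id(points, "report.pdf")
print([p.point_id for p in remaining])
```

Both chunks of `report.pdf` are removed in one call, while the chunk belonging to `notes.txt` is left untouched.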

Maintaining CRUD Updates

To maintain CRUD updates for documents, ensure that each operation (Create, Read, Update, Delete) is properly handled and synchronized between the vector database and the docstore:

  1. Create: Use the add method to insert new documents.
  2. Read: Use the query method to retrieve documents based on their embeddings.
  3. Update: To update a document, delete the existing document using the delete method and then re-insert the updated document using the add method.
  4. Delete: Use the delete method to remove documents.
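
The update step above (delete, then re-insert) can be sketched as a small helper that touches both stores in the same order every time. This is a minimal sketch, not LlamaIndex's own implementation: `replace_document` and `FakeStore` are hypothetical names, and the fake store only mimics the `add`, `delete`, `add_documents` and `delete_ref_doc` method names so the example runs without any services.

```python
def replace_document(vector_store, docstore, ref_doc_id, new_nodes):
    """Update = delete every old chunk of the document, then add the new ones."""
    # Remove the old chunks from both stores first so no stale chunk survives.
    vector_store.delete(ref_doc_id)
    docstore.delete_ref_doc(ref_doc_id, raise_error=False)
    # Re-insert; with a real vector store the nodes must already carry embeddings.
    vector_store.add(new_nodes)
    docstore.add_documents(new_nodes)


class FakeStore:
    """Hypothetical in-memory fake playing both roles, for demonstration only."""

    def __init__(self):
        self.items = {}  # node_id -> ref_doc_id

    def add(self, nodes):
        for node_id, ref_doc_id in nodes:
            self.items[node_id] = ref_doc_id

    add_documents = add  # same behaviour under the docstore method name

    def delete(self, ref_doc_id):
        self.items = {n: r for n, r in self.items.items() if r != ref_doc_id}

    def delete_ref_doc(self, ref_doc_id, raise_error=True):
        self.delete(ref_doc_id)


vs, ds = FakeStore(), FakeStore()
old_nodes = [("a1", "doc-a"), ("a2", "doc-a"), ("b1", "doc-b")]
vs.add(old_nodes)
ds.add_documents(old_nodes)

# Replace the two old chunks of doc-a with a single new chunk.
replace_document(vs, ds, "doc-a", [("a3", "doc-a")])
print(sorted(vs.items), vs.items == ds.items)
```

Routing every update through one helper like this is one way to keep the two stores from drifting apart, since neither store is ever modified on its own.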

Handling Upserts and De-duplication

The IngestionPipeline class handles upserts and de-duplication of documents internally when a docstore is attached. Here is how its internal _handle_upserts method does it:

def _handle_upserts(
    self,
    nodes: List[BaseNode],
    store_doc_text: bool = True,
) -> List[BaseNode]:
    assert self.docstore is not None

    existing_doc_ids_before = set(self.docstore.get_all_document_hashes().values())
    doc_ids_from_nodes = set()
    deduped_nodes_to_run = {}
    for node in nodes:
        ref_doc_id = node.ref_doc_id if node.ref_doc_id else node.id_
        doc_ids_from_nodes.add(ref_doc_id)
        existing_hash = self.docstore.get_document_hash(ref_doc_id)
        if not existing_hash:
            self.docstore.set_document_hash(ref_doc_id, node.hash)
            deduped_nodes_to_run[ref_doc_id] = node
        elif existing_hash and existing_hash != node.hash:
            self.docstore.delete_ref_doc(ref_doc_id, raise_error=False)

            if self.vector_store is not None:
                self.vector_store.delete(ref_doc_id)

            self.docstore.set_document_hash(ref_doc_id, node.hash)

            deduped_nodes_to_run[ref_doc_id] = node
        else:
            continue

    if self.docstore_strategy == DocstoreStrategy.UPSERTS_AND_DELETE:
        doc_ids_to_delete = existing_doc_ids_before - doc_ids_from_nodes
        for ref_doc_id in doc_ids_to_delete:
            self.docstore.delete_document(ref_doc_id)

            if self.vector_store is not None:
                self.vector_store.delete(ref_doc_id)

    nodes_to_run = list(deduped_nodes_to_run.values())
    self.docstore.add_documents(nodes_to_run, store_text=store_doc_text)

    return nodes_to_run
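
The decision logic in the method above boils down to comparing a stored hash with the hash of the incoming document. The following is a simplified, standalone sketch of that comparison, not the library code: `content_hash` and `plan_upserts` are hypothetical helpers, and hashing only the text is an assumption made for brevity.

```python
import hashlib


def content_hash(text: str) -> str:
    """Hash of the document text (a simplification of a node hash)."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def plan_upserts(stored_hashes: dict[str, str], incoming: dict[str, str]) -> dict[str, list[str]]:
    """Classify incoming documents by comparing their hashes with stored ones."""
    plan: dict[str, list[str]] = {"insert": [], "replace": [], "skip": []}
    for doc_id, text in incoming.items():
        old_hash = stored_hashes.get(doc_id)
        if old_hash is None:
            plan["insert"].append(doc_id)   # never seen before
        elif old_hash != content_hash(text):
            plan["replace"].append(doc_id)  # changed: delete old chunks, re-add
        else:
            plan["skip"].append(doc_id)     # unchanged: nothing to do
    # Documents that are stored but absent from this run; only removed
    # under an "upserts and delete" style strategy.
    plan["stale"] = sorted(set(stored_hashes) - set(incoming))
    return plan


stored = {
    "a.txt": content_hash("alpha"),
    "b.txt": content_hash("beta"),
    "c.txt": content_hash("gamma"),
}
incoming = {"a.txt": "alpha", "b.txt": "beta v2", "d.txt": "delta"}
plan = plan_upserts(stored, incoming)
print(plan)
```

Here `d.txt` is new, `b.txt` has changed and would be replaced, `a.txt` is skipped, and `c.txt` is stale because it no longer appears in the input.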

Asynchronous Handling

For asynchronous handling of upserts and de-duplication, the pipeline has an async counterpart of the same method:

async def _ahandle_upserts(
    self,
    nodes: List[BaseNode],
    store_doc_text: bool = True,
) -> List[BaseNode]:
    assert self.docstore is not None

    existing_doc_ids_before = set(
        (await self.docstore.aget_all_document_hashes()).values()
    )
    doc_ids_from_nodes = set()
    deduped_nodes_to_run = {}
    for node in nodes:
        ref_doc_id = node.ref_doc_id if node.ref_doc_id else node.id_
        doc_ids_from_nodes.add(ref_doc_id)
        existing_hash = await self.docstore.aget_document_hash(ref_doc_id)
        if not existing_hash:
            await self.docstore.aset_document_hash(ref_doc_id, node.hash)
            deduped_nodes_to_run[ref_doc_id] = node
        elif existing_hash and existing_hash != node.hash:
            await self.docstore.adelete_ref_doc(ref_doc_id, raise_error=False)

            if self.vector_store is not None:
                await self.vector_store.adelete(ref_doc_id)

            await self.docstore.aset_document_hash(ref_doc_id, node.hash)

            deduped_nodes_to_run[ref_doc_id] = node
        else:
            continue

    if self.docstore_strategy == DocstoreStrategy.UPSERTS_AND_DELETE:
        doc_ids_to_delete = existing_doc_ids_before - doc_ids_from_nodes
        for ref_doc_id in doc_ids_to_delete:
            await self.docstore.adelete_document(ref_doc_id)

            if self.vector_store is not None:
                await self.vector_store.adelete(ref_doc_id)

    nodes_to_run = list(deduped_nodes_to_run.values())
    await self.docstore.async_add_documents(nodes_to_run, store_text=store_doc_text)

    return nodes_to_run

Deletion from Docstore

The command self.index.delete_ref_doc(doc_id, delete_from_docstore=True) should delete from both the vector database and the docstore. If it does not delete from the docstore, ensure that the doc_id passed is correct and that the docstore is properly configured to handle deletions. The doc_id should be the document ID (e.g., filename if using filename_as_id), not the doc chunk ID. This command should delete all nodes associated with the given document.
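
One possible reason for the docstore not being cleaned up (an assumption, not confirmed in this thread) is that the index and the ingestion pipeline hold different docstore objects, so the index deletes from its own docstore rather than from the Redis one given to the pipeline. A minimal sketch of deleting explicitly from the stores you actually use, and of looking up which node IDs belong to a `ref_doc_id`, could look like this. It relies only on the `get_all_ref_doc_info`, `delete_ref_doc` and `delete` method names; `node_ids_by_ref_doc`, `delete_document` and the `Fake*` classes are hypothetical and exist only so the example runs on its own.

```python
from dataclasses import dataclass, field


def node_ids_by_ref_doc(docstore) -> dict[str, list[str]]:
    """Map each ref_doc_id to the IDs of the nodes (chunks) derived from it."""
    info = docstore.get_all_ref_doc_info() or {}
    return {ref_doc_id: list(ref.node_ids) for ref_doc_id, ref in info.items()}


def delete_document(vector_store, docstore, ref_doc_id: str) -> list[str]:
    """Delete one source document from both stores; return the removed node IDs."""
    removed = node_ids_by_ref_doc(docstore).get(ref_doc_id, [])
    vector_store.delete(ref_doc_id)
    docstore.delete_ref_doc(ref_doc_id, raise_error=False)
    return removed


# --- Hypothetical in-memory fakes, only so the sketch runs on its own ---
@dataclass
class FakeRefDocInfo:
    node_ids: list[str] = field(default_factory=list)


class FakeDocstore:
    def __init__(self, mapping: dict[str, list[str]]):
        self._info = {k: FakeRefDocInfo(list(v)) for k, v in mapping.items()}

    def get_all_ref_doc_info(self):
        return self._info

    def delete_ref_doc(self, ref_doc_id: str, raise_error: bool = True) -> None:
        if self._info.pop(ref_doc_id, None) is None and raise_error:
            raise ValueError(f"ref_doc_id {ref_doc_id} not found")


class FakeVectorStore:
    def __init__(self):
        self.deleted: list[str] = []

    def delete(self, ref_doc_id: str) -> None:
        self.deleted.append(ref_doc_id)


docstore = FakeDocstore({"data/report.pdf": ["n1", "n2"], "data/notes.txt": ["n3"]})
vector_store = FakeVectorStore()
removed = delete_document(vector_store, docstore, "data/report.pdf")
print(removed, node_ids_by_ref_doc(docstore))
```

With real objects, the same two calls would be made against the pipeline's docstore and vector store, and the mapping function shows which chunks a given document ID covers before it is removed.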

By following these methods and ensuring proper handling of both dense and sparse vectors, you can maintain synchronization between the vector database and the docstore, and effectively manage CRUD operations for your documents [1][2][3][4].


dvlshah commented 4 months ago

Deletion is not very intuitive from the documentation. I want to use native methods from the LlamaIndex API to consistently delete a document by ref_doc_id from both the docstore and the vector DB. My other doubt is: how do I know the ref_doc_id?

Can I fetch the mapping using a defined property or do I need to maintain the mapping on my own?

@dosu