run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Bug]: Unnecessarily Expensive Operation during Pipeline UPSERTS Ingestion #14597

Open · headyin opened this issue 3 days ago

headyin commented 3 days ago

Bug Description

The self.docstore.get_all_document_hashes() call in the _handle_upserts method is very expensive, but its return value is only used in the UPSERTS_AND_DELETE case.

https://github.com/run-llama/llama_index/blob/08989d4ff07340ebaeff3a6cfb0d9c12abf630c2/llama-index-core/llama_index/core/ingestion/pipeline.py#L429
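To make the cost visible, here is a minimal, hedged probe (ProbedDocumentStore is a name invented for this sketch, and SimpleDocumentStore stands in for the real backend): it wraps get_all_document_hashes on an in-memory docstore and reports every full-hash scan, which fires even with the plain UPSERTS strategy.

import time
from typing import Dict

from llama_index.core.storage.docstore import SimpleDocumentStore


class ProbedDocumentStore(SimpleDocumentStore):
    """Illustrative docstore that counts and times full-hash scans."""

    scan_calls = 0

    def get_all_document_hashes(self) -> Dict[str, str]:
        type(self).scan_calls += 1
        start = time.perf_counter()
        try:
            return super().get_all_document_hashes()
        finally:
            elapsed = time.perf_counter() - start
            print(f"full hash scan #{type(self).scan_calls} took {elapsed:.3f}s")

Passing an instance of this store to an IngestionPipeline (as in the reproduction sketch below) shows the scan happening on every run call.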

Version

0.10.50.post1

Steps to Reproduce

Sorry, I cannot share the code here.

But basically follow these steps:

  1. Create an IngestionPipeline with a doc_store, cache, vector_store, and the UPSERTS docstore strategy (see the sketch after this list)
  2. Prepare a large number of documents, 1.5k+ in my case
  3. Use pipeline.run to load all of the documents
  4. Get a new document, and use pipeline.run to load only that new document.
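
A minimal, self-contained sketch of this setup, using in-memory stores and a mock embedding model as stand-ins (the actual report used a DynamoDB docstore / S3KVStore-backed docstore and real embeddings):

from llama_index.core import Document, MockEmbedding
from llama_index.core.ingestion import (
    DocstoreStrategy,
    IngestionCache,
    IngestionPipeline,
)
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.core.vector_stores import SimpleVectorStore

pipeline = IngestionPipeline(
    transformations=[SentenceSplitter(), MockEmbedding(embed_dim=8)],
    docstore=SimpleDocumentStore(),
    vector_store=SimpleVectorStore(),
    cache=IngestionCache(),
    docstore_strategy=DocstoreStrategy.UPSERTS,
)

# 1) Initial bulk load (1.5k+ documents in the report).
docs = [Document(text=f"document {i}", id_=f"doc-{i}") for i in range(1500)]
pipeline.run(documents=docs)

# 2) Ingest a single new document. Even with plain UPSERTS, this run
#    triggers docstore.get_all_document_hashes(), which scans every
#    stored hash before handling the one new document.
pipeline.run(documents=[Document(text="a brand new document", id_="doc-new")])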

Because there are already 1.5k+ documents in the doc_store, the get_all_document_hashes method is either very slow or very expensive, depending on the doc store backend. I tested with a DynamoDB doc_store and an S3KVStore-backed KVDocStore: with DynamoDB it has to do a full table scan, and with the S3 backend it takes 7 minutes to fetch all of the hashes.

But this is completely unnecessary: during a plain UPSERTS operation we don't need to fetch all of the document hashes.

To work around this issue, I implemented my own doc_store class that returns an empty dict from the get_all_document_hashes method, and the entire pipeline works correctly and is much faster.
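
For reference, a hedged sketch of that workaround (shown here against SimpleDocumentStore for brevity; the actual workaround subclassed the DynamoDB/S3-backed store, and note that this is only safe with plain UPSERTS, since UPSERTS_AND_DELETE relies on the full hash map to find stale documents):

from typing import Dict

from llama_index.core.storage.docstore import SimpleDocumentStore


class NoScanDocumentStore(SimpleDocumentStore):
    """Workaround: skip the full-hash scan that plain UPSERTS never consumes."""

    def get_all_document_hashes(self) -> Dict[str, str]:
        return {}

    async def aget_all_document_hashes(self) -> Dict[str, str]:
        return {}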

Relevant Logs/Tracebacks

No response

dosubot[bot] commented 3 days ago

Hey @headyin! I'm here to help you with any bugs, questions, or contributions you have. Let's tackle this issue together!

To address the issue of the expensive self.docstore.get_all_document_hashes() call during the UPSERTS strategy in the _handle_upserts method, you can optimize the function to only fetch the necessary document hashes on a per-node basis. This avoids fetching all document hashes upfront, which can be particularly slow with backends like DynamoDB and S3KVStore.

Here is an optimized version of the upsert handling, shown as the async _ahandle_upserts variant:

async def _ahandle_upserts(
    self,
    nodes: List[BaseNode],
    store_doc_text: bool = True,
) -> List[BaseNode]:
    """Handle docstore upserts by checking hashes and ids."""
    assert self.docstore is not None

    doc_ids_from_nodes = set()
    deduped_nodes_to_run = {}
    for node in nodes:
        ref_doc_id = node.ref_doc_id if node.ref_doc_id else node.id_
        doc_ids_from_nodes.add(ref_doc_id)
        existing_hash = await self.docstore.aget_document_hash(ref_doc_id)
        if not existing_hash:
            # document doesn't exist, so add it
            await self.docstore.aset_document_hash(ref_doc_id, node.hash)
            deduped_nodes_to_run[ref_doc_id] = node
        elif existing_hash and existing_hash != node.hash:
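            # document exists but its hash changed: delete the stale doc and
            # vectors, record the new hash, and re-run the node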
            await self.docstore.adelete_ref_doc(ref_doc_id, raise_error=False)

            if self.vector_store is not None:
                await self.vector_store.adelete(ref_doc_id)

            await self.docstore.aset_document_hash(ref_doc_id, node.hash)

            deduped_nodes_to_run[ref_doc_id] = node
        else:
            continue  # document exists and is unchanged, so skip it

    if self.docstore_strategy == DocstoreStrategy.UPSERTS_AND_DELETE:
        # Identify missing docs and delete them from docstore and vector store
        existing_doc_ids_before = set((await self.docstore.aget_all_document_hashes()).values())
        doc_ids_to_delete = existing_doc_ids_before - doc_ids_from_nodes
        for ref_doc_id in doc_ids_to_delete:
            await self.docstore.adelete_document(ref_doc_id)

            if self.vector_store is not None:
                await self.vector_store.adelete(ref_doc_id)

    nodes_to_run = list(deduped_nodes_to_run.values())
    await self.docstore.async_add_documents(nodes_to_run, store_text=store_doc_text)

    return nodes_to_run

This approach fetches only the document hashes needed for each node, so the expensive call to get all document hashes is made only in the UPSERTS_AND_DELETE case [1][2].

To continue talking to Dosu, mention @dosu.