run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Question]: Query ID error while running Dense_X with the same index name #14269

Open LikhithRishi opened 3 weeks ago

LikhithRishi commented 3 weeks ago

Question Validation

Question

I am getting an error while running Dense_X with the same index name. When I enter a query to generate a response, I get:

Query id 40b9d01f-b211-413d-b3d4-a799eff700d6 not found in either retriever_dict or query_engine_dict

The following code works fine:

# Imports assumed for this snippet (paths follow the legacy, pre-0.10 llama_index API);
# llm, embed_model, and documents are defined elsewhere in the script.
import chromadb
from llama_index import ServiceContext, StorageContext, VectorStoreIndex
from llama_index.vector_stores import ChromaVectorStore

service_context = ServiceContext.from_defaults(
    llm=llm,
    embed_model=embed_model,
)
# save to disk

db = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = db.get_or_create_collection("quickstart")
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)

index = VectorStoreIndex.from_documents(
    documents, storage_context=storage_context, service_context=service_context
)

# load from disk
db2 = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = db2.get_or_create_collection("quickstart")
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
index = VectorStoreIndex.from_vector_store(
    vector_store,
    service_context=service_context
)

# Query Data from the persisted index
query_engine = index.as_query_engine()
response = query_engine.query("Gmacc replacement")
print(response)

But when we reuse the same embeddings through Dense_X, we get the query ID error:

class DenseXRetrievalPack(BaseLlamaPack):
    def __init__(
        self,
        documents: List[Document],
        proposition_llm: Optional[LLM] = None,
        query_llm: Optional[LLM] = None,
        embed_model: Optional[BaseEmbedding] = None,
        text_splitter: TextSplitter = SentenceSplitter(),
        vector_store: Optional[ElasticsearchStore] = None,
        similarity_top_k: int = 4,
    ) -> None:
        """Init params."""
        self._proposition_llm = proposition_llm

        nodes = text_splitter.get_nodes_from_documents(documents)
        sub_nodes = self._gen_propositions(nodes)
        all_nodes = nodes + sub_nodes
        all_nodes_dict = {n.node_id: n for n in all_nodes}

        service_context = ServiceContext.from_defaults(
            llm=query_llm,
            embed_model=embed_model,
            num_output=self._proposition_llm.metadata.num_output,
        )
        '''
        if os.path.exists('./elastic_db'):
            print("From elasticsearch")
            self.vector_index = VectorStoreIndex.from_vector_store(vector_store,service_context=service_context)
        else:
            storage_context = StorageContext.from_defaults(vector_store=vector_store)
            self.vector_index = VectorStoreIndex(
                 all_nodes, service_context=service_context, show_progress=True,storage_context=storage_context
                 )
            os.mkdir("elastic_db")
        '''
        if os.path.exists('./chroma_db'):
            chroma_client = chromadb.PersistentClient(path="./chroma_db")
            chroma_collection = chroma_client.get_or_create_collection("quickstart")
            vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
            storage_context = StorageContext.from_defaults(vector_store=vector_store)
            self.vector_index = VectorStoreIndex.from_vector_store(
                vector_store, service_context=service_context
            )
        else:
            chroma_client = chromadb.PersistentClient(path="./chroma_db")
            chroma_collection = chroma_client.get_or_create_collection("quickstart")
            vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
            storage_context = StorageContext.from_defaults(vector_store=vector_store)
            self.vector_index = VectorStoreIndex(
                all_nodes,
                service_context=service_context,
                show_progress=True,
                storage_context=storage_context,
                store_nodes_override=True,
            )
        self.retriever = RecursiveRetriever(
            "vector",
            retriever_dict={
                "vector": self.vector_index.as_retriever(
                    similarity_top_k=similarity_top_k
                )
            },
            node_dict=all_nodes_dict,
        )

        self.query_engine = RetrieverQueryEngine.from_args(
            self.retriever, service_context=service_context
        )

    async def _aget_proposition(self, node: TextNode) -> List[TextNode]:
        """Get proposition."""
        initial_output = await self._proposition_llm.apredict(
            PROPOSITIONS_PROMPT, node_text=node.text
        )
        outputs = initial_output.split("\n")

        all_propositions = []

        for output in outputs:
            if not output.strip():
                continue
            if not output.strip().endswith("]"):
                if not output.strip().endswith('"') and not output.strip().endswith(
                    ","
                ):
                    output = output + '"'
                output = output + " ]"
            if not output.strip().startswith("["):
                if not output.strip().startswith('"'):
                    output = '"' + output
                output = "[ " + output

            try:
                propositions = json.loads(output)
            except Exception:
                # fallback to yaml
                try:
                    propositions = yaml.safe_load(output)
                except Exception:
                    # fallback to next output
                    continue

            if not isinstance(propositions, list):
                continue

            all_propositions.extend(propositions)

        assert isinstance(all_propositions, list)
        nodes = [TextNode(text=prop) for prop in all_propositions if prop]

        return [IndexNode.from_text_node(n, node.node_id) for n in nodes]

    def _gen_propositions(self, nodes: List[TextNode]) -> List[TextNode]:
        """Get propositions."""
        sub_nodes = asyncio.run(
            run_jobs(
                [self._aget_proposition(node) for node in nodes],
                show_progress=True,
                workers=8,
            )
        )

        # Flatten list
        return [node for sub_node in sub_nodes for node in sub_node]

    def get_modules(self) -> Dict[str, Any]:
        """Get modules."""
        return {
            "query_engine": self.query_engine,
            "retriever": self.retriever,
        }

    def run(self, query_str: str, **kwargs: Any) -> RESPONSE_TYPE:
        """Run the pipeline."""
        return self.query_engine.query(query_str)

dosubot[bot] commented 3 weeks ago

The error "Query id 40b9d01f-b211-413d-b3d4-a799eff700d6 not found in either retriever_dict or query_engine_dict" means that a retrieved reference ID is not present in any of the dictionaries the RecursiveRetriever uses to resolve IndexNode references.
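
For context, the failing lookup can be pictured roughly like this (a simplified illustration of the RecursiveRetriever's resolution step, not the actual library code):

    # Simplified illustration: when a retrieved node is an IndexNode, its
    # referenced ID must resolve through one of the retriever's dictionaries;
    # a miss raises exactly the error reported above.
    def resolve(ref_id, node_dict, retriever_dict, query_engine_dict):
        if ref_id in node_dict:
            return node_dict[ref_id]
        if ref_id in retriever_dict:
            return retriever_dict[ref_id]
        if ref_id in query_engine_dict:
            return query_engine_dict[ref_id]
        raise ValueError(
            f"Query id {ref_id} not found in either retriever_dict or query_engine_dict"
        )

Here are some steps to keep the vector_store, retriever_dict, and query_engine_dict consistent and synchronized in the DenseXRetrievalPack class: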

  1. Ensure All Nodes Are Included: Make sure that all nodes, including the original and generated proposition nodes, are included in the all_nodes_dict. This dictionary should map node IDs to nodes.

    all_nodes = nodes + sub_nodes
    all_nodes_dict = {n.node_id: n for n in all_nodes}
  2. Consistent Service Context: Use a consistent ServiceContext for both the VectorStoreIndex and the RetrieverQueryEngine. This ensures that the same LLM and embedding models are used across different components.

    service_context = ServiceContext.from_defaults(
        llm=query_llm,
        embed_model=embed_model,
        num_output=self._proposition_llm.metadata.num_output,
    )
  3. Synchronized Vector Store and Retriever: Initialize the VectorStoreIndex with all nodes and use it to create a retriever that is then included in the retriever_dict of the RecursiveRetriever. This ensures that the retriever is always in sync with the vector store.

    if os.path.exists('./chroma_db'):
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        self.vector_index = VectorStoreIndex.from_vector_store(
            vector_store, service_context=service_context
        )
    else:
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        self.vector_index = VectorStoreIndex(
            all_nodes,
            service_context=service_context,
            show_progress=True,
            storage_context=storage_context,
            store_nodes_override=True,
        )

    self.retriever = RecursiveRetriever(
        "vector",
        retriever_dict={
            "vector": self.vector_index.as_retriever(similarity_top_k=similarity_top_k)
        },
        node_dict=all_nodes_dict,
    )
  4. Consistent Query Engine Initialization: Initialize the RetrieverQueryEngine using the same service_context and the RecursiveRetriever. This ensures that the query engine operates on the same data and configuration as the retriever and vector store.

    self.query_engine = RetrieverQueryEngine.from_args(
        self.retriever, service_context=service_context
    )
  5. Check for Missing Nodes: Ensure that the nodes are correctly added to the vector_store and that their IDs are correctly mapped in the all_nodes_dict. Any missing nodes could lead to the "query ID not found" error.
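
    A quick way to check for dangling references (an illustrative sketch; the retriever call and the query string are placeholder assumptions) is to retrieve once and confirm that every referenced ID resolves through all_nodes_dict:

        # Retrieve once and flag any IndexNode whose referenced ID is missing
        # from all_nodes_dict; such IDs trigger the "query ID not found" error.
        retrieved = self.vector_index.as_retriever(similarity_top_k=4).retrieve("test query")
        for n in retrieved:
            ref_id = getattr(n.node, "index_id", None)
            if ref_id is not None and ref_id not in all_nodes_dict:
                print(f"Dangling reference: {ref_id} missing from all_nodes_dict")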

By following these practices, you can ensure that the vector_store, retriever_dict, and query_engine_dict remain consistent and synchronized within the DenseXRetrievalPack class [1][2].
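
Note also that if the pack is re-run in a fresh process against an existing ./chroma_db, _gen_propositions will generate new nodes with newly auto-assigned IDs, so the IDs already stored in Chroma will likely no longer match the freshly built all_nodes_dict. A minimal sketch of one way around this (the pickle file name and location are illustrative assumptions, not part of the pack's API) is to persist the node dict next to the collection and reload it instead of rebuilding it:

    import os
    import pickle

    NODES_PATH = "./chroma_db/all_nodes_dict.pkl"  # hypothetical location

    if os.path.exists(NODES_PATH):
        # Reload the exact nodes (and node IDs) that were embedded originally,
        # so the IDs stored in Chroma still resolve through node_dict.
        with open(NODES_PATH, "rb") as f:
            all_nodes_dict = pickle.load(f)
    else:
        all_nodes_dict = {n.node_id: n for n in all_nodes}
        with open(NODES_PATH, "wb") as f:
            pickle.dump(all_nodes_dict, f)

With the original all_nodes_dict restored, the RecursiveRetriever can resolve the reference IDs that come back from the persisted collection.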

To continue talking to Dosu, mention @dosu.