run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

Inconsistent Embedding Reuse Behavior in VectorStoreIndex with DenseXRetrievalPack #14417

Closed LikhithRishi closed 1 month ago

LikhithRishi commented 4 months ago

Question Validation

Question

class DenseXRetrievalPack(BaseLlamaPack):
    def __init__(
        self,
        documents: List[Document],
        proposition_llm: Optional[LLM] = None,
        query_llm: Optional[LLM] = None,
        embed_model: Optional[BaseEmbedding] = None,
        text_splitter: TextSplitter = SentenceSplitter(),
        vector_store: Optional[ElasticsearchStore] = None,
        similarity_top_k: int = 4,
    ) -> None:
        """Init params."""
        self._proposition_llm = proposition_llm

        embed_model = embed_model

        nodes = text_splitter.get_nodes_from_documents(documents)
        sub_nodes = self._gen_propositions(nodes)
        all_nodes = nodes + sub_nodes
        all_nodes_dict = {n.node_id: n for n in all_nodes}

        service_context = ServiceContext.from_defaults(
            llm=query_llm,
            embed_model=embed_model,
            num_output=self._proposition_llm.metadata.num_output,
        )

        if os.path.exists("./chroma_db"):
            # Persisted collection exists: load the index from the existing
            # vector store instead of re-embedding.
            chroma_client = chromadb.PersistentClient(path="./chroma_db")
            chroma_collection = chroma_client.get_or_create_collection("quickstart")
            vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
            storage_context = StorageContext.from_defaults(vector_store=vector_store)
            self.vector_index = VectorStoreIndex.from_vector_store(
                vector_store, service_context=service_context
            )
        else:
            # First run: embed the nodes and persist them to the collection.
            chroma_client = chromadb.PersistentClient(path="./chroma_db")
            chroma_collection = chroma_client.get_or_create_collection("quickstart")
            vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
            storage_context = StorageContext.from_defaults(vector_store=vector_store)
            self.vector_index = VectorStoreIndex(
                nodes,
                service_context=service_context,
                show_progress=True,
                storage_context=storage_context,
                store_nodes_override=True,
            )

        self.retriever = RecursiveRetriever(
            "vector",
            retriever_dict={
                "vector": self.vector_index.as_retriever(
                    similarity_top_k=similarity_top_k
                )
            },
            node_dict=all_nodes_dict,
        )

        self.query_engine = RetrieverQueryEngine.from_args(
            self.retriever, service_context=service_context
        )

    async def _aget_proposition(self, node: TextNode) -> List[TextNode]:
        """Get proposition."""
        initial_output = await self._proposition_llm.apredict(
            PROPOSITIONS_PROMPT, node_text=node.text
        )
        outputs = initial_output.split("\n")

        all_propositions = []

        for output in outputs:
            if not output.strip():
                continue
            # Repair truncated JSON list output before parsing.
            if not output.strip().endswith("]"):
                if not output.strip().endswith('"') and not output.strip().endswith(
                    ","
                ):
                    output = output + '"'
                output = output + " ]"
            if not output.strip().startswith("["):
                if not output.strip().startswith('"'):
                    output = '"' + output
                output = "[ " + output

            try:
                propositions = json.loads(output)
            except Exception:
                # fallback to yaml
                try:
                    propositions = yaml.safe_load(output)
                except Exception:
                    # fallback to next output
                    continue

            if not isinstance(propositions, list):
                continue

            all_propositions.extend(propositions)

        assert isinstance(all_propositions, list)
        nodes = [TextNode(text=prop) for prop in all_propositions if prop]

        return [IndexNode.from_text_node(n, node.node_id) for n in nodes]

    def _gen_propositions(self, nodes: List[TextNode]) -> List[TextNode]:
        """Get propositions."""
        sub_nodes = asyncio.run(
            run_jobs(
                [self._aget_proposition(node) for node in nodes],
                show_progress=True,
                workers=8,
            )
        )
        print(sub_nodes)

        # Flatten list
        return [node for sub_node in sub_nodes for node in sub_node]

    def get_modules(self) -> Dict[str, Any]:
        """Get modules."""
        return {
            "query_engine": self.query_engine,
            "retriever": self.retriever,
        }

    def run(self, query_str: str, **kwargs: Any) -> RESPONSE_TYPE:
        """Run the pipeline."""
        return self.query_engine.query(query_str)

In the DenseXRetrievalPack class, I'm encountering an issue with VectorStoreIndex when using different sets of nodes (nodes versus all_nodes). The relevant code:

if os.path.exists('./chroma_db'):
    chroma_client = chromadb.PersistentClient(path="./chroma_db")
    chroma_collection = chroma_client.get_or_create_collection("quickstart")
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    self.vector_index = VectorStoreIndex.from_vector_store(vector_store, service_context=service_context)
else:
    chroma_client = chromadb.PersistentClient(path="./chroma_db")
    chroma_collection = chroma_client.get_or_create_collection("quickstart")
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    self.vector_index = VectorStoreIndex(
        nodes, service_context=service_context, show_progress=True, storage_context=storage_context, store_nodes_override=True
    )

Issue: The issue occurs with both ChromaDB and Elasticsearch. When the index is built from nodes alone, the embeddings can be reused without any problem. The error arises specifically when all_nodes is used, which includes both nodes and sub_nodes, resulting in:

ValueError: Query id d28e3de4-1c4f-420a-9184-97bf8556b11b not found in either retriever_dict or query_engine_dict

Expected Behavior: I expect VectorStoreIndex to handle querying with embeddings from all_nodes (nodes + sub_nodes) without errors, just as it does when using nodes alone.

Questions:

1. Why does the error occur when using all_nodes but not with nodes alone?
2. How can I modify the code to successfully reuse embeddings with all_nodes?
3. Is there a specific way to handle all_nodes with ChromaDB and Elasticsearch to avoid this issue?
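For reference, what I suspect is happening: on every run the pack regenerates the propositions, and each regenerated TextNode gets a fresh random node_id. The embeddings persisted in Chroma or Elasticsearch still reference the node IDs from the first run, so when RecursiveRetriever resolves a retrieved id against the freshly built all_nodes_dict, the lookup fails. A minimal sketch of a workaround I am considering: persist the nodes themselves alongside the vector store and reload them instead of regenerating (SimpleDocumentStore usage and the persist directory are my assumptions; the import path is for llama-index 0.10+, on 0.9-style installs it is llama_index.storage.docstore):

import os

from llama_index.core.storage.docstore import SimpleDocumentStore

PERSIST_DIR = "./docstore_persist"  # illustrative path, pick your own

if os.path.exists(PERSIST_DIR):
    # Reload the exact nodes (and node_ids) embedded on the first run,
    # so the ids match what the vector store returns.
    docstore = SimpleDocumentStore.from_persist_dir(PERSIST_DIR)
    all_nodes = list(docstore.docs.values())
else:
    nodes = text_splitter.get_nodes_from_documents(documents)
    sub_nodes = self._gen_propositions(nodes)
    all_nodes = nodes + sub_nodes
    # Save the nodes so later runs rebuild all_nodes_dict with the
    # same node_ids instead of regenerating propositions.
    docstore = SimpleDocumentStore()
    docstore.add_documents(all_nodes)
    docstore.persist(persist_path=os.path.join(PERSIST_DIR, "docstore.json"))

all_nodes_dict = {n.node_id: n for n in all_nodes}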

dosubot[bot] commented 4 months ago

The discrepancy between nodes and all_nodes usage in the VectorStoreIndex of the DenseXRetrievalPack class could be due to the filtering and querying logic implemented in the monkey_query function. Specifically, the function filters nodes based on metadata and node ID restrictions before performing the query. If all_nodes is not being considered in this filtering process, it could lead to a discrepancy.

To configure VectorStoreIndex to successfully query using embeddings from all_nodes, ensure that the filtering logic in the monkey_query function includes all relevant nodes. Here is the relevant part of the code that handles the filtering and querying:

def monkey_query(
    self,
    query: VectorStoreQuery,
    **kwargs: Any,
):
    """Get nodes for response."""
    # Prevent metadata filtering on stores that were persisted without metadata.
    if (
        query.filters is not None
        and self._data.embedding_dict
        and not self._data.metadata_dict
    ):
        raise ValueError(
            "Cannot filter stores that were persisted without metadata. "
            "Please rebuild the store with metadata to enable filtering."
        )
    # Prefilter nodes based on the query filter and node ID restrictions.
    query_filter_fn = _build_metadata_filter_fn(
        lambda node_id: self._data.metadata_dict[node_id], query.filters
    )

    if query.node_ids is not None:
        available_ids = set(query.node_ids)

        def node_filter_fn(node_id: str) -> bool:
            return node_id in available_ids

    else:

        def node_filter_fn(node_id: str) -> bool:
            return True

    node_ids = []
    embeddings = []
    # TODO: consolidate with get_query_text_embedding_similarities
    for node_id, embedding in self._data.embedding_dict.items():
        if node_filter_fn(node_id) and query_filter_fn(node_id):
            node_ids.append(node_id)
            embeddings.append(embedding)

    query_embedding = cast(List[float], query.query_embedding)
    if query.mode in LEARNER_MODES:
        top_similarities, top_ids = get_top_k_embeddings_learner(
            query_embedding,
            embeddings,
            similarity_top_k=query.similarity_top_k,
            embedding_ids=node_ids,
        )
    elif query.mode == MMR_MODE:
        mmr_threshold = kwargs.get("mmr_threshold", None)
        top_similarities, top_ids = get_top_k_mmr_embeddings(
            query_embedding,
            embeddings,
            similarity_top_k=query.similarity_top_k,
            embedding_ids=node_ids,
            mmr_threshold=mmr_threshold,
        )
    elif query.mode == VectorStoreQueryMode.DEFAULT:
        top_similarities, top_ids = get_top_k_embeddings(
            query_embedding,
            embeddings,
            similarity_top_k=query.similarity_top_k,
            embedding_ids=node_ids,
        )
    elif query.mode == VectorStoreQueryMode.HYBRID:  # where I made my changes
        top_similarities, top_ids = get_top_k_embeddings(
            query_embedding,
            embeddings,
            similarity_top_k=query.similarity_top_k,
            embedding_ids=node_ids,
        )
    else:
        raise ValueError(f"Invalid query mode: {query.mode}")

    return VectorStoreQueryResult(similarities=top_similarities, ids=top_ids)

Ensure that the node_filter_fn and query_filter_fn functions are correctly configured to include all relevant nodes from all_nodes. If all_nodes is a broader set than the current filtering allows, you may need to adjust the filtering logic to ensure it includes all necessary nodes.
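For illustration, a minimal sketch of how the node_ids restriction interacts with the prefilter above (the query embedding is a placeholder, and the import path assumes llama-index 0.10+; the legacy equivalent lives under llama_index.legacy.vector_stores.types):

from llama_index.core.vector_stores.types import VectorStoreQuery

# With node_ids set, node_filter_fn admits only those ids; with
# node_ids=None, every embedding in the store passes the prefilter.
query = VectorStoreQuery(
    query_embedding=[0.1, 0.2, 0.3],  # placeholder query embedding
    similarity_top_k=4,
    node_ids=list(all_nodes_dict.keys()),  # cover nodes + sub_nodes
)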

Additionally, the VectorIndexRetriever class (in llama-index-legacy, under llama_index/legacy/indices/vector_store/retrievers/retriever.py) builds and executes queries against the VectorStoreIndex. Here is how this class is defined and configured:

class VectorIndexRetriever(BaseRetriever):
    def __init__(
        self,
        index: VectorStoreIndex,
        similarity_top_k: int = DEFAULT_SIMILARITY_TOP_K,
        vector_store_query_mode: VectorStoreQueryMode = VectorStoreQueryMode.DEFAULT,
        filters: Optional[MetadataFilters] = None,
        alpha: Optional[float] = None,
        node_ids: Optional[List[str]] = None,
        doc_ids: Optional[List[str]] = None,
        sparse_top_k: Optional[int] = None,
        callback_manager: Optional[CallbackManager] = None,
        object_map: Optional[dict] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> None:
        self._index = index
        self._vector_store = self._index.vector_store
        self._service_context = self._index.service_context
        self._docstore = self._index.docstore
        self._similarity_top_k = similarity_top_k
        self._vector_store_query_mode = VectorStoreQueryMode(vector_store_query_mode)
        self._alpha = alpha
        self._node_ids = node_ids
        self._doc_ids = doc_ids
        self._filters = filters
        self._sparse_top_k = sparse_top_k
        self._kwargs: Dict[str, Any] = kwargs.get("vector_store_kwargs", {})
        super().__init__(
            callback_manager=callback_manager, object_map=object_map, verbose=verbose
        )

    def _build_vector_store_query(
        self, query_bundle_with_embeddings: QueryBundle
    ) -> VectorStoreQuery:
        return VectorStoreQuery(
            query_embedding=query_bundle_with_embeddings.embedding,
            similarity_top_k=self._similarity_top_k,
            node_ids=self._node_ids,
            doc_ids=self._doc_ids,
            query_str=query_bundle_with_embeddings.query_str,
            mode=self._vector_store_query_mode,
            alpha=self._alpha,
            filters=self._filters,
            sparse_top_k=self._sparse_top_k,
        )

    def _build_node_list_from_query_result(
        self, query_result: VectorStoreQueryResult
    ) -> List[NodeWithScore]:
        if query_result.nodes is None:
            if query_result.ids is None:
                raise ValueError(
                    "Vector store query result should return at "
                    "least one of nodes or ids."
                )
            assert isinstance(self._index.index_struct, IndexDict)
            node_ids = [
                self._index.index_struct.nodes_dict[idx] for idx in query_result.ids
            ]
            nodes = self._docstore.get_nodes(node_ids)
            query_result.nodes = nodes
        else:
            for i in range(len(query_result.nodes)):
                source_node = query_result.nodes[i].source_node
                if (not self._vector_store.stores_text) or (
                    source_node is not None and source_node.node_type != ObjectType.TEXT
                ):
                    node_id = query_result.nodes[i].node_id
                    if self._docstore.document_exists(node_id):
                        query_result.nodes[i] = self._docstore.get_node(
                            node_id
                        )  # type: ignore[index]

        log_vector_store_query_result(query_result)

        node_with_scores: List[NodeWithScore] = []
        for ind, node in enumerate(query_result.nodes):
            score: Optional[float] = None
            if query_result.similarities is not None:
                score = query_result.similarities[ind]
            node_with_scores.append(NodeWithScore(node=node, score=score))

        return node_with_scores

    def _get_nodes_with_embeddings(
        self, query_bundle_with_embeddings: QueryBundle
    ) -> List[NodeWithScore]:
        query = self._build_vector_store_query(query_bundle_with_embeddings)
        query_result = self._vector_store.query(query, **self._kwargs)
        return self._build_node_list_from_query_result(query_result)

    async def _aget_nodes_with_embeddings(
        self, query_bundle_with_embeddings: QueryBundle
    ) -> List[NodeWithScore]:
        query = self._build_vector_store_query(query_bundle_with_embeddings)
        query_result = await self._vector_store.aquery(query, **self._kwargs)
        return self._build_node_list_from_query_result(query_result)

By using the VectorIndexRetriever class, you can ensure that the query includes all relevant nodes from all_nodes by properly configuring the node_ids and filters parameters. This will help in resolving the discrepancy and successfully querying using embeddings from all_nodes [1][2][3][4].
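For example, a short usage sketch of that suggestion (assuming a pack variable exposing the vector_index and the all_nodes_dict built in the question above):

# Scope the retriever to every id in all_nodes so sub-node embeddings
# remain resolvable at query time.
retriever = VectorIndexRetriever(
    index=pack.vector_index,
    similarity_top_k=4,
    node_ids=list(all_nodes_dict.keys()),
)
nodes_with_scores = retriever.retrieve("example query")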

To continue talking to Dosu, mention @dosu.