run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License
36.66k stars 5.25k forks source link

[Question]: A bug of workflow #16415

Closed 202030481266 closed 1 month ago

202030481266 commented 1 month ago

Question Validation

Question

from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore
from llama_index.core import VectorStoreIndex
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)
from custom_docx_parser import LlamaIndexDocxClient
import json
import os
import asyncio
from llama_index.core.schema import IndexNode, TextNode
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.postprocessor.flag_embedding_reranker import FlagEmbeddingReranker
from llama_index.core import Settings
from llama_index.core import StorageContext
from llama_index.vector_stores.milvus import MilvusVectorStore
from FlagEmbedding import BGEM3FlagModel
from typing import List
from llama_index.vector_stores.milvus.utils import BaseSparseEmbeddingFunction
from llama_index.core.schema import NodeWithScore
from llama_index.core.response.notebook_utils import display_source_node, display_response

embed_model = HuggingFaceEmbedding(model_name='/zhangshuhai/work/embed/Xorbits/bge-m3', device="cuda:1")

llm = Ollama(
    model="llama3.1:70b", 
    context_window=32768, 
    temperature=0.7,
    keep_alive="3h",
    request_timeout=400
)

rerank_top_5 = FlagEmbeddingReranker(model="/zhangshuhai/work/embed/AI-ModelScope/bge-reranker-v2-m3", top_n=5)
rerank_top_10 = FlagEmbeddingReranker(model="/zhangshuhai/work/embed/AI-ModelScope/bge-reranker-v2-m3", top_n=10)

# config the resource
Settings.llm = llm
Settings.embed_model = embed_model

class RetrieverEvent(Event):
    """Result of running retrieval"""
    nodes: list[NodeWithScore]

class IngestEvent(Event):
    """Result of ingest process"""
    index_nodes: list[IndexNode]
    text_nodes: list[TextNode]

class RerankEvent(Event):
    """Result of running reranking on retrieved nodes"""
    nodes: list[NodeWithScore]

class PostProcessorEvent(Event):
    """Result of post processor with custom methods"""
    nodes: list[NodeWithScore]

# custom sparse embedding function
class FlagSparseEmbeddingFunction(BaseSparseEmbeddingFunction):
    _instance = None  # singleton pattern, avoid huge gpu memory use

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls, *args, **kwargs)
        return cls._instance

    def __init__(self):
        if not hasattr(self, 'initialized'): 
            self.model = BGEM3FlagModel("/zhangshuhai/work/embed/Xorbits/bge-m3", use_fp16=False)
            self.initialized = True

    def encode_queries(self, queries: List[str]):
        outputs = self.model.encode(
            queries,
            return_dense=False,
            return_sparse=True,
            return_colbert_vecs=False,
        )["lexical_weights"]
        return [self._to_standard_dict(output) for output in outputs]

    def encode_documents(self, documents: List[str]):
        outputs = self.model.encode(
            documents,
            return_dense=False,
            return_sparse=True,
            return_colbert_vecs=False,
        )["lexical_weights"]
        return [self._to_standard_dict(output) for output in outputs]

    def _to_standard_dict(self, raw_output):
        result = {}
        for k in raw_output:
            result[int(k)] = raw_output[k]
        return result

# milvus vector database setting
milvus_vector_store = MilvusVectorStore(
    dim=1024,
    overwrite=True,
    enable_sparse=True,
    sparse_embedding_function=FlagSparseEmbeddingFunction(),
    hybrid_ranker="RRFRanker",
    hybrid_ranker_params={"k": 60},
)
milvus_storage_context = StorageContext.from_defaults(vector_store=milvus_vector_store)

class HybridRAGWorkflow(Workflow):
    @step(num_workers=1)
    async def ingest(self, ctx: Context, ev: StartEvent) -> IngestEvent | None:
        """Entry point to ingest a document, triggered by a StartEvent with `dirname`."""
        dirname = ev.get("dirname")
        if not dirname:
            return None

        absolute_paths = []
        for root, dir, files in os.walk(dirname):
            for file in files:
                if file.lower().endswith('.docx'):
                    absolute_path = os.path.join(root, file)
                    absolute_paths.append(os.path.abspath(absolute_path))

        docx_client = LlamaIndexDocxClient(chunk_size=1024, docx_file_path='dirname')
        index_nodes = []
        text_nodes = []
        for path in absolute_paths:
            docx_client.clear_nodes()
            docx_client.docx_file_path = path
            docx_client.process_docx_to_nodes(use_reference=True)
            index_nodes.extend(docx_client.reference_nodes)
            text_nodes.extend(docx_client.fulltext_nodes)

        return IngestEvent(index_nodes=index_nodes, text_nodes=text_nodes)

    @step(num_workers=1)
    async def index(self, ctx: Context, ev: IngestEvent) -> StopEvent | None:
        """create index and index dicts"""
        index_nodes = ev.get('index_nodes')
        text_nodes = ev.get('text_nodes')
        if not index_nodes or not text_nodes:
            print("Index and text nodes is empty, please run the ingest function before indexing!")
            return None

        index_nodes_dict = {node.id_ : node for node in index_nodes}
        text_nodes_dict = {node.id_ : node for node in text_nodes}
        await ctx.set('index_nodes_dict', index_nodes_dict)
        await ctx.set('text_nodes_dict', text_nodes_dict)

        # use milvus vector store
        milvus_simple_vector_index = VectorStoreIndex(nodes=index_nodes, storage_context=milvus_storage_context)

        return StopEvent(result=milvus_simple_vector_index)

    @step(num_workers=3)
    async def retrieve(
        self, ctx: Context, ev: StartEvent
    ) -> RetrieverEvent | None:
        "Entry point for RAG, triggered by a StartEvent with `query`."
        query = ev.get("query")
        index = ev.get("index")

        if not query:
            return None

        print(f"Query the documents with: {query}")

        # store the query in the global context
        await ctx.set("query", query)

        if index is None:
            print("Index is empty, load some documents before querying!")
            return None

        # we assume that we use the milvus vector index and use the hybrid retrieve mode
        retriever = index.as_retriever(vector_store_query_mode="hybrid", similarity_top_k=20)
        nodes = await retriever.aretrieve(query)
        print(f"Retrieved {len(nodes)} nodes.")
        return RetrieverEvent(nodes=nodes)

    @step(num_workers=3)
    async def rerank(self, ctx: Context, ev: RetrieverEvent | PostProcessorEvent) -> RerankEvent:
        # Rerank the nodes
        print(await ctx.get("query", default=None), flush=True)
        new_nodes = rerank_top_10.postprocess_nodes(
            ev.nodes, query_str=await ctx.get("query", default=None)
        )
        print(f"Reranked nodes to {len(new_nodes)}")
        return RerankEvent(nodes=new_nodes)

    @step(num_workers=5)
    async def add_context_postprocess(self, ctx: Context, ev: RerankEvent) -> PostProcessorEvent | None:
        """replace the index node for parent chunk!"""
        is_add_context = await ctx.get('is_add_context')
        if is_add_context:
            return None # already done!
        text_node_dict = await ctx.get('text_nodes_dict')
        if not text_node_dict:
            print("Text nodes ID is lost! Please restart the workflow!")
            return None

        # set the is_add_context variable
        await ctx.set('is_add_context', True)

        nodes = ev.nodes
        new_nodes = []
        nodes_id = set([n.node.id_ for n in nodes])
        for n in nodes:
            # n is index node
            text_node_id = n.node.index_id
            if text_node_id not in nodes_id and text_node_id in text_node_dict:
                nodes_id.add(text_node_id)
                new_nodes.append(NodeWithScore(node=text_node_dict[text_node_id],score=n.score))
        return new_nodes

    @step(num_workers=1)
    async def synthesize(self, ctx: Context, ev: RerankEvent) -> StopEvent:
        """Return a streaming response using reranked nodes."""
        is_add_context = await ctx.get('is_add_context')
        if not is_add_context:
            return None 
        summarizer = CompactAndRefine(llm=llm, streaming=True, verbose=True)
        query = await ctx.get("query", default=None)
        response = await summarizer.asynthesize(query, nodes=ev.nodes)
        return StopEvent(result=response)

async def process(rag_flow: HybridRAGWorkflow, query: str, index: VectorStoreIndex, ctx: Context):
    response = await rag_flow.run(query=query, index=index, ctx=ctx)
    return response

async def main(queries: List[str]):
    rag_flow = HybridRAGWorkflow()
    handler = rag_flow.run(dirname='./data')
    index = await handler
    tasks = [process(rag_flow, q, index, handler.ctx) for q in queries]
    responses = await asyncio.gather(*tasks)
    return responses

if __name__ == "__main__":
    eval_dataset = json.load(open('./eval_dataset.json', 'r', encoding='utf-8'))
    quetion = eval_dataset['questions']
    responses = asyncio.run(main(quetion))
    for r in responses:
        display_response(r)

My code is keep reporting:

Index and text nodes is empty, please run the ingest function before indexing!

I wonnder why this could happen and how to fix it?

dosubot[bot] commented 1 month ago

The error "Index and text nodes is empty, please run the ingest function before indexing!" occurs because the index_nodes and text_nodes lists are empty when the index step is executed. This typically means the ingest function did not populate these lists correctly.

Here’s how you can fix it:

  1. Check the Ingest Function: Ensure that the ingest function is correctly processing DOCX files and populating index_nodes and text_nodes. Verify that the LlamaIndexDocxClient is correctly initialized and that process_docx_to_nodes is functioning as expected.

  2. Verify File Paths: Make sure the dirname provided to the ingest function contains DOCX files. The os.walk loop should correctly identify and process these files.

  3. Debugging: Add print statements or logging inside the ingest function to confirm that files are being processed and nodes are being added to index_nodes and text_nodes.

  4. Check LlamaIndexDocxClient: Ensure that LlamaIndexDocxClient is correctly implemented and that its methods are functioning as expected to parse DOCX files into nodes.

By ensuring that the ingest function is correctly populating the nodes, the error should be resolved [1][2].

To continue talking to Dosu, mention @dosu.