run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Question]: How can I get the source_node from the multi-document agent? #13797

Open ina5411ina opened 1 month ago

ina5411ina commented 1 month ago

Question

I followed this article for my implementation, but when I call query(), the log only shows "=== LLM Response ===" and response.source_nodes is empty. I want to know what went wrong and how to print out the source_nodes and chunk_text for every step.

main code:

Build Document Agent for each Document

from llama_index.agent.openai import OpenAIAgent
from llama_index.core import (
    load_index_from_storage, StorageContext, Settings, SummaryIndex, VectorStoreIndex,
)
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.llms.openai import OpenAI
import os

node_parser = SentenceSplitter()

# Build agents dictionary
agents = {}
query_engines = {}

# this is for the baseline
all_nodes = []

for idx, wiki_title in enumerate(wiki_titles):
    nodes = node_parser.get_nodes_from_documents(city_docs[wiki_title])
    all_nodes.extend(nodes)

    if not os.path.exists(f"./data/{wiki_title}"):
        # build vector index
        vector_index = VectorStoreIndex(nodes)
        vector_index.storage_context.persist(
            persist_dir=f"./data/{wiki_title}"
        )
    else:
        vector_index = load_index_from_storage(
            StorageContext.from_defaults(persist_dir=f"./data/{wiki_title}"),
        )

    # build summary index
    summary_index = SummaryIndex(nodes)
    # define query engines
    vector_query_engine = vector_index.as_query_engine(llm=Settings.llm)
    summary_query_engine = summary_index.as_query_engine(llm=Settings.llm)

    # define tools
    query_engine_tools = [
        QueryEngineTool(
            query_engine=vector_query_engine,
            metadata=ToolMetadata(
                name="vector_tool",
                description=(
                    "Useful for questions related to specific aspects of"
                    f" {wiki_title} (e.g. the history, arts and culture,"
                    " sports, demographics, or more)."
                ),
            ),
        ),
        QueryEngineTool(
            query_engine=summary_query_engine,
            metadata=ToolMetadata(
                name="summary_tool",
                description=(
                    "Useful for any requests that require a holistic summary"
                    f" of EVERYTHING about {wiki_title}. For questions about"
                    " more specific sections, please use the vector_tool."
                ),
            ),
        ),
    ]

    # build agent
    function_llm = OpenAI(model="gpt-4")
    agent = OpenAIAgent.from_tools(
        query_engine_tools,
        llm=function_llm,
        verbose=True,
        system_prompt=f"""\
You are a specialized agent designed to answer queries about {wiki_title}.
You must ALWAYS use at least one of the tools provided when answering a question; do NOT rely on prior knowledge.\
""",
    )

    agents[wiki_title] = agent
    query_engines[wiki_title] = vector_index.as_query_engine(
        similarity_top_k=2
    )

Build Retriever-Enabled OpenAI Agent

# define tool for each document agent
all_tools = []
for wiki_title in wiki_titles:
    wiki_summary = (
        f"This content contains Wikipedia articles about {wiki_title}. Use"
        f" this tool if you want to answer any questions about {wiki_title}.\n"
    )
    doc_tool = QueryEngineTool(
        query_engine=agents[wiki_title],
        metadata=ToolMetadata(
            name=f"tool_{wiki_title}",
            description=wiki_summary,
        ),
    )
    all_tools.append(doc_tool)
# define an "object" index and retriever over these tools
from llama_index.core import VectorStoreIndex
from llama_index.core.objects import ObjectIndex

obj_index = ObjectIndex.from_objects(
    all_tools,
    index_cls=VectorStoreIndex,
)
from llama_index.agent.openai import OpenAIAgent

top_agent = OpenAIAgent.from_tools(
    tool_retriever=obj_index.as_retriever(similarity_top_k=3),
    system_prompt=""" \
You are an agent designed to answer queries about a set of given cities.
Please always use the tools provided to answer a question. Do not rely on prior knowledge.\

""",
    verbose=True,
)
response = top_agent.query(
    "Give me a summary of all the positive aspects of Houston"
)

dosubot[bot] commented 1 month ago

To get the source_node from the multi-document agent and print out source_nodes and chunk_text for all steps, you can use the pprint_response function provided in the llama_index.legacy.response.pprint_utils module. This function already includes the logic to print out source_nodes and their content.

Here is an example of how you can use this function:

from llama_index.legacy.response.pprint_utils import pprint_response
# (in recent llama-index versions this utility may instead live under
#  llama_index.core.response.pprint_utils)

# Assuming `response` is an instance of `Response` that you have obtained from the multi-document agent
pprint_response(response, show_source=True)

This will print out the source_nodes and their chunk_text for all steps. The show_source=True parameter ensures that the source nodes are displayed.

The pprint_response function internally calls pprint_source_node to print each source_node and its content. Here is the relevant part of the code:

def pprint_response(
    response: Response,
    source_length: int = 350,
    wrap_width: int = 70,
    show_source: bool = False,
) -> None:
    """Pretty print response for jupyter notebook."""
    if response.response is None:
        response_text = "None"
    else:
        response_text = response.response.strip()

    response_text = f"Final Response: {response_text}"
    print(textwrap.fill(response_text, width=wrap_width))

    if show_source:
        for ind, source_node in enumerate(response.source_nodes):
            print("_" * wrap_width)
            print(f"Source Node {ind + 1}/{len(response.source_nodes)}")
            pprint_source_node(
                source_node, source_length=source_length, wrap_width=wrap_width
            )

This function will iterate over all source_nodes in the response and print their details, including the chunk_text [1][2].
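
If you would rather not go through pprint_response, here is a minimal sketch that walks response.source_nodes directly (assuming response is the object returned by your query call):

# Sketch: print the id, score, and chunk text of every retrieved source node.
for i, source_node in enumerate(response.source_nodes):
    print(f"Source Node {i + 1}/{len(response.source_nodes)}")
    print("node_id:", source_node.node.node_id)
    print("score:", source_node.score)
    print("chunk text:", source_node.node.get_content())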

ina5411ina commented 1 month ago

Hi @dosu, why is my response.source_nodes empty?

dosubot[bot] commented 1 month ago

To resolve the issue of response.source_nodes being empty, you can modify the _query method in the RetrySourceQueryEngine class to include logging statements that print out the source_nodes and their chunk_text. Here is the modified code:

def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
    response = self._query_engine._query(query_bundle)
    if self.max_retries <= 0:
        return response
    typed_response = (
        response if isinstance(response, Response) else response.get_response()
    )
    query_str = query_bundle.query_str
    eval = self._evaluator.evaluate_response(query_str, typed_response)
    if eval.passing:
        logger.debug("Evaluation returned True.")
        return response
    else:
        logger.debug("Evaluation returned False.")
        # Test source nodes
        source_evals = [
            self._evaluator.evaluate(
                query=query_str,
                response=typed_response.response,
                contexts=[source_node.node.get_content()],
            )
            for source_node in typed_response.source_nodes
        ]
        orig_nodes = typed_response.source_nodes

        # Log source_nodes and chunk_text
        for i, source_node in enumerate(orig_nodes):
            logger.debug(f"Source Node {i}: {source_node}")
            logger.debug(f"Chunk Text {i}: {source_node.node.get_content()}")

        assert len(source_evals) == len(orig_nodes)
        new_docs = []
        for node, eval_result in zip(orig_nodes, source_evals):
            if eval_result:
                new_docs.append(Document(text=node.node.get_content()))
        if len(new_docs) == 0:
            raise ValueError("No source nodes passed evaluation.")
        new_index = SummaryIndex.from_documents(
            new_docs,
        )
        new_retriever_engine = RetrieverQueryEngine(new_index.as_retriever())
        new_query_engine = RetrySourceQueryEngine(
            new_retriever_engine,
            self._evaluator,
            self._llm,
            self.max_retries - 1,
        )
        return new_query_engine.query(query_bundle)

Additionally, you can use the pprint_response function from the pprint_utils.py module to print the response, including source node information, in a readable format. This function formats the output to include metadata such as node_id and score for each source node. Here is the relevant part of the code:

def pprint_response(
    response: Response,
    source_length: int = 350,
    wrap_width: int = 70,
    show_source: bool = False,
) -> None:
    """Pretty print response for jupyter notebook."""
    if response.response is None:
        response_text = "None"
    else:
        response_text = response.response.strip()

    response_text = f"Final Response: {response_text}"
    print(textwrap.fill(response_text, width=wrap_width))

    if show_source:
        for ind, source_node in enumerate(response.source_nodes):
            print("_" * wrap_width)
            print(f"Source Node {ind + 1}/{len(response.source_nodes)}")
            pprint_source_node(
                source_node, source_length=source_length, wrap_width=wrap_width
            )

By incorporating these changes, the source_nodes and their chunk_text are logged at every step, which should help you debug why response.source_nodes is empty [1][2][3][4].
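
Note that the logger.debug(...) calls above only produce output if logging is configured; here is a minimal sketch, assuming no other logging setup is already in place:

import logging
import sys

# Send DEBUG-level log records (including the logger.debug calls above) to stdout.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)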

ina5411ina commented 1 month ago

Hi @dosu ,

I tried the RetrySourceQueryEngine method you mentioned above, but I got a ValueError: No source nodes passed evaluation. Why is that?

Here is my code.

from llama_index.agent.openai import OpenAIAgent
from llama_index.core import (
    load_index_from_storage, StorageContext, Settings, SummaryIndex, VectorStoreIndex,
)
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.llms.openai import OpenAI
import os

node_parser = SentenceSplitter()

# Build agents dictionary
agents = {}
query_engines = {}

# this is for the baseline
all_nodes = []

for idx, wiki_title in enumerate(wiki_titles):
    nodes = node_parser.get_nodes_from_documents(city_docs[wiki_title])
    all_nodes.extend(nodes)

    if not os.path.exists(f"./data/{wiki_title}"):
        # build vector index
        vector_index = VectorStoreIndex(nodes)
        vector_index.storage_context.persist(
            persist_dir=f"./data/{wiki_title}"
        )
    else:
        vector_index = load_index_from_storage(
            StorageContext.from_defaults(persist_dir=f"./data/{wiki_title}"),
        )

    # build summary index
    summary_index = SummaryIndex(nodes)
    # define query engines
    vector_query_engine = vector_index.as_query_engine(llm=Settings.llm)
    summary_query_engine = summary_index.as_query_engine(llm=Settings.llm)

    # define tools
    query_engine_tools = [
        QueryEngineTool(
            query_engine=vector_query_engine,
            metadata=ToolMetadata(
                name="vector_tool",
                description=(
                    "Useful for questions related to specific aspects of"
                    f" {wiki_title} (e.g. the history, arts and culture,"
                    " sports, demographics, or more)."
                ),
            ),
        ),
        QueryEngineTool(
            query_engine=summary_query_engine,
            metadata=ToolMetadata(
                name="summary_tool",
                description=(
                    "Useful for any requests that require a holistic summary"
                    f" of EVERYTHING about {wiki_title}. For questions about"
                    " more specific sections, please use the vector_tool."
                ),
            ),
        ),
    ]

    # build agent
    function_llm = OpenAI(model="gpt-4")
    agent = OpenAIAgent.from_tools(
        query_engine_tools,
        llm=function_llm,
        verbose=True,
        system_prompt=f"""\
You are a specialized agent designed to answer queries about {wiki_title}.
You must ALWAYS use at least one of the tools provided when answering a question; do NOT rely on prior knowledge.\
""",
    )

    agents[wiki_title] = agent
    query_engines[wiki_title] = vector_index.as_query_engine(
        similarity_top_k=2
    )
# define tool for each document agent
all_tools = []
for wiki_title in wiki_titles:
    wiki_summary = (
        f"This content contains Wikipedia articles about {wiki_title}. Use"
        f" this tool if you want to answer any questions about {wiki_title}.\n"
    )
    doc_tool = QueryEngineTool(
        query_engine=agents[wiki_title],
        metadata=ToolMetadata(
            name=f"tool_{wiki_title}",
            description=wiki_summary,
        ),
    )
    all_tools.append(doc_tool)
# define an "object" index and retriever over these tools
from llama_index.core import VectorStoreIndex
from llama_index.core.objects import ObjectIndex

obj_index = ObjectIndex.from_objects(
    all_tools,
    index_cls=VectorStoreIndex,
)
from llama_index.agent.openai import OpenAIAgent

top_agent = OpenAIAgent.from_tools(
    tool_retriever=obj_index.as_retriever(similarity_top_k=3),
    system_prompt=""" \
You are an agent designed to answer queries about a set of given cities.
Please always use the tools provided to answer a question. Do not rely on prior knowledge.\

""",
    verbose=True,
)
import logging
from typing import Optional

from llama_index.core.base.base_query_engine import BaseQueryEngine
from llama_index.core.base.response.schema import RESPONSE_TYPE, Response
from llama_index.core.callbacks.base import CallbackManager
from llama_index.core.evaluation import BaseEvaluator
from llama_index.core.indices.list.base import SummaryIndex
from llama_index.core.llms.llm import LLM
from llama_index.core.prompts.mixin import PromptMixinType
from llama_index.core.query_engine.retriever_query_engine import (
    RetrieverQueryEngine,
)
from llama_index.core.schema import Document, QueryBundle
from llama_index.core.service_context import ServiceContext
from llama_index.core.settings import (
    Settings,
    callback_manager_from_settings_or_context,
    llm_from_settings_or_context,
)

logger = logging.getLogger(__name__)

class RetrySourceQueryEngine(BaseQueryEngine):
    """Retry with different source nodes."""

    def __init__(
        self,
        query_engine: RetrieverQueryEngine,
        evaluator: BaseEvaluator,
        llm: Optional[LLM] = None,
        max_retries: int = 3,
        callback_manager: Optional[CallbackManager] = None,
        # deprecated
        service_context: Optional[ServiceContext] = None,
    ) -> None:
        """Run a BaseQueryEngine with retries."""
        self._query_engine = query_engine
        self._evaluator = evaluator
        self._llm = llm or llm_from_settings_or_context(Settings, service_context)
        self.max_retries = max_retries
        super().__init__(
            callback_manager=callback_manager
            or callback_manager_from_settings_or_context(Settings, service_context)
        )

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {"query_engine": self._query_engine, "evaluator": self._evaluator}

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        response = self._query_engine._query(query_bundle)
        if self.max_retries <= 0:
            return response
        typed_response = (
            response if isinstance(response, Response) else response.get_response()
        )
        query_str = query_bundle.query_str
        eval = self._evaluator.evaluate_response(query_str, typed_response)
        if eval.passing:
            logger.debug("Evaluation returned True.")
            return response
        else:
            logger.debug("Evaluation returned False.")
            # Test source nodes
            source_evals = [
                self._evaluator.evaluate(
                    query=query_str,
                    response=typed_response.response,
                    contexts=[source_node.get_content()],
                )
                for source_node in typed_response.source_nodes
            ]
            orig_nodes = typed_response.source_nodes

            # Log source_nodes and chunk_text
            for i, source_node in enumerate(orig_nodes):
                logger.debug(f"Source Node {i}: {source_node}")
                logger.debug(f"Chunk Text {i}: {source_node.node.get_content()}")

            assert len(source_evals) == len(orig_nodes)
            new_docs = []
            for node, eval_result in zip(orig_nodes, source_evals):
                if eval_result:
                    new_docs.append(Document(text=node.node.get_content()))
            if len(new_docs) == 0:
                raise ValueError("No source nodes passed evaluation.")
            new_index = SummaryIndex.from_documents(
                new_docs,
            )
            new_retriever_engine = RetrieverQueryEngine(new_index.as_retriever())
            new_query_engine = RetrySourceQueryEngine(
                new_retriever_engine,
                self._evaluator,
                self._llm,
                self.max_retries - 1,
            )
            return new_query_engine.query(query_bundle)

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Not supported."""
        return self._query(query_bundle)
from llama_index.core.evaluation import RelevancyEvaluator

query_response_evaluator = RelevancyEvaluator()
retry_source_query_engine = RetrySourceQueryEngine(
    query_engine=top_agent, evaluator=query_response_evaluator, max_retries=5
)
retry_source_response = retry_source_query_engine.query(test_queries[query_num])
print(retry_source_response)

dosubot[bot] commented 1 month ago

The ValueError("No source nodes passed evaluation.") is raised when none of the source nodes meet the criteria defined by the evaluator. To resolve this, ensure that at least one source node passes the evaluation. Here are some steps to help you debug and resolve the issue:

  1. Check the Evaluation Criteria: Ensure that the evaluation criteria in your RelevancyEvaluator are not too strict, causing all source nodes to fail.

  2. Debug Source Nodes: Add logging to inspect the content and evaluation results of the source nodes to understand why they are failing.

  3. Fallback Mechanism: Implement a fallback mechanism to handle cases where no source nodes pass the evaluation (a minimal sketch follows right after this list).
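
As a rough sketch of point 3, and only if falling back to the unfiltered answer is acceptable for your use case, the raise in the code below could be replaced like this:

# Hedged sketch: return the original response instead of raising when
# no source node passes evaluation.
if len(new_docs) == 0:
    logger.warning("No source nodes passed evaluation; returning original response.")
    return response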

Here is an example of how you might add logging to debug the issue:

def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
    response = self._query_engine._query(query_bundle)
    if self.max_retries <= 0:
        return response
    typed_response = (
        response if isinstance(response, Response) else response.get_response()
    )
    query_str = query_bundle.query_str
    eval = self._evaluator.evaluate_response(query_str, typed_response)
    if eval.passing:
        logger.debug("Evaluation returned True.")
        return response
    else:
        logger.debug("Evaluation returned False.")
        # Test source nodes
        source_evals = [
            self._evaluator.evaluate(
                query=query_str,
                response=typed_response.response,
                contexts=[source_node.get_content()],
            )
            for source_node in typed_response.source_nodes
        ]
        orig_nodes = typed_response.source_nodes

        # Log source_nodes and chunk_text
        for i, source_node in enumerate(orig_nodes):
            logger.debug(f"Source Node {i}: {source_node}")
            logger.debug(f"Chunk Text {i}: {source_node.node.get_content()}")

        assert len(source_evals) == len(orig_nodes)
        new_docs = []
        for node, eval_result in zip(orig_nodes, source_evals):
            if eval_result:
                new_docs.append(Document(text=node.get_content()))
        if len(new_docs) == 0:
            logger.error("No source nodes passed evaluation. Original nodes: %s", orig_nodes)
            raise ValueError("No source nodes passed evaluation.")
        new_index = SummaryIndex.from_documents(
            new_docs,
        )
        new_retriever_engine = RetrieverQueryEngine(new_index.as_retriever())
        new_query_engine = RetrySourceQueryEngine(
            new_retriever_engine,
            self._evaluator,
            self._llm,
            self.max_retries - 1,
        )
        return new_query_engine.query(query_bundle)

By adding these debug statements, you can get more insights into why the source nodes are failing the evaluation and adjust your evaluator or source nodes accordingly.

Additionally, ensure that your RelevancyEvaluator is correctly configured. The RelevancyEvaluator uses a template to determine if the response aligns with the context information, answering "YES" if it does and "NO" otherwise. The result is considered passing if the response is deemed relevant ("YES") [1].
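
As a rough sketch of that configuration (the model choice and the "Houston" key are illustrative assumptions, not requirements):

from llama_index.core.evaluation import RelevancyEvaluator
from llama_index.llms.openai import OpenAI

# Sketch: give the evaluator an explicit LLM instead of relying on global defaults,
# then spot-check it on a plain query-engine response that has source_nodes attached.
query_response_evaluator = RelevancyEvaluator(llm=OpenAI(model="gpt-4"))
sample_response = query_engines["Houston"].query("What sports teams are in Houston?")
eval_result = query_response_evaluator.evaluate_response(
    query="What sports teams are in Houston?", response=sample_response
)
print(eval_result.passing, eval_result.feedback)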

By ensuring that your evaluation criteria are appropriate and adding logging for debugging, you can better understand and resolve the issue of no source nodes passing evaluation [2][3].
