run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Question]: How to generate `source_nodes` from the response of `astream_chat` engine #15323

Open RakeshReddyKondeti opened 1 month ago

RakeshReddyKondeti commented 1 month ago

Question

Hello,

I am currently working on a project that involves using the streaming chat engine, and I need some assistance with generating source_nodes information from the response. Specifically, I am using the following code snippet:

gen = await _chat_engine.astream_chat(question)
async for token in gen.async_response_gen():
    print(token)

Could you please provide guidance or examples on how to generate source_nodes from the streaming chat engine? Any help or pointers would be greatly appreciated.

Thank you!

dosubot[bot] commented 1 month ago

Hey @RakeshReddyKondeti! I'm here to help you out. If you need assistance with bug fixes, have questions, or want to become a contributor, I'm your bot!

To generate source_nodes information from the response when using the streaming chat engine, you can rely on the __post_init__ method of the StreamingAgentChatResponse class. This method ensures that if sources are provided and source_nodes are not, it will populate source_nodes from the raw_output of each ToolOutput in sources.

Here is the relevant code snippet from the StreamingAgentChatResponse class:

@dataclass
class StreamingAgentChatResponse:
    """Streaming chat response to user and writing to chat history."""

    response: str = ""
    sources: List[ToolOutput] = field(default_factory=list)
    chat_stream: Optional[ChatResponseGen] = None
    achat_stream: Optional[ChatResponseAsyncGen] = None
    source_nodes: List[NodeWithScore] = field(default_factory=list)
    unformatted_response: str = ""
    queue: Queue = field(default_factory=Queue)
    aqueue: Optional[asyncio.Queue] = None
    is_function: Optional[bool] = None
    is_done = False
    new_item_event: Optional[asyncio.Event] = None
    is_function_false_event: Optional[asyncio.Event] = None
    is_function_not_none_thread_event: Event = field(default_factory=Event)
    exception: Optional[Exception] = None

    def __post_init__(self) -> None:
        if self.sources and not self.source_nodes:
            for tool_output in self.sources:
                if isinstance(tool_output.raw_output, (Response, StreamingResponse)):
                    self.source_nodes.extend(tool_output.raw_output.source_nodes)

This method will automatically populate source_nodes if sources are provided and source_nodes are initially empty. You don't need to do anything extra; just ensure that your sources are correctly set when creating an instance of StreamingAgentChatResponse.

Additionally, you can access the source nodes directly from the response object using response.source_nodes. This allows you to display the source nodes while the LLM is streaming its response. For example, in a recent discussion on the LlamaIndex Discord channel, it was confirmed that response.source_nodes will get the source nodes, and response.response_gen will get the generator for the streaming response [1][2][3].
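[Editorial note] The `__post_init__` behavior Dosu describes can be illustrated with a stripped-down stand-in. The classes below are simplified stand-ins, not the real llama_index types; only the copying logic mirrors the snippet above:

```python
from dataclasses import dataclass, field
from typing import Any, List

# Stand-in stubs (NOT the real llama_index classes) to show how
# __post_init__ copies nodes out of tool outputs into the response.
@dataclass
class Response:
    source_nodes: List[str] = field(default_factory=list)

@dataclass
class ToolOutput:
    raw_output: Any = None

@dataclass
class StreamingAgentChatResponse:
    sources: List[ToolOutput] = field(default_factory=list)
    source_nodes: List[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        # If sources are set but source_nodes is empty, pull the nodes
        # from each tool output whose raw_output carries them.
        if self.sources and not self.source_nodes:
            for tool_output in self.sources:
                if isinstance(tool_output.raw_output, Response):
                    self.source_nodes.extend(tool_output.raw_output.source_nodes)

resp = StreamingAgentChatResponse(
    sources=[ToolOutput(raw_output=Response(source_nodes=["node-1", "node-2"]))]
)
print(resp.source_nodes)  # → ['node-1', 'node-2']
```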

To continue talking to Dosu, mention @dosu.

logan-markewich commented 1 month ago

It's already there? gen.source_nodes

RakeshReddyKondeti commented 1 month ago

Hi @logan-markewich, I tried using gen.source_nodes to retrieve the source_nodes from the astream_chat response, but it always returns an empty list. Below is the code snippet:

_chat_engine = await chat_engine.fetch_chat_engine(question)
gen = await _chat_engine.astream_chat(question)

async for token in gen.async_response_gen():
    print(token, end='', flush=True)

print('source_nodes: ', gen.source_nodes)   # always returns []

logan-markewich commented 1 month ago

Are you using an agent as your chat engine? If it's empty, that means it didn't call any tools that used nodes.

You can also check gen.sources for all tool calls (if any)

RakeshReddyKondeti commented 4 weeks ago

Hi @logan-markewich,

I am using RouterQueryEngine as my chat_engine. I am not sure whether this is referred to as an agent or not.

I checked with gen.sources and got some output like this:

[ToolOutput(content="",
            tool_name='query_engine',
            raw_input={'query': my question here},
            raw_output=AsyncStreamingResponse(
                response_gen=<async_generator object astream_completion_response_to_tokens.<locals>.gen at 0x000002A969B65740>,
                source_nodes=[similarity_top_k NodeWithScore objects here]))]

I was able to access source_nodes by doing the following:

_chat_engine = await chat_engine.fetch_chat_engine(question)
gen = await _chat_engine.astream_chat(question)

async for token in gen.async_response_gen():
    await _update_response(response_message, response)
    print(token, end='', flush=True)

for source in gen.sources:
    for idx, source_node in enumerate(source.raw_output.source_nodes):
        print(f"source_node_{idx}_filename: {source_node.metadata['file_name']}")
        print(f"source_node_{idx}_text: {source_node.text}")

Is there a cleaner way of doing this, like using gen.source_nodes directly?
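[Editorial note] One cleaner option is a duck-typed helper that sidesteps the type check entirely: any entry in gen.sources whose raw_output exposes a source_nodes attribute contributes. The helper name and the stand-in objects below are illustrative, not part of the llama_index API:

```python
from types import SimpleNamespace
from typing import Any, Iterable, List

def collect_source_nodes(sources: Iterable[Any]) -> List[Any]:
    """Gather source_nodes from every tool output, whatever its raw_output type."""
    nodes: List[Any] = []
    for tool_output in sources:
        # getattr with a default tolerates raw_outputs that carry no nodes.
        nodes.extend(getattr(tool_output.raw_output, "source_nodes", []))
    return nodes

# Stand-in for gen.sources: one streaming-style output with nodes,
# one tool output whose raw_output has no source_nodes at all.
sources = [
    SimpleNamespace(raw_output=SimpleNamespace(source_nodes=["n0", "n1"])),
    SimpleNamespace(raw_output=object()),
]
print(collect_source_nodes(sources))  # → ['n0', 'n1']
```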

logan-markewich commented 4 weeks ago

Ah, I think this function on the StreamingAgentChatResponse class needs to be updated:

    def __post_init__(self) -> None:
        if self.sources and not self.source_nodes:
            for tool_output in self.sources:
                if isinstance(tool_output.raw_output, (Response, StreamingResponse)):
                    self.source_nodes.extend(tool_output.raw_output.source_nodes)

Notice it doesn't check for AsyncStreamingResponse
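[Editorial note] The gap is easy to reproduce with stand-in classes (not the real llama_index types): a raw_output that is not one of the checked types slips through the isinstance filter, so no nodes are copied:

```python
from types import SimpleNamespace

# Simplified stand-ins for the three response types.
class Response: pass
class StreamingResponse: pass
class AsyncStreamingResponse:
    def __init__(self, source_nodes):
        self.source_nodes = source_nodes

def populate_source_nodes(sources, checked_types):
    # Mirrors the __post_init__ filter: only listed types contribute nodes.
    nodes = []
    for tool_output in sources:
        if isinstance(tool_output.raw_output, checked_types):
            nodes.extend(tool_output.raw_output.source_nodes)
    return nodes

sources = [SimpleNamespace(raw_output=AsyncStreamingResponse(["n0"]))]

# The original filter misses the async type entirely:
print(populate_source_nodes(sources, (Response, StreamingResponse)))  # → []
# Adding AsyncStreamingResponse lets the nodes flow through:
print(populate_source_nodes(
    sources, (Response, StreamingResponse, AsyncStreamingResponse)))  # → ['n0']
```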

RakeshReddyKondeti commented 3 weeks ago

Hi @logan-markewich

Thanks! I changed the code as follows, and it is still not working.

class CustomStreamingAgentChatResponse(StreamingAgentChatResponse):
    def __post_init__(self) -> None:
        if self.sources and not self.source_nodes:
            for tool_output in self.sources:
                if isinstance(tool_output.raw_output, (Response, StreamingResponse, AsyncStreamingResponse)):
                    self.source_nodes.extend(tool_output.raw_output.source_nodes)

What am I still missing here?

logan-markewich commented 3 weeks ago

That means that it's not calling a tool that returns those types? 🤷🏻