run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Question]: Question on Enabling Streaming Mode for `ReActAgent` in LlamaIndex #16815

Closed Xphi310302 closed 1 week ago

Xphi310302 commented 1 week ago

Question

Hello, I’m currently working with the ReAct agent from llama_index.core.agent, but I’ve found it challenging to adapt it to my specific use case. To increase flexibility, I switched to the ReActAgent written in the Workflow module, which I found easier to customize.

There’s a guide on using ReActAgent in Workflow that provides an example setup. However, I’m struggling to enable streaming mode for the final response of ReActAgent. In the example, all steps, including reasoning and the final output step, use the achat method to output responses. My goal is to use astream_chat specifically for the final response step so that I can stream the output to the UI in real time.

Is there a way to set this up?

Additional Context

I want to:

  1. Use achat for all intermediate steps to keep reasoning concise and synchronous.
  2. Enable astream_chat only for the final step to allow for streaming and real-time updates in the UI.

Thank you for any suggestions or solutions!

dosubot[bot] commented 1 week ago

Hey @Xphi310302! I'm here to help you with any bugs, questions, or contributions you have in mind. Let's work together to solve this puzzle!

To enable streaming mode for the final response of ReActAgent using astream_chat while keeping all intermediate steps using achat, you can use the MockStreamChatLLM for the final response and MockChatLLM for intermediate steps. Here's how you can achieve this:

  1. Use achat for intermediate steps.
  2. Use astream_chat for the final response.

Here's a code snippet demonstrating this setup:

import asyncio
from llama_index.legacy.agent.react.base import ReActAgent
from llama_index.legacy.agent.react.types import ObservationReasoningStep
from llama_index.legacy.agent.types import Task
from llama_index.legacy.bridge.pydantic import PrivateAttr
from llama_index.legacy.chat_engine.types import (
    AgentChatResponse,
    StreamingAgentChatResponse,
)
from llama_index.legacy.core.llms.types import (
    ChatMessage,
    ChatResponse,
    ChatResponseGen,
    MessageRole,
)
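# NOTE: MockChatLLM is used below but never imported. It and MockStreamChatLLM
# appear to be helpers from the library's test suite rather than public classes,
# so this import may need adjusting before the snippet runs.
from llama_index.legacy.llms.mock import MockLLM, MockStreamChatLLM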
from llama_index.legacy.tools.function_tool import FunctionTool

# Define the add tool
def add(a: int, b: int) -> int:
    """Add two integers and returns the result integer."""
    return a + b

add_tool = FunctionTool.from_defaults(fn=add)

# Define mock responses
MOCK_ACTION_RESPONSE = """\
Thought: I need to use a tool to help me answer the question.
Action: add
Action Input: {"a": 1, "b": 1}
"""

MOCK_STREAM_FINAL_RESPONSE = """\
Thought: I have enough information to answer the question without using any more tools.
Answer: 2 is the final answer.
"""

# Create mock LLMs
mock_llm = MockChatLLM(
    responses=[
        ChatMessage(
            content=MOCK_ACTION_RESPONSE,
            role=MessageRole.ASSISTANT,
        ),
    ]
)

mock_stream_llm = MockStreamChatLLM(
    responses=[
        ChatMessage(
            content=MOCK_STREAM_FINAL_RESPONSE,
            role=MessageRole.ASSISTANT,
        ),
    ]
)

# Create the agent
agent = ReActAgent.from_tools(
    tools=[add_tool],
    llm=mock_llm,
)

# Run intermediate steps using achat
async def run_intermediate_steps():
    response = await agent.achat("What is 1 + 1?")
    assert isinstance(response, AgentChatResponse)
    assert response.response == "2"

# Run final step using astream_chat
async def run_final_step():
    agent.llm = mock_stream_llm  # Switch to streaming LLM for final response
    response = await agent.astream_chat("What is 1 + 1?")
    assert isinstance(response, StreamingAgentChatResponse)

    # exhaust stream
    async for delta in response.async_response_gen():
        continue
    expected_answer = MOCK_STREAM_FINAL_RESPONSE.split("Answer: ")[-1].strip()
    assert response.response == expected_answer

# Execute the steps
async def main():
    await run_intermediate_steps()
    await run_final_step()

asyncio.run(main())

This code sets up the ReActAgent to use achat for intermediate steps and astream_chat for the final response, ensuring that the final response is streamed while intermediate steps are handled normally [1].

To continue talking to Dosu, mention @dosu.

Xphi310302 commented 1 week ago

@dosu Here is the code snippet for ReActAgent written in Workflow. I want to adjust the agent’s output so that intermediate steps keep using achat and only the final answer uses astream_chat.

This approach would let me stream the final answer to the UI, enhancing real-time user interaction, while keeping intermediate reasoning concise.

from typing import Any, List

from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama_index.core.agent.react.types import (
    ActionReasoningStep,
    ObservationReasoningStep,
)
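from llama_index.core.llms import ChatMessage
from llama_index.core.llms.llm import LLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools import ToolSelection
from llama_index.core.tools.types import BaseTool
from llama_index.core.workflow import (
    Context,
    Event,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)
from llama_index.llms.openai import OpenAI


# Events passed between the workflow steps below (as defined in the guide)
class PrepEvent(Event):
    pass


class InputEvent(Event):
    input: list[ChatMessage]


class ToolCallEvent(Event):
    tool_calls: list[ToolSelection]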

class ReActAgent(Workflow):
    def __init__(
        self,
        *args: Any,
        llm: LLM | None = None,
        tools: list[BaseTool] | None = None,
        extra_context: str | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.tools = tools or []

        self.llm = llm or OpenAI()

        self.memory = ChatMemoryBuffer.from_defaults(llm=llm)
        self.formatter = ReActChatFormatter(context=extra_context or "")
        self.output_parser = ReActOutputParser()
        self.sources = []

    @step
    async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
        # clear sources
        self.sources = []

        # get user input
        user_input = ev.input
        user_msg = ChatMessage(role="user", content=user_input)
        self.memory.put(user_msg)

        # clear current reasoning
        await ctx.set("current_reasoning", [])

        return PrepEvent()

    @step
    async def prepare_chat_history(
        self, ctx: Context, ev: PrepEvent
    ) -> InputEvent:
        # get chat history
        chat_history = self.memory.get()
        current_reasoning = await ctx.get("current_reasoning", default=[])
        llm_input = self.formatter.format(
            self.tools, chat_history, current_reasoning=current_reasoning
        )
        return InputEvent(input=llm_input)

    @step
    async def handle_llm_input(
        self, ctx: Context, ev: InputEvent
    ) -> ToolCallEvent | StopEvent:
        chat_history = ev.input

        response = await self.llm.achat(chat_history)

        try:
            reasoning_step = self.output_parser.parse(response.message.content)
            (await ctx.get("current_reasoning", default=[])).append(
                reasoning_step
            )
            if reasoning_step.is_done:
                self.memory.put(
                    ChatMessage(
                        role="assistant", content=reasoning_step.response
                    )
                )
                return StopEvent(
                    result={
                        "response": reasoning_step.response,
                        "sources": [*self.sources],
                        "reasoning": await ctx.get(
                            "current_reasoning", default=[]
                        ),
                    }
                )
            elif isinstance(reasoning_step, ActionReasoningStep):
                tool_name = reasoning_step.action
                tool_args = reasoning_step.action_input
                return ToolCallEvent(
                    tool_calls=[
                        ToolSelection(
                            tool_id="fake",
                            tool_name=tool_name,
                            tool_kwargs=tool_args,
                        )
                    ]
                )
        except Exception as e:
            (await ctx.get("current_reasoning", default=[])).append(
                ObservationReasoningStep(
                    observation=f"There was an error in parsing my reasoning: {e}"
                )
            )

        # if no tool calls or final response, iterate again
        return PrepEvent()

    @step
    async def handle_tool_calls(
        self, ctx: Context, ev: ToolCallEvent
    ) -> PrepEvent:
        tool_calls = ev.tool_calls
        tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools}

        # call tools -- safely!
        for tool_call in tool_calls:
            tool = tools_by_name.get(tool_call.tool_name)
            if not tool:
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(
                        observation=f"Tool {tool_call.tool_name} does not exist"
                    )
                )
                continue

            try:
                tool_output = tool(**tool_call.tool_kwargs)
                self.sources.append(tool_output)
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(observation=tool_output.content)
                )
            except Exception as e:
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(
                        observation=f"Error calling tool {tool.metadata.get_name()}: {e}"
                    )
                )

        # prep the next iteration
        return PrepEvent()

dosubot[bot] commented 1 week ago

To adjust the ReActAgent in your Workflow to use astream_chat for the final output and achat for intermediate steps, you can modify the handle_llm_input method to switch between these two methods based on whether the reasoning step is final or not. Here's how you can implement this:

  1. Use achat for Intermediate Steps: Continue using achat for processing intermediate reasoning steps to keep them concise and synchronous.

  2. Use astream_chat for the Final Output: When a final response is detected, switch to using astream_chat to stream the response to the UI.

Here's a modified version of your handle_llm_input method to achieve this:

@step
async def handle_llm_input(
    self, ctx: Context, ev: InputEvent
) -> ToolCallEvent | StopEvent:
    chat_history = ev.input

    # Use achat for intermediate steps
    response = await self.llm.achat(chat_history)

    try:
        reasoning_step = self.output_parser.parse(response.message.content)
        (await ctx.get("current_reasoning", default=[])).append(reasoning_step)

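        if reasoning_step.is_done:
            # Switch to astream_chat for the final response. This is a second
            # LLM call on the same prompt, so its text may differ from the
            # achat response parsed above.
            final_gen = await self.llm.astream_chat(chat_history)
            final_text = ""
            async for chunk in final_gen:
                # Each chunk is a ChatResponse; stream chunk.delta to the UI here
                final_text += chunk.delta or ""

            # Drop the "Thought: ..." preamble and keep only the answer text
            final_answer = final_text.split("Answer:")[-1].strip()
            self.memory.put(
                ChatMessage(role="assistant", content=final_answer)
            )
            return StopEvent(
                result={
                    "response": final_answer,
                    "sources": [*self.sources],
                    "reasoning": await ctx.get("current_reasoning", default=[]),
                }
            )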
        elif isinstance(reasoning_step, ActionReasoningStep):
            tool_name = reasoning_step.action
            tool_args = reasoning_step.action_input
            return ToolCallEvent(
                tool_calls=[
                    ToolSelection(
                        tool_id="fake",
                        tool_name=tool_name,
                        tool_kwargs=tool_args,
                    )
                ]
            )
    except Exception as e:
        (await ctx.get("current_reasoning", default=[])).append(
            ObservationReasoningStep(
                observation=f"There was an error in parsing my reasoning: {e}"
            )
        )

    # if no tool calls or final response, iterate again
    return PrepEvent()

This setup allows you to handle intermediate reasoning with achat and stream the final answer using astream_chat, enhancing real-time user interaction while keeping intermediate steps concise.

logan-markewich commented 1 week ago

@Xphi310302 Streaming with react is pretty hard, especially if you only want to stream the final step.

You can use llm.astream_chat() together with ctx.write_event_to_stream() to expose streaming events that contain the streaming LLM response. But if you only want to stream the final step, you need to check for Answer: in the stream, which marks where the final response starts.

...
response_gen = await llm.astream_chat(chat_history)

response_str = ""
found_answer = False
async for resp in response_gen:
    response_str += resp.delta or ""
    if found_answer:
        # MyStreamingEvent is a custom Event subclass with a `delta` field
        ctx.write_event_to_stream(MyStreamingEvent(delta=resp.delta))

    if "Answer:" in response_str:
        found_answer = True
...

Then when you call the workflow

handler = workflow.run(...)
async for ev in handler.stream_events():
  if isinstance(ev, MyStreamingEvent):
    print(ev.delta, end="", flush=True)

result = await handler
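
The marker check above can be tried out without any LLM. The following is a minimal sketch in plain asyncio, not LlamaIndex code: `fake_llm_stream`, `stream_final_answer`, and `collect` are hypothetical names, and the fake stream stands in for the deltas that `llm.astream_chat()` would yield. It differs from the snippet above in one respect, which is an assumption about what a UI would want: text that arrives in the same chunk as `Answer:` is emitted too, rather than dropped.

```python
import asyncio
from typing import AsyncIterator, List

ANSWER_MARKER = "Answer:"


async def fake_llm_stream(chunks: List[str]) -> AsyncIterator[str]:
    """Hypothetical stand-in for the deltas yielded by llm.astream_chat()."""
    for chunk in chunks:
        await asyncio.sleep(0)  # hand control back to the event loop between chunks
        yield chunk


async def stream_final_answer(deltas: AsyncIterator[str]) -> AsyncIterator[str]:
    """Yield only the text that follows the first 'Answer:' marker."""
    buffer = ""
    found_answer = False
    async for delta in deltas:
        if found_answer:
            if delta:
                yield delta
            continue
        # Accumulate until the marker shows up; it may be split across chunks.
        buffer += delta
        marker_pos = buffer.find(ANSWER_MARKER)
        if marker_pos != -1:
            found_answer = True
            # Emit whatever arrived after the marker in this same chunk.
            tail = buffer[marker_pos + len(ANSWER_MARKER):].lstrip()
            if tail:
                yield tail


async def collect(chunks: List[str]) -> str:
    """Join the streamed answer pieces; a UI would render them one by one."""
    pieces = [piece async for piece in stream_final_answer(fake_llm_stream(chunks))]
    return "".join(pieces)


if __name__ == "__main__":
    demo = ["Thought: I can answer", " now.\nAns", "wer: 2 is", " the final", " answer."]
    print(asyncio.run(collect(demo)))
```

In the workflow step, each yielded piece would be wrapped in the custom streaming event and passed to `ctx.write_event_to_stream()`; intermediate tool-calling turns never contain the marker, so nothing is streamed for them.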

phinx-orient commented 1 week ago

@logan-markewich Thank you! I successfully implemented streaming for the final response. May I ask another question? I noticed that the code from the guide does not include an input slot for a system prompt in the ReActAgent. Could you advise on how I might modify the code to allow the ReActAgent using Workflow to accept a system prompt as input?

logan-markewich commented 1 week ago

@phinx-orient the system prompt is already there; you can see it by inspecting the chat_history being used in the llm call

The formatter inserts the system prompt which has details on how the react loop should work.

If you want to add additional context/instructions to this system prompt, you can set the extra_context variable, which injects extra context into the existing system prompt

ReActChatFormatter(context=extra_context or "")
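
To illustrate the idea of injecting extra context into an existing system prompt, here is a rough sketch. It is a simplified stand-in, not LlamaIndex's actual template or formatter: `SYSTEM_TEMPLATE` and `build_system_prompt` are hypothetical names, and the real ReAct header is much longer.

```python
from typing import List

# Hypothetical, heavily simplified template; the real ReAct system header differs.
SYSTEM_TEMPLATE = (
    "You answer questions by reasoning step by step and calling tools.\n"
    "Available tools: {tool_names}\n"
    "Additional context:\n{context}\n"
)


def build_system_prompt(tool_names: List[str], extra_context: str = "") -> str:
    """Fill the template, using a placeholder when no extra context is given."""
    return SYSTEM_TEMPLATE.format(
        tool_names=", ".join(tool_names),
        context=extra_context or "(none)",
    )


if __name__ == "__main__":
    print(build_system_prompt(["add"], "Always answer in French."))
```

With the workflow class from earlier in the thread, the equivalent would be passing `extra_context="..."` to the `ReActAgent` constructor, which forwards it to the formatter.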