run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Question]: Query Pipeline as Streaming Agent #14613

Open KhyatiNinad opened 1 week ago

KhyatiNinad commented 1 week ago

Question Validation

Question

I am trying to add a Query Pipeline as an Agent, which will be used as a tool for another OpenAI agent. It works fine, but when I try to enable streaming it errors out saying the Agent does not support streaming. Can you please point me to the right way to achieve this?

dosubot[bot] commented 1 week ago

Hey there, @KhyatiNinad! I'm a bot here to help you out with any bugs, questions, or contributions you have. Let's tackle this challenge together!

To enable streaming for a Query Pipeline used as an Agent for another OpenAI agent without encountering errors, you need to configure the query engine to use streaming by setting streaming=True when building the query engine. Here is how you can do it:

  1. Using the high-level API:

    query_engine = index.as_query_engine(streaming=True, similarity_top_k=1)
  2. Using the low-level API:

    from llama_index.core import get_response_synthesizer
    from llama_index.core.query_engine import RetrieverQueryEngine

    synth = get_response_synthesizer(streaming=True, ...)
    query_engine = RetrieverQueryEngine(response_synthesizer=synth, ...)

After configuring the query engine, calling query will return a StreamingResponse object, which you can then process as the response is being generated.

Example of obtaining a Generator from the streaming response and iterating over the tokens:

streaming_response = query_engine.query("What did the author do growing up?")

for text in streaming_response.response_gen:
    # do something with each text chunk as it arrives
    pass

Alternatively, to print the text as it arrives:

streaming_response.print_response_stream()

The BaseOpenAIAgent class in the llama_index.legacy.agent.legacy.openai_agent module supports streaming directly. The _get_stream_ai_response and _get_async_stream_ai_response methods handle streaming responses. Here is an example of how you can use these methods:

from typing import List

from llama_index.legacy.agent.legacy.openai_agent import BaseOpenAIAgent
from llama_index.legacy.llms import OpenAI
from llama_index.legacy.memory import ChatMemoryBuffer
from llama_index.legacy.tools import BaseTool

class MyOpenAIAgent(BaseOpenAIAgent):
    def get_tools(self, message: str) -> List[BaseTool]:
        # Implement your tool retrieval logic here
        return []

# Initialize your agent
agent = MyOpenAIAgent(llm=OpenAI(), memory=ChatMemoryBuffer.from_defaults(), prefix_messages=[], verbose=True, max_function_calls=5, callback_manager=None)

# Get a streaming response
streaming_response = agent._get_stream_ai_response(messages=[{"role": "user", "content": "What did the author do growing up?"}])

for text in streaming_response.chat_stream:
    print(text)

Additionally, if you are using the RagCLI class from the llama_index.legacy.command_line.rag module, you can ensure that the QueryPipeline is set up with streaming enabled by default. Here is an example of how the RagCLI class configures the QueryPipeline:

class RagCLI(BaseModel):
    # ... other fields and methods ...

    @validator("query_pipeline", always=True)
    def query_pipeline_from_ingestion_pipeline(
        cls, query_pipeline: Any, values: Dict[str, Any]
    ) -> Optional[QueryPipeline]:
        if query_pipeline is not None:
            return query_pipeline

        ingestion_pipeline = cast(IngestionPipeline, values["ingestion_pipeline"])
        if ingestion_pipeline.vector_store is None:
            return None
        verbose = cast(bool, values["verbose"])
        query_component = FnComponent(
            fn=query_input, output_key="output", req_params={"query_str"}
        )
        llm = cast(LLM, values["llm"])

        embed_model = None
        if ingestion_pipeline.transformations is not None:
            for transformation in ingestion_pipeline.transformations:
                if isinstance(transformation, BaseEmbedding):
                    embed_model = transformation
                    break

        service_context = ServiceContext.from_defaults(
            llm=llm, embed_model=embed_model or "default"
        )
        retriever = VectorStoreIndex.from_vector_store(
            ingestion_pipeline.vector_store, service_context=service_context
        ).as_retriever(similarity_top_k=8)
        response_synthesizer = CompactAndRefine(
            service_context=service_context, streaming=True, verbose=verbose
        )

        query_pipeline = QueryPipeline(verbose=verbose)
        query_pipeline.add_modules(
            {
                "query": query_component,
                "retriever": retriever,
                "summarizer": response_synthesizer,
            }
        )
        query_pipeline.add_link("query", "retriever")
        query_pipeline.add_link("retriever", "summarizer", dest_key="nodes")
        query_pipeline.add_link("query", "summarizer", dest_key="query_str")
        return query_pipeline

This approach leverages the built-in streaming capabilities of the BaseOpenAIAgent class and the RagCLI class, ensuring that you can handle streaming responses without encountering errors [1][2][3][4][5].

To continue talking to Dosu, mention @dosu.

KhyatiNinad commented 1 week ago

@dosu No. I want to use QueryPipelineAgentWorker or FnAgentWorker, with OpenAI as the underlying LLM, to stream my response. Please guide.

dosubot[bot] commented 1 week ago

To stream responses using QueryPipelineAgentWorker or FnAgentWorker with OpenAI as the underlying LLM, you need to implement the stream_step and astream_step methods, as they currently raise NotImplementedError. Here is an example of how you might implement these methods:

from typing import Any

from llama_index.legacy.agent.types import Task, TaskStep, TaskStepOutput
from llama_index.legacy.callbacks import trace_method

class QueryPipelineAgentWorker(BaseModel, BaseAgentWorker):
    # ... other methods ...

    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step (stream)."""
        # Implement streaming logic here
        agent_response, is_done = self.pipeline.run(state=step.step_state, task=task, stream=True)
        response = self._get_task_step_response(agent_response, step, is_done)
        task.extra_state.update(step.step_state)
        return response

    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async stream)."""
        # Implement async streaming logic here
        agent_response, is_done = await self.pipeline.arun(state=step.step_state, task=task, stream=True)
        response = self._get_task_step_response(agent_response, step, is_done)
        task.extra_state.update(step.step_state)
        return response

In this implementation, you would need to ensure that the pipeline.run and pipeline.arun methods support a stream parameter and handle streaming appropriately. This might involve modifying the QueryPipeline class to support streaming if it does not already.
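
If you would rather not modify QueryPipeline itself, one workaround (a minimal sketch, not an official stream flag on pipeline.run; the "data" directory is a placeholder) is to build the pipeline around a response synthesizer created with streaming=True, so that the summarizer module itself emits a StreamingResponse that the agent worker can forward:

from llama_index.core import (
    SimpleDirectoryReader,
    VectorStoreIndex,
    get_response_synthesizer,
)
from llama_index.core.query_pipeline import InputComponent, QueryPipeline

# Build a simple retriever over local documents ("data" is a placeholder path)
index = VectorStoreIndex.from_documents(SimpleDirectoryReader("data").load_data())
retriever = index.as_retriever(similarity_top_k=3)

# streaming=True is what makes the summarizer yield tokens incrementally
summarizer = get_response_synthesizer(streaming=True)

pipeline = QueryPipeline(verbose=True)
pipeline.add_modules(
    {"input": InputComponent(), "retriever": retriever, "summarizer": summarizer}
)
pipeline.add_link("input", "retriever")
pipeline.add_link("retriever", "summarizer", dest_key="nodes")
pipeline.add_link("input", "summarizer", dest_key="query_str")

# The pipeline output should then be a StreamingResponse with a token generator
streaming_response = pipeline.run(input="What did the author do growing up?")
for token in streaming_response.response_gen:
    print(token, end="")

This mirrors how the RagCLI pipeline above passes streaming=True to its CompactAndRefine summarizer.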

Additionally, you can configure the query engine to use streaming by setting streaming=True when building the query engine. Here is how you can do it using the high-level API:

query_engine = index.as_query_engine(streaming=True, similarity_top_k=1)

Or using the low-level API:

from llama_index.core import get_response_synthesizer
from llama_index.core.query_engine import RetrieverQueryEngine

synth = get_response_synthesizer(streaming=True, ...)
query_engine = RetrieverQueryEngine(response_synthesizer=synth, ...)

After configuring both the LLM and the query engine, calling query will return a StreamingResponse object:

streaming_response = query_engine.query("What did the author do growing up?")

You can then obtain a Generator from the streaming response and iterate over the tokens as they arrive:

for text in streaming_response.response_gen:
    # do something with each text chunk as it arrives
    pass

Alternatively, if you just want to print the text as it arrives:

streaming_response.print_response_stream()

For the FnAgentWorker, you would need to override the stream_step and astream_step methods similarly. Here is an example:

# (same imports as in the QueryPipelineAgentWorker example above)
class FnAgentWorker(BaseModel, BaseAgentWorker):
    # ... other methods ...

    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step (stream)."""
        # Implement streaming logic here
        agent_response, is_done = self._run_step(step.step_state, task, input=step.input)
        response = self._get_task_step_response(agent_response, step, is_done)
        task.extra_state.update(step.step_state)
        return response

    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async stream)."""
        # Implement async streaming logic here
        agent_response, is_done = await self._arun_step(step.step_state, task, input=step.input)
        response = self._get_task_step_response(agent_response, step, is_done)
        task.extra_state.update(step.step_state)
        return response

Ensure that the fn and async_fn functions used in FnAgentWorker are capable of handling streaming responses [1][2][3][4][5].
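
For example, a step function for FnAgentWorker that streams tokens from the underlying OpenAI LLM might look roughly like this (a sketch only: the "__task__"/"__output__" state keys and the single-step contract follow the custom-agent guide and are assumptions to verify against your version):

from typing import Any, Dict, Tuple

from llama_index.core.agent import FnAgentWorker
from llama_index.core.llms import ChatMessage
from llama_index.llms.openai import OpenAI

llm = OpenAI(model="gpt-3.5-turbo")

def streaming_agent_fn(state: Dict[str, Any]) -> Tuple[Dict[str, Any], bool]:
    """Single-step agent function that prints tokens as the LLM generates them."""
    task = state["__task__"]  # assumed key holding the current Task
    deltas = []
    # llm.stream_chat yields ChatResponse chunks with incremental deltas
    for chunk in llm.stream_chat([ChatMessage(role="user", content=task.input)]):
        print(chunk.delta or "", end="", flush=True)
        deltas.append(chunk.delta or "")
    state["__output__"] = "".join(deltas)  # assumed key for the final answer
    return state, True  # done after a single step

agent = FnAgentWorker(fn=streaming_agent_fn, initial_state={}).as_agent()
response = agent.chat("What did the author do growing up?")

An agent built this way streams tokens to stdout as it runs; whether you also need to surface a StreamingAgentChatResponse depends on how the caller consumes the result.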

To continue talking to Dosu, mention @dosu.

KhyatiNinad commented 1 week ago

@dosu thanks. This worked. But if I add this agent as a tool for another OpenAI agent, it throws a pydantic validation error for AgentChatWithStepEndEvent. Please guide me on how to modify it so that it can work as a tool for another OpenAI agent.

dosubot[bot] commented 1 week ago

To modify the implementation so that it can work as a tool for another OpenAI agent without causing validation errors, you need to ensure that the necessary classes and functions are correctly imported and exposed. Based on the provided content, the __init__.py file already imports and exposes the relevant components.

Here is the content of the __init__.py file:

from llama_index.agent.openai.base import OpenAIAgent
from llama_index.agent.openai.openai_assistant_agent import OpenAIAssistantAgent
from llama_index.agent.openai.step import OpenAIAgentWorker, advanced_tool_call_parser

__all__ = [
    "OpenAIAgent",
    "OpenAIAgentWorker",
    "OpenAIAssistantAgent",
    "advanced_tool_call_parser",
]

Additionally, you can define a custom tool call parser to handle malformed function calls, which is a common issue with older versions of the OpenAI API. Here is an example of how to define and use a custom tool call parser:

from typing import Dict
from llama_index.llms.openai.utils import OpenAIToolCall
import re
import json

def custom_tool_call_parser(tool_call: OpenAIToolCall) -> Dict:
    r"""Parse tool calls that are not standard json.
    Also parses tool calls of the following forms:
    variable = \"\"\"Some long text\"\"\"
    variable = "Some long text"'
    variable = '''Some long text'''
    variable = 'Some long text'
    """
    arguments_str = tool_call.function.arguments
    if len(arguments_str.strip()) == 0:
        # OpenAI returns an empty string for functions containing no args
        return {}
    try:
        tool_call = json.loads(arguments_str)
        if not isinstance(tool_call, dict):
            raise ValueError("Tool call must be a dictionary")
        return tool_call
    except json.JSONDecodeError as e:
        # pattern to match variable names and content within quotes
        pattern = r'([a-zA-Z_][a-zA-Z_0-9]*)\s*=\s*["\']+(.*?)["\']+'
        match = re.search(pattern, arguments_str)

        if match:
            variable_name = match.group(1)  # This is the variable name
            content = match.group(2)  # This is the content within the quotes
            return {variable_name: content}
        raise ValueError(f"Invalid tool call: {e!s}")

To use this custom tool call parser with your OpenAI agent, you can set it up as follows:

from llama_index.agent.openai import OpenAIAgent
from llama_index.llms.openai import OpenAI
from llama_index.core.tools import FunctionTool

# Define your tools
def multiply(a: int, b: int) -> int:
    return a * b

multiply_tool = FunctionTool.from_defaults(fn=multiply)

def add(a: int, b: int) -> int:
    return a + b

add_tool = FunctionTool.from_defaults(fn=add)

# Initialize the OpenAI agent with the custom tool call parser
llm = OpenAI(model="gpt-3.5-turbo-0613")
agent = OpenAIAgent.from_tools(
    [multiply_tool, add_tool],
    llm=llm,
    verbose=True,
    tool_call_parser=custom_tool_call_parser,
)

# Example usage
response = agent.chat("What is (121 * 3) + 42?")
print(str(response))

This setup should be sufficient for the module to be used as a tool for another OpenAI agent, provided that the other agent correctly imports and utilizes these components. If you encounter validation errors, ensure that the other agent's implementation correctly references these classes and functions.
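
If the goal is specifically to nest one agent under another, one documented pattern (sketched here with placeholder names; inner_agent stands for the streaming agent built earlier) is to wrap the inner agent in a QueryEngineTool, since agents expose the query-engine interface, so the parent OpenAI agent only ever receives the tool's final text output:

from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.llms.openai import OpenAI

# inner_agent is the streaming QueryPipeline/Fn agent built earlier (placeholder)
inner_agent_tool = QueryEngineTool(
    query_engine=inner_agent,
    metadata=ToolMetadata(
        name="pipeline_agent",
        description="Answers questions using the query-pipeline agent.",
    ),
)

# The parent OpenAI agent calls the inner agent like any other tool
parent_agent = OpenAIAgent.from_tools(
    [inner_agent_tool], llm=OpenAI(model="gpt-3.5-turbo"), verbose=True
)
response = parent_agent.chat("What is the summary of the document?")
print(str(response))

Because the tool boundary returns the inner agent's final text, the parent agent never handles the streaming objects directly, which is usually what you want when nesting agents.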

Additionally, you can refer to the LlamaIndex documentation for more examples and detailed guides on setting up agents and tools:

from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
from llama_index.core.agent import ReActAgent

# define sample Tool
def multiply(a: int, b: int) -> int:
    """Multiply two integers and returns the result integer"""
    return a * b

multiply_tool = FunctionTool.from_defaults(fn=multiply)

# initialize llm
llm = OpenAI(model="gpt-3.5-turbo-0613")

# initialize ReAct agent
agent = ReActAgent.from_tools([multiply_tool], llm=llm, verbose=True)

For more information, you can explore the Agent Module Guide and other resources provided in the LlamaIndex documentation [1][2][3][4].

To continue talking to Dosu, mention @dosu.

KhyatiNinad commented 1 week ago

@dosu No. The error occurs in the AgentWorker class. It seems to expect either text or a StreamingAgentChatResponse. So, ideally, it should return a StreamingAgentChatResponse when it is the top-most Agent or when return_direct = True, and it should return the text response when added as a tool for another Agent. How can I achieve this?

dosubot[bot] commented 1 week ago

To modify the AgentWorker class implementation to return a StreamingAgentChatResponse when it is the top-most agent or when return_direct = True, and return the text response when it is used as a tool for another agent, you can update the _run_step method to include this logic. Here is how you can do it:

  1. Add a parameter return_direct to the _run_step method.
  2. Check the return_direct flag and the agent's position to determine the type of response to return.

Here is the updated implementation:

from abc import abstractmethod
from typing import Any, Dict, Optional, Tuple

from llama_index.legacy.chat_engine.types import (
    AGENT_CHAT_RESPONSE_TYPE,
    StreamingAgentChatResponse,
)

class CustomSimpleAgentWorker(BaseModel, BaseAgentWorker):
    # ... (other parts of the class)

    @abstractmethod
    def _run_step(
        self, state: Dict[str, Any], task: Task, input: Optional[str] = None, return_direct: bool = False
    ) -> Tuple[AGENT_CHAT_RESPONSE_TYPE, bool]:
        """Run step.

        Returns:
            Tuple of (agent_response, is_done)
        """
        # Your existing logic to generate agent_response and is_done
        agent_response, is_done = ...  # Implement your logic here

        # Determine the type of response to return
        if return_direct or self.is_top_most_agent():
            return StreamingAgentChatResponse(response=agent_response), is_done
        else:
            return agent_response.text, is_done

    def is_top_most_agent(self) -> bool:
        # Implement logic to determine if this is the top-most agent
        # This is a placeholder implementation
        return True  # Replace with actual logic

    # ... (other parts of the class)

In this implementation, you will need to supply the actual logic for is_top_most_agent based on your application's requirements; the StreamingAgentChatResponse class is imported from llama_index.legacy.chat_engine.types [1][2][3][4].
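
One possible sketch of that check (the is_sub_agent flag is a hypothetical convention, not a LlamaIndex API): pass the task into the check and have the wrapping tool mark any task it creates in task.extra_state, so the absence of the mark means the worker is running at the top level:

def is_top_most_agent(self, task: Task) -> bool:
    # "is_sub_agent" is a hypothetical key that the wrapping tool would set in
    # task.extra_state when a parent agent creates the task; if it is absent,
    # assume this worker is running as the top-most agent.
    return not task.extra_state.get("is_sub_agent", False)

With this variant, the check inside _run_step becomes return_direct or self.is_top_most_agent(task).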

To continue talking to Dosu, mention @dosu.