run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai

[Question]: Streamlitcallbackhandler #14664

Open KhyatiNinad opened 1 month ago

KhyatiNinad commented 1 month ago

Question

I want to integrate LlamaIndex with Streamlit for a streaming chat app that uses agents. I would also like to understand how to display the agents' thoughts or intermediate output in the Streamlit chat UI.

logan-markewich commented 1 month ago

Thoughts/actions can be accessed if you execute an agent in stepwise mode https://colab.research.google.com/drive/1hiDkBbAJcO3RDrS7CD2ZeQEHGqNv07pq?usp=sharing
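
For reference, here is a minimal sketch of that stepwise loop outside the linked notebook. It assumes the newer split-package imports (llama_index.core.agent, llama_index.llms.openai) and a ReActAgent; the empty tools list and the query string are placeholders to replace with your own:

import streamlit as st
from llama_index.core.agent import ReActAgent
from llama_index.llms.openai import OpenAI

# Placeholder: substitute your own QueryEngineTool / FunctionTool objects.
tools = []
agent = ReActAgent.from_tools(tools, llm=OpenAI(model="gpt-3.5-turbo"), verbose=True)

# Drive the agent step by step so each intermediate thought/action can be surfaced.
task = agent.create_task("Your input here")
step_output = agent.run_step(task.task_id)
while not step_output.is_last:
    st.write(str(step_output.output))  # intermediate step output
    step_output = agent.run_step(task.task_id)

response = agent.finalize_response(task.task_id)
st.write(str(response))  # final answer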

KhyatiNinad commented 1 month ago

@dosu let me clarify. The agents are running fine and producing their output in a Streamlit app. But there are hierarchical agents running, so I want to show the status of the intermediate steps in the Streamlit UI, similar to how LangChain does it with StreamlitCallbackHandler. How can I achieve that?

dosubot[bot] commented 1 month ago

To integrate LlamaIndex with Streamlit and display the intermediate steps or status of hierarchical agents in the Streamlit UI, you can use a custom callback handler. This handler will capture the intermediate steps and update the Streamlit UI. Here is an example of how you can achieve this:

  1. Create a custom callback handler: This handler will capture the intermediate steps and update the Streamlit UI.

  2. Integrate the callback handler with the create_llama_agent function: Pass the custom callback handler to the create_llama_agent function.

  3. Update the Streamlit UI: Use Streamlit's st.write or st.text to display the intermediate steps.

Here is a sample implementation:

import streamlit as st
from llama_index.core.langchain_helpers.agents.agents import create_llama_agent
from llama_index.core.bridge.langchain import BaseLLM, BaseCallbackManager, AgentType
from llama_index.core.langchain_helpers.agents.toolkits import LlamaToolkit

class StreamlitCallbackHandler(BaseCallbackManager):
    """Collects intermediate agent events and renders them in the Streamlit UI.

    NOTE: on_event is a simplified illustration; the LangChain callback protocol
    dispatches more specific hooks (on_tool_start, on_agent_action, on_tool_end, ...).
    """

    def __init__(self):
        self.steps = []

    def on_event(self, event_type, payload):
        if event_type == "function_call":
            self.steps.append(f"Function Call: {payload['function_call']}")
        elif event_type == "function_output":
            self.steps.append(f"Function Output: {payload['function_output']}")
        self.update_ui()

    def update_ui(self):
        st.write("### Intermediate Steps")
        for step in self.steps:
            st.text(step)

# Initialize StreamlitCallbackHandler
callback_handler = StreamlitCallbackHandler()

# Create a LlamaToolkit instance (in practice, built from your index configs/tools)
toolkit = LlamaToolkit()

# Initialize the LLM.
# NOTE: BaseLLM is abstract and shown only as a placeholder; pass a concrete
# LangChain LLM (e.g. ChatOpenAI) here instead.
llm = BaseLLM()

# Create the agent with the custom callback manager
agent_executor = create_llama_agent(
    toolkit=toolkit,
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    callback_manager=callback_handler
)

# Example task to run
task_input = "Your input here"

# Run the agent executor
output = agent_executor.run(task_input)

# Display the final output
st.write("### Final Output")
st.text(output)

In this example, the custom StreamlitCallbackHandler records each function_call and function_output event as the agent runs and re-renders the list of intermediate steps in the Streamlit UI, and the final answer is written out once the agent finishes.

This approach allows you to visualize the intermediate steps of hierarchical agents in the Streamlit UI, similar to how StreamlitCallbackHandler works with LangChain [1][2].
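
For comparison, the LangChain pattern being referenced looks roughly like this (a sketch, assuming LangChain's StreamlitCallbackHandler and an already-built LangChain agent_executor; import paths vary between LangChain versions):

import streamlit as st
from langchain_community.callbacks.streamlit import StreamlitCallbackHandler

# agent_executor is assumed to be a LangChain AgentExecutor built elsewhere.
st_callback = StreamlitCallbackHandler(st.container())

if prompt := st.chat_input():
    st.chat_message("user").write(prompt)
    with st.chat_message("assistant"):
        # Thoughts and tool calls are rendered live by the callback handler.
        response = agent_executor.run(prompt, callbacks=[st_callback])
        st.write(response)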

To continue talking to Dosu, mention @dosu.

KhyatiNinad commented 1 month ago

@dosu Super. And how do I implement the same with an OpenAI agent with streaming set to True?

dosubot[bot] commented 1 month ago

To implement the same functionality with an OpenAI agent with streaming set to True, you can use the astream_chat method provided in the OpenAIAgentWorker class. This method handles streaming responses asynchronously. Here is how you can adapt the previous example:

  1. Create a custom callback handler: This handler will capture the intermediate steps and update the Streamlit UI.

  2. Integrate the callback handler with the OpenAIAgentWorker: Pass the custom callback handler to the OpenAIAgentWorker.

  3. Update the Streamlit UI: Use Streamlit's st.write or st.text to display the intermediate steps.

Here is a sample implementation:

import streamlit as st
import asyncio
from llama_index.core.langchain_helpers.agents.agents import create_llama_agent
from llama_index.core.bridge.langchain import BaseLLM, BaseCallbackManager, AgentType
from llama_index.core.langchain_helpers.agents.toolkits import LlamaToolkit
from llama_index.llms import OpenAI
# NOTE: ServiceContext is deprecated in newer llama_index releases (Settings replaces it);
# these imports follow the older API used elsewhere in this thread.
from llama_index import ServiceContext, set_global_service_context

class StreamlitCallbackHandler(BaseCallbackManager):
    def __init__(self):
        self.steps = []

    def on_event(self, event_type, payload):
        if event_type == "function_call":
            self.steps.append(f"Function Call: {payload['function_call']}")
        elif event_type == "function_output":
            self.steps.append(f"Function Output: {payload['function_output']}")
        self.update_ui()

    def update_ui(self):
        st.write("### Intermediate Steps")
        for step in self.steps:
            st.text(step)

async def main():
    # Initialize StreamlitCallbackHandler
    callback_handler = StreamlitCallbackHandler()

    # Initialize the OpenAI LLM.
    # NOTE: in llama_index, streaming is usually selected per call (stream_chat /
    # astream_chat) rather than via a constructor flag.
    llm = OpenAI(model="gpt-3.5-turbo", temperature=0)
    ctx = ServiceContext.from_defaults(llm=llm)
    set_global_service_context(ctx)

    # Create a LlamaToolkit instance (assuming you have a toolkit instance)
    toolkit = LlamaToolkit()

    # Create the agent with the custom callback manager
    agent_executor = create_llama_agent(
        toolkit=toolkit,
        llm=llm,
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        callback_manager=callback_handler
    )

    # Example task to run
    task_input = "Your input here"

    # Run the agent with streaming.
    # NOTE: astream_chat is the llama_index agent API (OpenAIAgent / OpenAIAgentWorker);
    # the LangChain-style executor returned by create_llama_agent exposes run/arun instead.
    # See the OpenAIAgent sketch after this example for a version that matches this call.
    response = await agent_executor.astream_chat(task_input)

    # Display the final output as tokens arrive
    # (on a StreamingAgentChatResponse the async token generator is async_response_gen()).
    st.write("### Final Output")
    async for token in response.async_response_gen():
        st.text(token)

# Run the main function
asyncio.run(main())

In this example, the same StreamlitCallbackHandler collects the intermediate steps, the agent is driven asynchronously, and the final answer is streamed into the Streamlit UI token by token.

This approach allows you to visualize the intermediate steps of hierarchical agents in the Streamlit UI with streaming enabled for the OpenAI agent [1][2][3].
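
Note that create_llama_agent returns a LangChain-style agent executor, which does not expose astream_chat; that method lives on llama_index's own OpenAI agent (backed by OpenAIAgentWorker). A minimal sketch of that route, assuming the llama_index.agent.openai / llama_index.llms.openai package layout and your own tools list:

import asyncio
import streamlit as st
from llama_index.agent.openai import OpenAIAgent
from llama_index.llms.openai import OpenAI

async def main():
    llm = OpenAI(model="gpt-3.5-turbo", temperature=0)

    # Placeholder: substitute your own QueryEngineTool / FunctionTool objects.
    tools = []
    agent = OpenAIAgent.from_tools(tools, llm=llm, verbose=True)

    # astream_chat returns a StreamingAgentChatResponse; tokens arrive asynchronously.
    response = await agent.astream_chat("Your input here")

    st.write("### Final Output")
    placeholder = st.empty()
    text = ""
    async for token in response.async_response_gen():
        text += token
        placeholder.text(text)  # progressively render the streamed answer

asyncio.run(main())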

To continue talking to Dosu, mention @dosu.

KhyatiNinad commented 1 month ago

@dosu thanks. What if I want to decouple the StreamlitCallbackHandler class from Streamlit's st module? How can I abstract this?

dosubot[bot] commented 1 month ago

To decouple the StreamlitCallbackHandler class from Streamlit's st functions, you can abstract the Streamlit-specific functionality into a separate interface or class. This way, the callback handler can be used in different contexts without being tied to Streamlit.

Here is an example of how you can achieve this:

  1. Define an interface for the output handler.
  2. Implement the interface for Streamlit.
  3. Modify the StreamlitCallbackHandler to use the interface.

Here is a sample implementation:

from queue import Queue
from threading import Event
from typing import Any, Generator, List, Optional
from uuid import UUID

from llama_index.core.bridge.langchain import BaseCallbackHandler, LLMResult

class OutputHandler:
    """Interface for output handling."""
    def write(self, message: str) -> None:
        raise NotImplementedError

class StreamlitOutputHandler(OutputHandler):
    """Streamlit implementation of the output handler."""
    def write(self, message: str) -> None:
        import streamlit as st
        st.write(message)

class StreamingGeneratorCallbackHandler(BaseCallbackHandler):
    """Streaming callback handler."""

    def __init__(self, output_handler: OutputHandler) -> None:
        self._token_queue: Queue = Queue()
        self._done = Event()
        self._output_handler = output_handler

    def __deepcopy__(self, memo: Any) -> "StreamingGeneratorCallbackHandler":
        # NOTE: hack to bypass deepcopy in langchain
        return self

    def on_llm_new_token(self, token: str, **kwargs: Any) -> Any:
        """Run on new LLM token. Only available when streaming is enabled."""
        self._token_queue.put_nowait(token)
        self._output_handler.write(token)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        self._done.set()

    def on_llm_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        self._done.set()

    def get_response_gen(self) -> Generator:
        while True:
            if not self._token_queue.empty():
                token = self._token_queue.get_nowait()
                yield token
            elif self._done.is_set():
                break

In this example, OutputHandler defines a minimal write interface, StreamlitOutputHandler provides the Streamlit-specific implementation, and StreamingGeneratorCallbackHandler depends only on the abstract OutputHandler, so it no longer references st directly.

This way, you can create different implementations of OutputHandler for different contexts, making the StreamingGeneratorCallbackHandler independent of Streamlit [1][2].
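
As a quick usage sketch building on the classes above, any other implementation of OutputHandler can be swapped in without touching the callback handler; the ConsoleOutputHandler name below is hypothetical:

class ConsoleOutputHandler(OutputHandler):
    """Console implementation, e.g. for local debugging outside Streamlit."""
    def write(self, message: str) -> None:
        print(message, end="", flush=True)

# The callback handler never imports Streamlit itself, so handlers are interchangeable.
console_handler = StreamingGeneratorCallbackHandler(output_handler=ConsoleOutputHandler())
streamlit_handler = StreamingGeneratorCallbackHandler(output_handler=StreamlitOutputHandler())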

To continue talking to Dosu, mention @dosu.