deepset-ai / haystack

AI orchestration framework to build customizable, production-ready LLM applications. Connect components (models, vector DBs, file converters) to pipelines or agents that can interact with your data. With advanced retrieval methods, it's best suited for building RAG, question answering, semantic search or conversational agent chatbots.
https://haystack.deepset.ai
Apache License 2.0
17.83k stars 1.93k forks

Streamlit Stream Callback in Haystack 2.x #6970

Open ilkersigirci opened 9 months ago

ilkersigirci commented 9 months ago

Describe the solution you'd like

I would like to see the response streamed in real time in Streamlit when using OpenAIChatGenerator.

Describe alternatives you've considered

I have tried the following streaming_callback functions. (I have included only the relevant code; I can share the full code if requested.)

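import streamlit as st
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage, StreamingChunk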

# Method 1: Nothing seen in the UI
def st_streaming_callback(chunk: StreamingChunk):
    yield chunk.content

# Method 2: Instead of typewriter format, each chunk seen as one line output.
def st_streaming_callback(chunk: StreamingChunk):
    st.write(chunk.content)

model = OpenAIChatGenerator(
    model="gpt-4-turbo-preview",
    streaming_callback=st_streaming_callback,
    generation_kwargs=None,
)

response_dict = st.write_stream(
    model.run(
        messages=[
            ChatMessage(
                role=message.role, content=message.content, name=None
            )
            for message in st.session_state.messages
        ]
    )
)
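
One likely reason Method 1 shows nothing: a function that contains `yield` is a generator function, so when it is invoked as a callback the call only creates a generator object and its body never runs. The sketch below demonstrates this in plain Python. `FakeChunk` and `fake_llm_run` are hypothetical stand-ins for `StreamingChunk` and the generator's `run` method, not Haystack APIs.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class FakeChunk:
    """Hypothetical stand-in for haystack.dataclasses.StreamingChunk."""
    content: str


def fake_llm_run(callback: Callable[[FakeChunk], object]) -> List[object]:
    """Hypothetical stand-in for run(): invokes the callback once per chunk."""
    return [callback(FakeChunk(text)) for text in ("Hel", "lo", "!")]


seen_by_generator_callback: List[str] = []
seen_by_plain_callback: List[str] = []


def generator_callback(chunk: FakeChunk):
    # Because of `yield`, calling this function only builds a generator object;
    # the lines below do not execute unless something iterates over it.
    seen_by_generator_callback.append(chunk.content)
    yield chunk.content


def plain_callback(chunk: FakeChunk) -> None:
    # A regular function runs immediately for every chunk.
    seen_by_plain_callback.append(chunk.content)


fake_llm_run(generator_callback)
fake_llm_run(plain_callback)

print(seen_by_generator_callback)
print(seen_by_plain_callback)
```

The first list stays empty because nothing ever iterates the generator objects, while the plain callback records every chunk.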


anakin87 commented 9 months ago

This is an interesting use case but needs some investigation.

I think we should provide a guide (+ code examples) on how to handle streaming. An example about streamlit would be helpful.

ilkersigirci commented 9 months ago

> This is an interesting use case but needs some investigation.
>
> I think we should provide a guide (+ code examples) on how to handle streaming. An example about streamlit would be helpful.

That would be perfect, thanks

T-Visor commented 4 months ago

Hello! I was battling with this same issue.

Below is a personal example. I'm using OllamaChatGenerator, but the same principles can be applied to OpenAI's chat generator.

Happy to help with documentation or anything else!

Class file using OllamaChatGenerator (the app file below imports it from a module named conversational_pipeline)

import streamlit as st
from haystack_integrations.components.generators.ollama import OllamaChatGenerator
from haystack.dataclasses import StreamingChunk, ChatMessage

class ConversationalChatbot:
    """
        A conversational chatbot which will stream responses to the Streamlit
        UI.
    """

    def __init__(self):
        """
            Initializes the chatbot with a language model and a default system message.
        """
        self.large_language_model = OllamaChatGenerator(
            model='orca-mini',
            url='http://localhost:11434/api/chat',
            streaming_callback=self.streamlit_write_streaming_chunk
        )

        self.messages = [ChatMessage.from_system("\nYou are a helpful, respectful and honest assistant")]

    def query(self, query: str) -> str:
        """
            Run a query and return the response from the language model.

        Args:
            query (str): The user's query string.

        Returns:
            str: The assistant's response.
        """
        # Create a new Streamlit container for the AI's response.
        self.placeholder = st.empty()

        # Initialize an empty list for response tokens.
        self.tokens = []

        # Add the user's query to the chat history.
        self.messages.append(ChatMessage.from_user(query))

        # Send the chat history to the language model and get the response.
        response = self.large_language_model.run(self.messages)

        # Check if the response contains valid replies.
        if 'replies' in response:
            response_content = response['replies'][0].content
            # Add the assistant's response to the chat history.
            self.messages.append(ChatMessage.from_assistant(response_content))
            return response_content
        else:
            raise RuntimeError('No valid response or unexpected response format.')

    def streamlit_write_streaming_chunk(self, chunk: StreamingChunk):
        """
            Streams a response chunk to the Streamlit UI.

        Args:
            chunk (StreamingChunk): The streaming chunk from the language model.
        """
        # Append the latest streaming chunk to the tokens list.
        self.tokens.append(chunk.content)

        # Update the Streamlit container with the current stream of tokens.
        self.placeholder.write("".join(self.tokens))

    def add_message_to_chat_history(self, message: ChatMessage):
        """
            Add a message to the chat history.

        Args:
            message (ChatMessage): The message to add to the chat history.
        """
        self.messages.append(message)
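
The core of the class above is an accumulate-and-rewrite pattern: every chunk is appended to a list, and the whole joined text is written back into a single placeholder. That is why it produces a typewriter effect, whereas calling `st.write` per chunk adds a new element each time. A minimal, Streamlit-free sketch of the same pattern follows; `FakePlaceholder` and `ChunkAccumulator` are hypothetical names used only for illustration, with `FakePlaceholder` standing in for the object returned by `st.empty()`.

```python
from typing import List


class FakePlaceholder:
    """Hypothetical stand-in for the object returned by st.empty()."""

    def __init__(self) -> None:
        self.renders: List[str] = []

    def write(self, text: str) -> None:
        # A real placeholder replaces its content; here we log each render.
        self.renders.append(text)


class ChunkAccumulator:
    """Collects chunk texts and re-renders the full text after each one."""

    def __init__(self, placeholder: FakePlaceholder) -> None:
        self.placeholder = placeholder
        self.tokens: List[str] = []

    def __call__(self, chunk_text: str) -> None:
        # Append the new piece, then rewrite the placeholder with everything so far.
        self.tokens.append(chunk_text)
        self.placeholder.write("".join(self.tokens))


placeholder = FakePlaceholder()
on_chunk = ChunkAccumulator(placeholder)
for piece in ("Hay", "stack", " streams"):
    on_chunk(piece)

print(placeholder.renders)
```

Each render contains the full text so far, so the single placeholder appears to grow as chunks arrive.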

This is the Streamlit 'app' file

import streamlit as st
from conversational_pipeline import ConversationalChatbot

# Constants to store key names in the config dictionary
TITLE_NAME = 'title_name'
UI_RENDERED_MESSAGES = 'ui_rendered_messages'
CHAT_HISTORY = 'chat_history'
CONVERSATIONAL_PIPELINE = 'conversational_pipeline'

def main():
    """
        Render the streaming chatbot application.
    """
    config = load_config()
    initialize_session_state(config)
    setup_page()
    render_chat_history()
    manage_chat()

def load_config():
    """
        Load the application configuration from a file or object.

    Returns:
        dict: Configuration dictionary containing title name,
              UI rendered messages, chat history, and conversational pipeline instance.
    """
    return {
        TITLE_NAME: 'Haystack Streaming Example',
        UI_RENDERED_MESSAGES: [],
        CHAT_HISTORY: [],
        CONVERSATIONAL_PIPELINE: ConversationalChatbot()
    }

def setup_page():
    """
        Set Streamlit page configuration and title.
    """
    st.set_page_config(page_title=st.session_state[TITLE_NAME])
    st.title(st.session_state[TITLE_NAME])

def initialize_session_state(config):
    """
        Initialize Streamlit session state variables using the provided configuration.

    Args:
        config (dict): Configuration dictionary.
    """
    for key, value in config.items():
        if key not in st.session_state:
            st.session_state[key] = value

def manage_chat():
    """
        Handle user interaction with the conversational AI and render
        the user query along with the AI response.
    """
    if prompt := st.chat_input('What can we help you with?'):
        # Render user message.
        with st.chat_message('user'):
            st.markdown(prompt)
        st.session_state[UI_RENDERED_MESSAGES].append({'role': 'user', 'content': prompt})

        # Render AI assistant's response.
        with st.chat_message('assistant'):
            with st.spinner('Generating response . . .'):
                response = st.session_state[CONVERSATIONAL_PIPELINE].query(prompt)
        st.session_state[UI_RENDERED_MESSAGES].append({'role': 'assistant', 'content': response})

def render_chat_history():
    """
        Display the chat message history stored in session state.
    """
    for message in st.session_state[UI_RENDERED_MESSAGES]:
        with st.chat_message(message['role']):
            st.markdown(message['content'])

if __name__ == '__main__':
    main()

https://github.com/user-attachments/assets/bdad16a6-a7e9-4568-8fdf-a47254774044

ilkersigirci commented 4 months ago

@T-Visor thank you for the code. It works perfectly on my end. I hope it can be natively integrated into Haystack.

T-Visor commented 4 months ago

@ilkersigirci You're welcome! It's a hack but glad it worked. Hoping the same.
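
For reference, one possible alternative to the placeholder approach is to bridge the callback to a Python generator, which is the shape `st.write_stream` consumes. The sketch below is a minimal illustration under stated assumptions, not a Haystack or Streamlit API: the blocking, callback-based call runs in a worker thread, the callback pushes chunks onto a queue, and a generator yields them until a sentinel arrives. `stream_from_callback` and `fake_run` are hypothetical names, and errors raised in the worker are not propagated to the consumer in this sketch.

```python
import queue
import threading
from typing import Callable, Iterator

_DONE = object()  # sentinel marking the end of the stream


def stream_from_callback(
    run: Callable[[Callable[[str], None]], None],
) -> Iterator[str]:
    """Run a blocking, callback-based function in a thread and yield its chunks."""
    chunks: queue.Queue = queue.Queue()

    def worker() -> None:
        try:
            # The callback handed to `run` simply enqueues each chunk.
            run(chunks.put)
        finally:
            # Always signal completion so the consumer does not block forever.
            chunks.put(_DONE)

    threading.Thread(target=worker, daemon=True).start()

    while True:
        item = chunks.get()
        if item is _DONE:
            return
        yield item


def fake_run(callback: Callable[[str], None]) -> None:
    """Hypothetical stand-in for a generator call that streams via a callback."""
    for piece in ("streamed ", "via ", "a queue"):
        callback(piece)


result = "".join(stream_from_callback(fake_run))
print(result)
```

In a Streamlit app, the returned iterator could in principle be handed to `st.write_stream`, with `run` wrapping the generator call and the `streaming_callback` forwarding `chunk.content` to the supplied function. That wiring is an untested assumption here.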

RazaGR commented 1 month ago

This should also work with Litestar please

@julian-risch @bilgeyucel @TuanaCelik