run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License
33.26k stars 4.65k forks source link

[Bug]: Query Engine gives incomplete streaming response when using Gemini LLMs #13684

Open rasyosef opened 1 month ago

rasyosef commented 1 month ago

Bug Description

Query Engine gives incomplete streaming response when using Gemini LLMs. Whenever streaming is enabled, the first chunk of the output text is missing, but if streaming is disabled, the query engine returns a complete output.

Here are a few examples:

Query 1: What happened while Oppenheimer was a student at the University of Cambridge?

[Non-Streaming Response]: While studying at the University of Cambridge, Oppenheimer grappled with anxiety and homesickness. He left a poisoned apple for his supervisor, Patrick Blackett, but later retrieved it. Visiting scientist Niels Bohr recommended that Oppenheimer study theoretical physics at the University of Göttingen instead.

[Streaming Response]: . He left a poisoned apple for his supervisor, Patrick Blackett, but later retrieved it. Visiting scientist Niels Bohr recommended that Oppenheimer study theoretical physics at the University of Göttingen instead.

Query 2: What score did the Oppenheimer movie get on Rotten Tomatoes and Metacritic?

[Non-Streaming Response]: On Rotten Tomatoes, Oppenheimer received a score of 8.6/10 based on 495 reviews, with 93% of the reviews being positive. On Metacritic, the film received a score of 89 out of 100, based on 69 reviews, indicating "universal acclaim".

[Streaming Response]: based on 495 reviews, with 93% of the reviews being positive. On Metacritic, the film received a score of 89 out of 100, based on 69 reviews, indicating "universal acclaim".

Version

0.10.37

Steps to Reproduce

On Google Colab

Install Libraries

! pip install --upgrade --quiet google-generativeai llama-index llama-index-llms-gemini llama-index-embeddings-gemini

Download text document

! wget https://huggingface.co/spaces/rasyosef/RAG-with-Phi-2-and-LangChain/raw/main/Oppenheimer-movie-wiki.txt -P ./data

Create Query Engine

import os
from google.colab import userdata
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.gemini import GeminiEmbedding
from llama_index.core import VectorStoreIndex
from llama_index.llms.gemini import Gemini

os.environ["GOOGLE_API_KEY"] = userdata.get("GEMINI_API_KEY")

reader = SimpleDirectoryReader(input_dir="./data")
documents = reader.load_data() # 1 document

node_parser = SentenceSplitter(chunk_size=512, chunk_overlap=32)
nodes = node_parser.get_nodes_from_documents(documents) # 25 nodes

embed_model = GeminiEmbedding(
    model="models/embedding-001", 
    title="Oppenheimer movie wikipedia", 
    embed_batch_size=16
)

index = VectorStoreIndex(nodes=nodes, embed_model=embed_model)

llm = Gemini(model_name="models/gemini-pro", temperature=0, max_tokens=256)

query_engine = index.as_query_engine(llm=llm, similarity_top_k=3)
query_engine_streaming = index.as_query_engine(llm=llm, streaming=True, similarity_top_k=3)

Query 1

QUERY_1 = "What happened while Oppenheimer was a student at the University of Cambridge?"

response = query_engine.query(QUERY_1)
print("[Non-Streaming Response]:")
print(response.response)

streaming_response = query_engine_streaming.query(QUERY_1)
print("\n[Streaming Response]:")
streaming_response.print_response_stream()

Output:

[Non-Streaming Response]:
While studying at the University of Cambridge, Oppenheimer grappled with anxiety and homesickness. He left a poisoned apple for his supervisor, Patrick Blackett, but later retrieved it. Visiting scientist Niels Bohr recommended that Oppenheimer study theoretical physics at the University of Göttingen instead.

[Streaming Response]:
. He left a poisoned apple for his supervisor, Patrick Blackett, but later retrieved it. Visiting scientist Niels Bohr recommended that Oppenheimer study theoretical physics at the University of Göttingen instead.

Query 2

QUERY_2 = "What score did the Oppenheimer movie get on Rotten Tomatoes and Metacritic?"

response = query_engine.query(QUERY_2)
print("[Non-Streaming Response]:")
print(response.response)

streaming_response = query_engine_streaming.query(QUERY_2)
print("\n[Streaming Response]:")
streaming_response.print_response_stream()

Output:

[Non-Streaming Response]:
On Rotten Tomatoes, Oppenheimer received a score of 8.6/10 based on 495 reviews, with 93% of the reviews being positive. On Metacritic, the film received a score of 89 out of 100, based on 69 reviews, indicating "universal acclaim".

[Streaming Response]:
 based on 495 reviews, with 93% of the reviews being positive. On Metacritic, the film received a score of 89 out of 100, based on 69 reviews, indicating "universal acclaim".

Relevant Logs/Tracbacks

No response

logan-markewich commented 1 month ago

This is the code for streaming with gemini

def stream_chat(
        self, messages: Sequence[ChatMessage], **kwargs: Any
    ) -> ChatResponseGen:
        merged_messages = merge_neighboring_same_role_messages(messages)
        *history, next_msg = map(chat_message_to_gemini, merged_messages)
        chat = self._model.start_chat(history=history)
        response = chat.send_message(next_msg, stream=True)

        def gen() -> ChatResponseGen:
            content = ""
            for r in response:
                top_candidate = r.candidates[0]
                content_delta = top_candidate.content.parts[0].text
                role = ROLES_FROM_GEMINI[top_candidate.content.role]
                raw = {
                    **(type(top_candidate).to_dict(top_candidate)),
                    **(
                        type(response.prompt_feedback).to_dict(response.prompt_feedback)
                    ),
                }
                content += content_delta
                yield ChatResponse(
                    message=ChatMessage(role=role, content=content),
                    delta=content_delta,
                    raw=raw,
                )

        return gen()

Maybe you can spot the issue, but I'm not sure if I see anything obviously wrong. Seems like its missing the first few chunks of text somehow

I wonder if you have the same if you run

from llama_index.core.llms import ChatMessage

llm = Gemini(...)

response = llm.stream_chat([ChatMessage(role="user", content="Tell me a poem about cats and dogs.")])

for r in response:
  print(r.delta, end="", flush=True)
rasyosef commented 1 month ago

Hey @logan-markewich , I think this issue seems to only occur when using the query engine.

When I passed a prompt template that is identical to the prompt used in the query engine directly to the llm, along with the sources and the query, I got a complete output without any chunks missing.

from llama_index.core.llms import ChatMessage, MessageRole

query = "What score did the Oppenheimer movie get on Rotten Tomatoes and Metacritic?"
formatted_sources = "\n\n\n".join([node.text for node in streaming_response.source_nodes])

response = llm.stream_chat([
    ChatMessage(role=MessageRole.SYSTEM, content=f"You are an expert Q&A system that is trusted around the world.\nAlways answer the query using the provided context information, and not prior knowledge.\nSome rules to follow:\n1. Never directly reference the given context in your answer.\n2. Avoid statements like 'Based on the context, ...' or 'The context information ...' or anything along those lines."), 
    ChatMessage(role=MessageRole.USER, content=f'Context information is below.\n---------------------\n{formatted_sources}\n---------------------\nGiven the context information and not prior knowledge, answer the query.\nQuery: {query}\nAnswer: ')
])

for r in response:
  print(r.delta, end="")

Output:

On Rotten Tomatoes, Oppenheimer received a score of 8.6/10 based on 495 reviews, with 93% of the reviews being positive. On Metacritic, the film received a score of 89 out of 100, based on 69 reviews, indicating "universal acclaim".
rasyosef commented 1 month ago

So the bug is probably in the query engine code. When streaming is enabled, the query engine somehow omits the first chunk from the streaming response. (Unlike GPT models, Gemini models stream multi-word chunks of text instead of individual tokens)

rasyosef commented 1 month ago

Changing the response_mode from the default compact to tree_summarize solved this issue, the bug could be in the response synthesizer

query_engine_streaming = index.as_query_engine(llm=llm, streaming=True, similarity_top_k=3, response_mode="tree_summarize")
logan-markewich commented 1 month ago

I'm pretty confused how this could be a query engine bug 🤔 sounds extremely sus

Lakeside90 commented 1 month ago

Indeed, by setting the response_mode to tree_summarize or simple_summarize, the problem was resolved. After trying edit the Refine class code and commenting out the dispatch_event related code, it ran normally. I don't know what caused this issue.


@dispatcher.span
    def get_response(
        self,
        query_str: str,
        text_chunks: Sequence[str],
        prev_response: Optional[RESPONSE_TEXT_TYPE] = None,
        **response_kwargs: Any,
    ) -> RESPONSE_TEXT_TYPE:
        """Give response over chunks."""
        # dispatch_event = dispatcher.get_dispatch_event()

        # dispatch_event(
        #     GetResponseStartEvent(query_str=query_str, text_chunks=text_chunks)
        # )
        response: Optional[RESPONSE_TEXT_TYPE] = None
        for text_chunk in text_chunks:
            if prev_response is None:
                # if this is the first chunk, and text chunk already
                # is an answer, then return it
                response = self._give_response_single(
                    query_str, text_chunk, **response_kwargs
                )
            else:
                # refine response if possible
                response = self._refine_response_single(
                    prev_response, query_str, text_chunk, **response_kwargs
                )
            prev_response = response
        if isinstance(response, str):
            if self._output_cls is not None:
                response = self._output_cls.parse_raw(response)
            else:
                response = response or "Empty Response"
        else:
            response = cast(Generator, response)
        # dispatch_event(GetResponseEndEvent(response=response))
        return response