run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai

[Question]: Streaming Response printed all at once instead of Token by Token #17024

Open RakeshReddyKondeti opened 3 days ago

RakeshReddyKondeti commented 3 days ago


Question

Hi,

I attempted to reproduce the simple streaming response functionality using workflows as shown below:


from textwrap import dedent
from llama_index.core.workflow import (
    Workflow,
    Event,
    StartEvent,
    StopEvent,
    Context,
    step,
)
from llama_index.core.prompts import PromptTemplate
from llama_index.llms.llama_cpp import LlamaCPP
import asyncio

class TokenEvent(Event):
    token: str

class SimpleEssayWorkflow(Workflow):
    def __init__(
        self,
        timeout: int = 360,
    ):
        super().__init__(timeout=timeout)
        self.llm = LlamaCPP(
            model_path="<path_to_Llama3.1_model>",
            temperature=0,
            model_kwargs={"n_gpu_layers": -1
                        },
            verbose=True
        )

    @step
    async def write_essay(self, ev: StartEvent, ctx: Context) -> StopEvent:
        topic = ev.topic
        word_limit = ev.word_limit
        prompt = PromptTemplate(
            dedent(
                """
                Write an essay on the topic: "{topic}" under {word_limit} words.
                Essay: 
                """
            )
        )
        response_generator = await self.llm.astream(
            prompt=prompt,
            topic=topic,
            word_limit=word_limit,
        )

        async for token in response_generator:
            ctx.write_event_to_stream(TokenEvent(token=token))

        return StopEvent(result="Done")

async def main():
    workflow = SimpleEssayWorkflow(
        timeout=60.0
    )

    handler = workflow.run(topic="The importance of education", word_limit=100)

    await handler

    async for event in handler.stream_events():
        if isinstance(event, TokenEvent):
            print(event.token, end="", flush=True)

if __name__ == "__main__":
    asyncio.run(main())

However, instead of printing the response token by token, the entire response is being printed to the console at once. Could you provide some insight into why this might be happening? Thanks!

dosubot[bot] commented 3 days ago

It seems like you're encountering an issue where the streaming response is printed all at once instead of token by token. This might be happening because you are using the chat endpoint instead of the stream_chat endpoint. To enable streaming and print the response token by token, you should call the stream_chat endpoint and iterate over streaming_response.response_gen as shown in the example below:

chat_engine = index.as_chat_engine()
streaming_response = chat_engine.stream_chat("Tell me a joke.")
for token in streaming_response.response_gen:
    print(token, end="")

Ensure that you are using the stream_chat method and iterating over the response_gen to achieve token-by-token streaming [1].
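Since the workflow in the question is asynchronous, the async form of the same pattern would look roughly like the sketch below (using the same chat engine; astream_chat is awaited, and its streaming response exposes async_response_gen for iterating over tokens):

# Async variant of the same streaming pattern (sketch)
streaming_response = await chat_engine.astream_chat("Tell me a joke.")
async for token in streaming_response.async_response_gen():
    print(token, end="", flush=True)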


RakeshReddyKondeti commented 3 days ago

Hi @dosu,

I am not using any chat_engine in this context. My intention is to test it out with an LLM.

dosubot[bot] commented 3 days ago

To achieve token-by-token streaming with an LLM without using a chat engine, you can use the astream_complete method (the async counterpart of stream_complete) provided by the LlamaIndex framework. Here's how you can implement it:

# Streaming completion token by token: format the prompt template first,
# since astream_complete expects a plain string prompt
streamed_response = await self.llm.astream_complete(
    prompt.format(topic=topic, word_limit=word_limit)
)

async for response_delta in streamed_response:
    ctx.write_event_to_stream(TokenEvent(token=response_delta.delta))

Make sure you are using the astream_complete method for asynchronous streaming. This method will yield tokens one by one, allowing you to process each token as it is generated. Adjust your workflow to use this method instead of astream to achieve the desired token-by-token streaming behavior.


RakeshReddyKondeti commented 3 days ago

@dosu it's still the same

dosubot[bot] commented 3 days ago

If you're still experiencing the issue where the response is printed all at once instead of token by token, there might be a few areas to check:

  1. Ensure Streaming is Enabled: Verify that the stream option is set to True in the generate_kwargs when using the LlamaCPP model. This is crucial for enabling token-by-token streaming.

  2. Check the Event Loop: Make sure that the asyncio event loop is running correctly and that the asynchronous tasks are being awaited properly. Any blocking operations in the loop could cause the tokens to be buffered and printed all at once.

  3. Inspect the write_event_to_stream Method: Ensure that the write_event_to_stream method is correctly placing events into the queue without any delays or buffering issues. This method should be adding each token to the queue as soon as it is received.

  4. Debugging: Add logging or print statements within the async for loop to check whether tokens are being received one by one (see the sketch after this list). This helps determine whether the issue is with token generation or with the printing process.

  5. Review the LLM Configuration: Double-check the configuration of the LlamaCPP model to ensure all settings are correct for streaming. This includes verifying the model path and any other parameters that might affect performance.
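
For point 4, a minimal debugging sketch inside the write_essay step from the question (the timestamps are only there to show whether deltas arrive incrementally or all at once):

import time

async for token in response_generator:
    # Print each delta with a monotonic timestamp to check incremental arrival
    print(f"[{time.monotonic():7.2f}] {token!r}")
    ctx.write_event_to_stream(TokenEvent(token=token))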

If these steps don't resolve the issue, there might be a deeper configuration or implementation detail that needs to be addressed.


logan-markewich commented 3 days ago

@RakeshReddyKondeti Two issues:

  1. You call await handler before iterating over the stream. await handler waits until the entire workflow is done, whereas if you iterate over stream_events() first, events are streamed as the workflow runs.

  2. LlamaCPP runs locally, in process. Because of this, there is no true async; it blocks the event loop. I'm unsure whether this will actually let you stream properly, given how async works in Python. I tested with Ollama and OpenAI after moving the await handler, though, and it worked fine.

Corrected code:

async def main():
    workflow = SimpleEssayWorkflow(
        timeout=60.0
    )

    handler = workflow.run(topic="The importance of education", word_limit=100)

    async for event in handler.stream_events():
        if isinstance(event, TokenEvent):
            print(event.token, end="", flush=True)

    await handler
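
For reference, the Ollama setup Logan mentions testing could look roughly like this in the workflow's __init__ (a sketch: it assumes the llama-index-llms-ollama package is installed and a local Ollama server is serving a llama3.1 model; the model name and request_timeout are illustrative):

from llama_index.llms.ollama import Ollama

class SimpleEssayWorkflow(Workflow):
    def __init__(self, timeout: int = 360):
        super().__init__(timeout=timeout)
        # Ollama calls a local server over HTTP, so its async methods do not
        # block the event loop the way the in-process LlamaCPP backend does
        self.llm = Ollama(model="llama3.1", request_timeout=120.0)

The write_essay step can stay unchanged, since astream is part of the shared LLM interface.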