run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License
34.69k stars 4.9k forks source link

[Bug]: Streaming Response is not working when used with FastAPI (even with websockets) #10517

Open surya-ven opened 6 months ago

surya-ven commented 6 months ago

Bug Description

I've not been able to get streaming response generation to work properly with StreamingResponse in FastAPI.

The bug is weird in that, when I print the chunk of text within the query method below, I can see that it outputs text successively in the terminal - so it doesn't seem to be an issue with the response_gen / iterator, rather, it seems to be a problem with the StreamingResponse of FastAPI not playing well with using the llama-index response_gen iterator.

I'm using ChromaDB as a vector store. I've also tried using a simple POST REST API example without the use of websockets. Print_response_stream works as expected. I've tried outputting the chunks to the command line through using curl, by printing to the command line using a python script (httpx/asyncio), and also a simple index.html containing a script to output the chunks to a paragraph tag in case there was a problem with my command line configuration. Neither of these worked.

The text is always being printed at the very end, all at once (even if I print it on a new line).

Version

0.9.45.post1

Steps to Reproduce

Functions which produce the streaming output

Everything is setup/initialised prior to this print_response_stream() works as expected within the query method and I've also printed the output of each chunk within the iterator and each chunk is correctly printed successively. The problem therefore is likely in the FastAPI route.

Query method

def query(self, query_text):
        # Query the index
        streaming_response = self.query_engine.query(query_text)
        return streaming_response.response_gen

FastAPI route

@self.app.websocket("/ws/query")
        async def websocket_query(websocket: WebSocket):
            await self.connection_manager.connect(websocket)
            try:
                while True:
                    query_text = await websocket.receive_text()
                    streaming_response_gen = self.drive.query(query_text) # Your query engine here
                    for response in streaming_response_gen: # Assuming this is iterable
                        await self.connection_manager.send_personal_message(response, websocket)
            except WebSocketDisconnect:
                self.connection_manager.disconnect(websocket)
            except Exception as e:
                # Handle exceptions
                self.connection_manager.disconnect(websocket)
                print(f"Error: {str(e)}")

Relevant Logs/Tracbacks

Note that the 200 OK response is only received at the end of the text generation.

INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     ('127.0.0.1', 56750) - "WebSocket /ws/query" [accepted]
INFO:     connection open
2024-02-08 16:35:36 - HTTP Request: POST http://localhost:11434/api/chat "HTTP/1.1 200 OK"
HTTP Request: POST http://localhost:11434/api/chat "HTTP/1.1 200 OK"
dosubot[bot] commented 6 months ago

🤖

Hey @surya-ven, fancy seeing you here again! Running into new adventures, I see.

Based on the information you've provided, it seems like the issue might be related to how FastAPI's StreamingResponse is interacting with your server or client, rather than an issue with the LlamaIndex response_gen iterator itself.

FastAPI's StreamingResponse is designed to send data as soon as it's available. However, the behavior you're observing, where the text is printed all at once at the end, might be due to the server or client buffering the response.

If you're using Uvicorn as your server, it might be buffering the response and sending it all at once. You can try using the --no-send option with Uvicorn to disable this behavior:

uvicorn main:app --no-send

If you're using a different server, you should check its documentation for a similar option.

On the client side, if you're using curl to output the chunks to the command line, you can use the -N or --no-buffer option to disable buffering:

curl -N http://localhost:8000/stream

If you're using a web browser as your client, it might be waiting until the entire response is received before displaying it. Unfortunately, you might not be able to disable this behavior in a web browser. In this case, you could try using a different client that allows you to disable buffering, or you could try using a different method to consume the stream, like Server-Sent Events or WebSockets.

Regarding the structure and type of the response_gen object, it is of the type TokenGen as per the LlamaIndex codebase. However, without more information about the TokenGen type, it's difficult to provide more details about its structure.

Please note that these are general suggestions and might not solve your specific issue. If you're still having trouble, it would be helpful to have more information about your setup and the exact behavior you're seeing.

Sources


This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

If you want to continue the conversation, start your reply with @dosu-bot.

Pjamas180 commented 6 months ago

@dosu-bot it doesn't seem like --no-send is an option when running uvicorn from the code itself with uvicorn.run

theoratkin commented 6 months ago

I can confirm this, I experience this issue when using an async StreamingResponse. Switching it to synchronous makes streaming work as expected.

surya-ven commented 6 months ago

I can confirm this, I experience this issue when using an async StreamingResponse. Switching it to synchronous makes streaming work as expected.

Great, thanks for pointing this out, at least there's a temp workaround. Still can't get this to work with websockets however.

Tsovak commented 5 months ago

it is just not working without FastAPI dependency

chain = index.as_query_engine(
            chat_mode=ChatMode.BEST,
            memory=memory,
            streaming=True,
            vector_store_query_mode=VectorStoreQueryMode.DEFAULT,
            similarity_top_k=1,
            text_qa_template=text_qa_template,
            refine_template=refine_template,
        )

            streaming_response = chain.query(user_input)
            for chunk in streaming_response.response_gen:
                yield f"{chunk}"

so response_gen sometimes returns an empty response. but blocking response is working well

qkxie commented 3 months ago

Check if you use GzipMiddleware. If so, the streamingRespone will not work

PrestonBlackburn commented 2 months ago

I was having issues streaming responses using FastAPI websockets + LlamaIndex as well. I was able to get it working by mocking some of the StreamResponse functionality and finding a few issues with my async functions.

The response generator should just be a generator so you can mock the functionality of the StreamingResponse by doing something similar below. That way, you can test the generator part without needing to call other parts of LlamaIndex, and you can isolate any issues:

TokenGen = Generator[str, None, None]

@dataclass
class StreamingResponse:
    response_gen: TokenGen
    source_nodes: List = field(default_factory=list)
    metadata: Optional[Dict[str, Any]] = None
    response_txt: Optional[str] = None

def mock_stream() -> Generator:
    response = "Hello there! How can I assist you today?" * int(100)
    for word in response.split():
        yield word + " "

def rag_response_mock_generator(user_message:str) -> StreamingResponse:
    """
        This function is a mock of the llamaindex chat function
        It will return a mock stream of responses from the vdb.
    """
    stream_resp = StreamingResponse(
        response_gen=mock_stream(),
        source_nodes=[],
        metadata = {},
        response_txt="streaming response mock"
    )

    return stream_resp

To further isolate any issues, you can split out the query by using something like the base OpenAI API endpoint as well-


from openai import OpenAI
client = OpenAI()

def mock_open_ai_response_generator(user_message:str)-> Generator:

    stream_resp = StreamingResponse(
        response_gen=client.chat.completions.create(
    model='gpt-3.5-turbo',
    messages=[
        {'role': 'user', 'content': user_message}
    ],
    temperature=0,
    stream=True
    ),
        source_nodes=[],
        metadata = {},
        response_txt="streaming response mock"
    )

    return stream_resp

Also, if the use_async option in the index.as_query_engine function is set to True, it will return an AsyncGenerator instead, so you'd need to mock that instead.