run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Bug]: Streaming with async_response_gen incompatible with FastAPI #13495

Closed JLongley closed 5 months ago

JLongley commented 5 months ago

Bug Description

I have a very simple FastAPI endpoint set up to test out streaming tokens back from a context chat engine. As written, the first request correctly streams the content back, but every subsequent request gives me an asyncio error:

got Future <Future pending> attached to a different loop

The full stack trace is included below.

Version

llama-index==0.10.36, fastapi==0.104.1

Steps to Reproduce

I'm running the code below in a Docker container.

With that setup, I cURL http://localhost:8000/copilot/stream_test?message=Hello and get a streamed response. If I cURL the endpoint a second time, I get no response and the server outputs the stack trace shown under "Relevant Logs/Tracebacks".

Here is my implementation:

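from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse
from llama_index.core import VectorStoreIndex
from llama_index.core.chat_engine.types import BaseChatEngine

# Router for this endpoint (presumably mounted under /copilot, per the URL above).
router = APIRouter()


def get_chat_engine() -> BaseChatEngine:
    # Build a context chat engine over an empty index for each request.
    vector_store_index = VectorStoreIndex.from_documents(documents=[])

    chat_engine = vector_store_index.as_chat_engine(
        chat_mode="context",
        similarity_top_k=10,
        system_prompt="You are a helpful assistant",
    )

    return chat_engine


@router.get("/stream_test")
async def stream_test(
    request: Request,
    message: str,
    chat_engine: BaseChatEngine = Depends(get_chat_engine),
):
    # Start a streaming chat with an empty chat history.
    response = await chat_engine.astream_chat(message, [])

    # Relay tokens to the client as they arrive; stop if the client disconnects.
    async def event_generator():
        async for token in response.async_response_gen():
            if await request.is_disconnected():
                break

            yield token

    return StreamingResponse(event_generator(), media_type="text/plain")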

Relevant Logs/Tracebacks

Traceback (most recent call last):
   File "/usr/local/lib/python3.9/site-packages/uvicorn/protocols/http/h11_impl.py", line 429, in run_asgi
     result = await app(  # type: ignore[func-returns-value]
   File "/usr/local/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
     return await self.app(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/fastapi/applications.py", line 1106, in __call__
     await super().__call__(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/starlette/applications.py", line 122, in __call__
     await self.middleware_stack(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 184, in __call__
     raise exc
   File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 162, in __call__
     await self.app(scope, receive, _send)
   File "/shared/common/web/backend/app/middleware/rate_limit.py", line 47, in __call__
     await self.app(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
     raise exc
   File "/usr/local/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
     await self.app(scope, receive, sender)
   File "/usr/local/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
     raise e
   File "/usr/local/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
     await self.app(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 718, in __call__
     await route.handle(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 276, in handle
     await self.app(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 69, in app
     await response(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 277, in __call__
     await wrap(partial(self.listen_for_disconnect, receive))
   File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
     raise exceptions[0]
   File "/usr/local/lib/python3.9/asyncio/tasks.py", line 256, in __step
     result = coro.send(None)
   File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 273, in wrap
     await func()
   File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 262, in stream_response
     async for chunk in self.body_iterator:
   File "/shared/common/web/backend/app/endpoints/copilot/endpoint.py", line 340, in event_generator
     async for token in response.async_response_gen():
   File "/usr/local/lib/python3.9/site-packages/llama_index/core/chat_engine/types.py", line 277, in async_response_gen
     delta = await asyncio.wait_for(self.aqueue.get(), timeout=0.1)
   File "/usr/local/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
     return fut.result()
   File "/usr/local/lib/python3.9/asyncio/futures.py", line 201, in result
     raise self._exception
   File "/usr/local/lib/python3.9/asyncio/tasks.py", line 258, in __step
     result = coro.throw(exc)
   File "/usr/local/lib/python3.9/asyncio/queues.py", line 166, in get
     await getter
   File "/usr/local/lib/python3.9/asyncio/futures.py", line 284, in __await__
     yield self  # This tells Task to wait for completion.
 RuntimeError: Task <Task pending name='Task-73' coro=<Queue.get() running at /usr/local/lib/python3.9/asyncio/queues.py:166> cb=[_release_waiter(<Future pendi...k.__wakeup()]>)() at /usr/local/lib/python3.9/asyncio/tasks.py:416]> got Future <Future pending> attached to a different loop

dosubot[bot] commented 5 months ago

The error you're encountering, got Future <Future pending> attached to a different loop, typically happens when an asyncio future or task is created in one event loop and then awaited in another. This is a common issue when dealing with asynchronous operations in FastAPI, especially when integrating with external libraries or managing background tasks.

To address this issue, ensure that all asynchronous components and tasks are initialized and awaited within the same event loop context.

Make sure to update your libraries to the latest versions, as fixes in newer versions might resolve your issue. Additionally, consider adding logging to track event loop usage throughout your application, which can help identify where the loop mismatch occurs.

Regarding the LlamaIndex library and its integration with FastAPI, ensure that any asynchronous operations are correctly managed within FastAPI's event loop. This includes proper initialization and usage of async components within your endpoint implementations.

If the issue persists, reviewing the implementation of the SimpleChatEngine and its asynchronous methods might be necessary. Ensure that these methods are correctly integrated into your FastAPI application, with careful management of asyncio tasks and event loops to avoid conflicts.

This approach should help resolve the asyncio error you're facing by ensuring consistent event loop usage across your FastAPI application and its asynchronous operations.
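As a rough illustration of the error message itself (not of LlamaIndex's internals), the following minimal sketch creates a future on one event loop and awaits it from a task running on another. The names `wait_on`, `loop_a`, and `foreign_future` are made up for this example.

```python
import asyncio


async def wait_on(fut: asyncio.Future) -> None:
    # Awaiting a future makes the running task check which loop owns it.
    await fut


# Create a future that belongs to loop_a; loop_a itself is never run here.
loop_a = asyncio.new_event_loop()
foreign_future = loop_a.create_future()

error_message = ""
try:
    # asyncio.run() starts a second, separate event loop for wait_on().
    asyncio.run(wait_on(foreign_future))
except RuntimeError as exc:
    error_message = str(exc)
finally:
    loop_a.close()

print(error_message)
```

The printed message ends with "attached to a different loop", the same wording as in the traceback above. In a real application the mismatch is usually less visible, for example an object created at import time or in another thread that captures a loop different from the one serving the request.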

#### Details

**Sources**
- [llama-index-core/llama_index/core/chat_engine/types.py](https://github.com/run-llama/llama_index/blob/main/llama-index-core/llama_index/core/chat_engine/types.py)
- [llama-index-core/llama_index/core/indices/common_tree/base.py](https://github.com/run-llama/llama_index/blob/main/llama-index-core/llama_index/core/indices/common_tree/base.py)
- [llama-index-core/llama_index/core/chat_engine/context.py](https://github.com/run-llama/llama_index/blob/main/llama-index-core/llama_index/core/chat_engine/context.py)
- [llama-index-legacy/llama_index/legacy/chat_engine/context.py](https://github.com/run-llama/llama_index/blob/main/llama-index-legacy/llama_index/legacy/chat_engine/context.py)
- [llama-index-core/llama_index/core/chat_engine/simple.py](https://github.com/run-llama/llama_index/blob/main/llama-index-core/llama_index/core/chat_engine/simple.py)
- [llama-index-legacy/llama_index/legacy/chat_engine/simple.py](https://github.com/run-llama/llama_index/blob/main/llama-index-legacy/llama_index/legacy/chat_engine/simple.py)

**About Dosu** This response is meant to be useful and save you time. It is **not meant to be a precise solution**, but rather a starting point for your own research. Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

To continue the conversation, mention @dosu.

logan-markewich commented 5 months ago

@JLongley can you try updating? v0.10.37 had some updates that maybe fixed this?

This script worked great for me on each request using the latest version of llama-index-core/llama-index

from llama_index.llms.openai import OpenAI
from llama_index.core import VectorStoreIndex, Document
from llama_index.core.chat_engine.types import BaseChatEngine

llm = OpenAI()

from fastapi import FastAPI, Request, Depends
from fastapi.responses import StreamingResponse

app = FastAPI()

def get_chat_engine() -> BaseChatEngine:
    vector_store_index = VectorStoreIndex.from_documents(documents=[])

    chat_engine = vector_store_index.as_chat_engine(
        chat_mode="context",
        similarity_top_k=10,
        system_prompt="You are a helpful assistant",
    )

    return chat_engine

@app.get("/")
async def root():
    return {"message": "Hello World"}

@app.get("/stream_test")
async def stream_test(
    request: Request,
    chat_engine: BaseChatEngine = Depends(get_chat_engine),
):
    response = await chat_engine.astream_chat("Tell me a poem about raining cats and dogs!")

    # Generate a response.
    async def event_generator():
        async for token in response.async_response_gen():
            if await request.is_disconnected():
                break

            yield token

    return StreamingResponse(event_generator(), media_type="text/plain")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, loop="asyncio")

JLongley commented 5 months ago

Thanks Logan,

I've upgraded both fastapi and llama-index to the latest versions, but I am still seeing the same errors.

I notice that about 1 in every ~5 requests gets through without an error, but interestingly, the response arrives all at once in Postman rather than one token at a time.

llama-index==0.10.37, fastapi==0.111.0

from llama_index.llms.openai import OpenAI
from llama_index.core import VectorStoreIndex
from llama_index.core.chat_engine.types import BaseChatEngine

llm = OpenAI()

from fastapi import FastAPI, Request, Depends
from fastapi.responses import StreamingResponse

app = FastAPI()

import os

def get_chat_engine() -> BaseChatEngine:
    vector_store_index = VectorStoreIndex.from_documents(documents=[])

    chat_engine = vector_store_index.as_chat_engine(
        chat_mode="context",
        similarity_top_k=10,
        system_prompt="You are a helpful assistant",
    )

    return chat_engine

@app.get("/")
async def root():
    return {"message": "Hello World"}

@app.get("/stream_test")
async def stream_test(
    request: Request,
    chat_engine: BaseChatEngine = Depends(get_chat_engine),
):
    response = await chat_engine.astream_chat("Tell me a poem about raining cats and dogs!")

    # Generate a response.
    async def event_generator():
        async for token in response.async_response_gen():
            if await request.is_disconnected():
                break

            yield token

    # # Generate a response.
    # def event_generator():
    #     for token in response.response_gen:
    #         yield token

    return StreamingResponse(event_generator(), media_type="text/plain")

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        app,
        loop="asyncio",
        host="0.0.0.0",
        port=8080,
    )

Error output:

ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 265, in __call__
    await wrap(partial(self.listen_for_disconnect, receive))
  File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 261, in wrap
    await func()
  File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 238, in listen_for_disconnect
    message = await receive()
  File "/usr/local/lib/python3.9/site-packages/uvicorn/protocols/http/httptools_impl.py", line 597, in receive
    await self.message_event.wait()
  File "/usr/local/lib/python3.9/asyncio/locks.py", line 226, in wait
    await fut
asyncio.exceptions.CancelledError: Cancelled by cancel scope ffff92492bb0
During handling of the above exception, another exception occurred:
  + Exception Group Traceback (most recent call last):
  |   File "/usr/local/lib/python3.9/site-packages/uvicorn/protocols/http/httptools_impl.py", line 436, in run_asgi
  |     result = await app(  # type: ignore[func-returns-value]
  |   File "/usr/local/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
  |     return await self.app(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/fastapi/applications.py", line 1054, in __call__
  |     await super().__call__(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/applications.py", line 123, in __call__
  |     await self.middleware_stack(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 186, in __call__
  |     raise exc
  |   File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 164, in __call__
  |     await self.app(scope, receive, _send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
  |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
  |     raise exc
  |   File "/usr/local/lib/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
  |     await app(scope, receive, sender)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 756, in __call__
  |     await self.middleware_stack(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 776, in app
  |     await route.handle(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 297, in handle
  |     await self.app(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 77, in app
  |     await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
  |     raise exc
  |   File "/usr/local/lib/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
  |     await app(scope, receive, sender)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 75, in app
  |     await response(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 265, in __call__
  |     await wrap(partial(self.listen_for_disconnect, receive))
  |   File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 261, in wrap
    |     await func()
    |   File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 250, in stream_response
    |     async for chunk in self.body_iterator:
    |   File "/shared/common/main.py", line 41, in event_generator
    |     async for token in response.async_response_gen():
    |   File "/usr/local/lib/python3.9/site-packages/llama_index/core/chat_engine/types.py", line 277, in async_response_gen
    |     delta = await asyncio.wait_for(self.aqueue.get(), timeout=0.1)
    |   File "/usr/local/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
    |     return fut.result()
    |   File "/usr/local/lib/python3.9/asyncio/queues.py", line 166, in get
    |     await getter
    | RuntimeError: Task <Task pending name='Task-694' coro=<Queue.get() running at /usr/local/lib/python3.9/asyncio/queues.py:166> cb=[_release_waiter(<Future pendi...f9249ce20>()]>)() at /usr/local/lib/python3.9/asyncio/tasks.py:416]> got Future <Future pending> attached to a different loop
    +------------------------------------

logan-markewich commented 5 months ago

I was running in browser (lol), and the streaming seemed to work fine. Let me see if I can reproduce with the above code

logan-markewich commented 5 months ago

Seems like the queue maybe needs to be initialized each time to be in the current async loop?
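A minimal sketch of that idea, under stated assumptions: `TokenStream` and `handle_request` below are hypothetical stand-ins, not LlamaIndex classes or its actual fix. The point is only that the `asyncio.Queue` is created lazily from inside a running coroutine and that each request gets a fresh stream object, so the queue always belongs to the loop that consumes it.

```python
import asyncio
from typing import AsyncIterator, List, Optional


class TokenStream:
    """Hypothetical stand-in for a streaming response object."""

    def __init__(self) -> None:
        # Do not create the queue here: the constructor may run outside
        # the event loop that later consumes the tokens.
        self._queue: Optional[asyncio.Queue] = None

    def _get_queue(self) -> asyncio.Queue:
        # Create the queue on first use, from inside a running coroutine,
        # so it is associated with the currently running loop.
        if self._queue is None:
            self._queue = asyncio.Queue()
        return self._queue

    async def produce(self, tokens: List[str]) -> None:
        queue = self._get_queue()
        for token in tokens:
            await queue.put(token)
        await queue.put(None)  # Sentinel marking the end of the stream.

    async def consume(self) -> AsyncIterator[str]:
        queue = self._get_queue()
        while True:
            token = await queue.get()
            if token is None:
                break
            yield token


async def handle_request(tokens: List[str]) -> List[str]:
    stream = TokenStream()  # Fresh stream (and queue) per request.
    producer = asyncio.create_task(stream.produce(tokens))
    received = [token async for token in stream.consume()]
    await producer
    return received


# Each asyncio.run() call uses a new event loop, mimicking separate loops.
first = asyncio.run(handle_request(["Hello", " ", "world"]))
second = asyncio.run(handle_request(["again"]))
print(first, second)
```

Whether this matches what changed in llama-index between versions is not established in this thread; the sketch only shows why per-loop initialization avoids reusing a queue tied to an earlier loop.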

logan-markewich commented 5 months ago

@JLongley Hmm, I still can't reproduce

Maybe try with a fresh venv to be sure? I copied your script above exactly and just launched, and then used postman this time. Zero requests failed 🤔

JLongley commented 5 months ago

I tabled this task for a few weeks and after coming back to it, the streaming is now working. My best guess is that it was some dependency issue, as I've upgraded many of the dependencies in the meantime.
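Since the suspected cause is a dependency mismatch, a small sketch like the following can record the installed versions of the packages that appear in the tracebacks, so a working and a failing environment can be compared. The helper name `installed_versions` and the package list are assumptions made for this example.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable


def installed_versions(packages: Iterable[str]) -> Dict[str, str]:
    """Map each package name to its installed version, or 'not installed'."""
    found: Dict[str, str] = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = "not installed"
    return found


# Packages named in this thread or visible in the stack traces.
report = installed_versions(
    ["llama-index", "llama-index-core", "fastapi", "starlette", "anyio", "uvicorn"]
)
for name, installed in report.items():
    print(f"{name}: {installed}")
```

Saving this output alongside a bug report makes it easier to tell later which upgrade changed the behavior.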

Apologies for the lack of real resolution if anybody encounters the same issue in the future.