run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai

[Question]: Having trouble with async SubQuestionQueryEngine #15103

Open nazim-ashman-oc opened 1 month ago

nazim-ashman-oc commented 1 month ago

Question Validation

Question

I have been trying to get a SubQuestionQueryEngine working in async mode for a while in combination with FastAPI.

I have already applied: nest_asyncio.apply()

and am using the "asyncio" loop: uvicorn.run(app, host="0.0.0.0", port=8000, loop="asyncio")

I am setting up my sub_question_query_engine like this:

    # Imports used below (llama_index core)
    from llama_index.core import get_response_synthesizer
    from llama_index.core.query_engine import RetrieverQueryEngine, SubQuestionQueryEngine
    from llama_index.core.retrievers import RecursiveRetriever
    from llama_index.core.tools import QueryEngineTool, ToolMetadata

    recursive_retriever = RecursiveRetriever(
        "vector",
        retriever_dict=retriever_dict,
        verbose=True,
    )
    response_synthesizer = get_response_synthesizer(
        response_mode="compact",
        streaming=True,
        service_context=ctx_base,
        use_async=True,
    )

    vector_query_engine = RetrieverQueryEngine.from_args(
        recursive_retriever,
        response_synthesizer=response_synthesizer,
        streaming=True,
        use_async=True,
    )

    query_engine_tools = [
        QueryEngineTool(
            query_engine=vector_query_engine,
            metadata=ToolMetadata(
                name="vector_store_query_engine",
                description="Vector Store Query engine",
            ),
        ),
    ]

    query_engine = SubQuestionQueryEngine.from_defaults(
        query_engine_tools=query_engine_tools,
        use_async=True,
        service_context=ctx_base,
        response_synthesizer=response_synthesizer,
    )

It is then put as a tool into a RouterQueryEngine, roughly like this (the selector and tool description in this sketch are illustrative, not my exact code):
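
    from llama_index.core.query_engine import RouterQueryEngine
    from llama_index.core.selectors import LLMSingleSelector

    router_query_engine = RouterQueryEngine(
        selector=LLMSingleSelector.from_defaults(),
        query_engine_tools=[
            QueryEngineTool(
                query_engine=query_engine,
                metadata=ToolMetadata(
                    name="sub_question_query_engine",
                    description="Useful for questions that need to be broken into sub-questions",
                ),
            ),
            # ... other tools
        ],
    )

The RouterQueryEngine is then called from a FastAPI endpoint, which in turn calls this function: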

async def generate_chat_reply(request_query: str):
    engine_container = get_engine_container_singleton()
    router_query_engine = engine_container.router_query_engine
    response = await router_query_engine.aquery(request_query)
    response_gen = response.response_gen
    async for chunk in response_gen:
        yield chunk
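
For completeness, the FastAPI endpoint wrapping this generator looks roughly like this (the route path is illustrative):

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse

    app = FastAPI()

    @app.post("/chat")
    async def chat(request_query: str):
        # Stream the reply back to the client chunk by chunk
        return StreamingResponse(generate_chat_reply(request_query), media_type="text/plain")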

Now I am getting this error:

    | Traceback (most recent call last):
    |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 234, in __step
    |     result = coro.throw(exc)
    |   File "/usr/local/lib/python3.10/site-packages/starlette/responses.py", line 261, in wrap
    |     await func()
    |   File "/usr/local/lib/python3.10/site-packages/starlette/responses.py", line 250, in stream_response
    |     async for chunk in self.body_iterator:
    |   File "/app/app/endpoints/chat.py", line 80, in generate_chat_reply
    |     response = await router_query_engine.aquery(request_query)
    |   File "/usr/local/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py", line 255, in async_wrapper
    |     result = await func(*args, **kwargs)
    |   File "/usr/local/lib/python3.10/site-packages/llama_index/core/base/base_query_engine.py", line 64, in aquery
    |     query_result = await self._aquery(str_or_query_bundle)
    |   File "/usr/local/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py", line 255, in async_wrapper
    |     result = await func(*args, **kwargs)
    |   File "/usr/local/lib/python3.10/site-packages/llama_index/core/query_engine/router_query_engine.py", line 246, in _aquery
    |     final_response = await selected_query_engine.aquery(query_bundle)
    |   File "/usr/local/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py", line 255, in async_wrapper
    |     result = await func(*args, **kwargs)
    |   File "/usr/local/lib/python3.10/site-packages/llama_index/core/base/base_query_engine.py", line 64, in aquery
    |     query_result = await self._aquery(str_or_query_bundle)
    |   File "/usr/local/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py", line 255, in async_wrapper
    |     result = await func(*args, **kwargs)
    |   File "/usr/local/lib/python3.10/site-packages/llama_index/core/query_engine/sub_question_query_engine.py", line 200, in _aquery
    |     qa_pairs_all = await asyncio.gather(*tasks)
    |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 304, in __wakeup
    |     future.result()
    |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
    |     result = coro.send(None)
    |   File "/usr/local/lib/python3.10/site-packages/llama_index/core/query_engine/sub_question_query_engine.py", line 240, in _aquery_subq
    |     response_text = str(response)
    |   File "/usr/local/lib/python3.10/site-packages/llama_index/core/base/response/schema.py", line 183, in __str__
    |     return asyncio_run(self._async_str)
    |   File "/usr/local/lib/python3.10/site-packages/llama_index/core/async_utils.py", line 47, in asyncio_run
    |     raise type(e)(f"Error running coroutine: {e!s}") from e
    | TypeError: Error running coroutine: An asyncio.Future, a coroutine or an awaitable is required

I am guessing this error hints at something wrong with the response sent back from the RetrieverQueryEngine layers, but I am not too sure. I have set use_async=True and streaming=True wherever I can, so I am not sure where I am going wrong.

dosubot[bot] commented 1 month ago

Hey @nazim-ashman-oc! 👋 I'm here to help you with any bugs, questions, or contributions you might have. Let's tackle this together while we wait for a human maintainer.

The error you're encountering is due to the __str__ method in the RESPONSE_TYPE class trying to run an asynchronous function synchronously. Specifically, the asyncio_run function is raising a TypeError because it expects an awaitable, but it seems to be getting something else.

To fix this, you need to ensure that the __str__ method is correctly handling the asynchronous function. Here is a potential fix:

  1. Modify the __str__ method in the RESPONSE_TYPE class to properly handle the asynchronous function.
  2. Ensure that the response_text is correctly awaited if it is a coroutine.

Here is an example of how you might modify the __str__ method:

from llama_index.core.async_utils import asyncio_run

class RESPONSE_TYPE:
    # Other methods and attributes...

    async def _async_str(self):
        # Your async string conversion logic here
        pass

    def __str__(self):
        return asyncio_run(self._async_str())

In your SubQuestionQueryEngine class, ensure that response_text is awaited if it is a coroutine:

async def _aquery_subq(
    self, sub_q: SubQuestion, color: Optional[str] = None
) -> Optional[SubQuestionAnswerPair]:
    try:
        with self.callback_manager.event(
            CBEventType.SUB_QUESTION,
            payload={EventPayload.SUB_QUESTION: SubQuestionAnswerPair(sub_q=sub_q)},
        ) as event:
            question = sub_q.sub_question
            query_engine = self._query_engines[sub_q.tool_name]

            if self._verbose:
                print_text(f"[{sub_q.tool_name}] Q: {question}\n", color=color)

            response = await query_engine.aquery(question)
            response_text = await response.__str__() if asyncio.iscoroutinefunction(response.__str__) else str(response)

            if self._verbose:
                print_text(f"[{sub_q.tool_name}] A: {response_text}\n", color=color)

            qa_pair = SubQuestionAnswerPair(
                sub_q=sub_q, answer=response_text, sources=response.source_nodes
            )

            event.on_end(payload={EventPayload.SUB_QUESTION: qa_pair})
            return qa_pair
    except ValueError:
        logger.warning(f"[{sub_q.tool_name}] Failed to run {question}")
        return None

This ensures that the response_text is correctly awaited if it is a coroutine, preventing the TypeError you encountered [1][2].

To continue talking to Dosu, mention @dosu.

logan-markewich commented 1 month ago

@nazim-ashman-oc you aren't using an async generator

It should be async for chunk in response.async_response_gen():
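
i.e. with the code above, generate_chat_reply would become:

    async def generate_chat_reply(request_query: str):
        engine_container = get_engine_container_singleton()
        router_query_engine = engine_container.router_query_engine
        response = await router_query_engine.aquery(request_query)
        # iterate the async generator returned by async_response_gen(), not the attribute itself
        async for chunk in response.async_response_gen():
            yield chunk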

nazim-ashman-oc commented 1 month ago

response.async_response_gen():

@logan-markewich Ah thanks. Made the changes, but still getting the same error :(

nazim-ashman-oc commented 1 month ago

Any other code snippets I could add to help out? I can't seem to find any examples online of using the SubQuestionQueryEngine with async and streaming. Async alone works and streaming alone works, but I can't get them to work together.

arunnuve commented 1 month ago

Is the issue happening for every query you run, or only for particular ones? For me this issue was also raised, but only for particular queries; other than those, everything works well.

nazim-ashman-oc commented 1 month ago

Is the issue happening for every query you run, or only for particular ones? For me this issue was also raised, but only for particular queries; other than those, everything works well.

Well, from the RouterQueryEngine, if an engine other than SubQuestionQueryEngine is chosen, it works. It breaks down when SubQuestionQueryEngine is chosen.

nazim-ashman-oc commented 1 month ago

Ok, the problem here was that I was using the same response synthesizer (set to streaming=True) for both the RetrieverQueryEngine and the SubQuestionQueryEngine. Making a separate response synthesizer for the RetrieverQueryEngine (with streaming=False) fixes the problem.

Practically this makes sense to me, since there is no point in the RetrieverQueryEngine streaming when its output is just fed into the SubQuestionQueryEngine. However, it still feels like unintended behaviour: str() does not seem to work well with an AsyncStreamingResponse.
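
Roughly, the change looks like this (variable names here are just illustrative; it is the same setup as above with a second, non-streaming synthesizer for the inner engine):

    # Non-streaming synthesizer for the inner RetrieverQueryEngine
    inner_synthesizer = get_response_synthesizer(
        response_mode="compact",
        streaming=False,
        service_context=ctx_base,
        use_async=True,
    )
    vector_query_engine = RetrieverQueryEngine.from_args(
        recursive_retriever,
        response_synthesizer=inner_synthesizer,
        use_async=True,
    )

    # Streaming synthesizer only on the outer SubQuestionQueryEngine
    streaming_synthesizer = get_response_synthesizer(
        response_mode="compact",
        streaming=True,
        service_context=ctx_base,
        use_async=True,
    )
    query_engine = SubQuestionQueryEngine.from_defaults(
        query_engine_tools=query_engine_tools,
        use_async=True,
        service_context=ctx_base,
        response_synthesizer=streaming_synthesizer,
    )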

nazim-ashman-oc commented 1 month ago

@logan-markewich

Ok, dosu pretty much pointed us in the right direction here. It looks like, in

@dataclass
class AsyncStreamingResponse:
    """AsyncStreamingResponse object.

    Returned if streaming=True while using async.

    Attributes:
        _async_response_gen: The response async generator.

    """

    response_gen: TokenAsyncGen
    source_nodes: List[NodeWithScore] = field(default_factory=list)
    metadata: Optional[Dict[str, Any]] = None
    response_txt: Optional[str] = None

    def __post_init__(self) -> None:
        self._lock = asyncio.Lock()

    def __str__(self) -> str:
        """Convert to string representation."""
        return asyncio_run(self._async_str)

    async def _async_str(self) -> str:
        """Convert to string representation."""
        async for _ in self._yield_response():
            ...
        return self.response_txt or "None"

    async def _yield_response(self) -> TokenAsyncGen:
        """Yield the string response."""
        async with self._lock:
            if self.response_txt is None and self.response_gen is not None:
                self.response_txt = ""
                async for text in self.response_gen:
                    self.response_txt += text
                    yield text
            else:
                yield self.response_txt

.....

the __str__ dunder method should be this instead (it was missing the parentheses, so asyncio_run was handed the coroutine function itself rather than an awaitable coroutine):

    def __str__(self) -> str:
        """Convert to string representation."""
        return asyncio_run(self._async_str())
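
A quick standalone illustration of why the missing parentheses matter (plain asyncio, not llama_index code):

    import asyncio

    async def _async_str() -> str:
        return "hello"

    loop = asyncio.new_event_loop()
    # Passing the coroutine *function* (no parentheses) is not awaitable:
    #   loop.run_until_complete(_async_str)
    #   -> TypeError: An asyncio.Future, a coroutine or an awaitable is required
    print(loop.run_until_complete(_async_str()))  # calling it first returns a coroutine -> prints "hello"
    loop.close()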

Shall I make a PR for this?

PR: https://github.com/run-llama/llama_index/pull/15131