run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Question]: I can't stream output asynchronously, bug: "a coroutine was expected" #12949

Closed · uniltone closed 4 months ago

uniltone commented 7 months ago

Question Validation

Question

import os
import time
import asyncio
from llama_index.core import Settings
from llama_index.legacy import VectorStoreIndex, SimpleDirectoryReader, StorageContext, load_index_from_storage, Prompt, ServiceContext
# Set up the prompt template
# from concurrent.futures import ThreadPoolExecutor
# import nest_asyncio
# nest_asyncio.apply()
DEFAULT_TEXT_QA_PROMPT_TMPL = (
    "Context information is below. \n"
    "---------------------\n"
    "{context_str}"
    "\n---------------------\n"
    "Given the context information and not prior knowledge, "
    "answer the question: {query_str}\n"
)
QA_PROMPT = Prompt(DEFAULT_TEXT_QA_PROMPT_TMPL)
# Local storage directory for the vector index
PERSIST_DIR = os.path.abspath(os.path.dirname(os.path.abspath(__file__)))+".\storage"
PATH = os.path.abspath(os.path.dirname(os.path.abspath(__file__)))+'\data'
if not os.path.exists(PERSIST_DIR):
    service_content = ServiceContext.from_defaults(chunk_size=1024)
    documents = SimpleDirectoryReader(input_dir=PATH).load_data(show_progress=True)
    index = VectorStoreIndex.from_documents(documents,service_context=service_content)
    index.storage_context.persist(persist_dir=PERSIST_DIR)
else:
    storage_context = StorageContext.from_defaults(persist_dir=PERSIST_DIR)
    index = load_index_from_storage(storage_context)
async def chat(message,history=""):
    query_engine = index.as_chat_engine(chat_mode="context",system_prompt=QA_PROMPT,streaming=True,verse=True)
    response = await asyncio.gather(query_engine.astream_chat(message))
    async for token in response.async_response_gen():
        print(token,end="")
def main(message):
    asyncio.run(chat(message))
main("你好吗") 

Traceback (most recent call last):
  File "e:\AiCompany\llama-index\gradio_demo2 astream_chat.py", line 46, in <module>
    main("你好吗")
    ^^^^^^^^^^^^^^
  File "e:\AiCompany\llama-index\gradio_demo2 astream_chat.py", line 45, in main
    asyncio.run(chat(message))
  File "E:\anaconda\Lib\asyncio\runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "E:\anaconda\Lib\asyncio\runners.py", line 89, in run
    raise ValueError("a coroutine was expected, got {!r}".format(coro))
ValueError: a coroutine was expected, got <async_generator object chat at 0x000001F6B612AA40>

dosubot[bot] commented 7 months ago

The issue you're encountering is due to attempting to run an async generator with asyncio.run(), which expects a coroutine. To fix the error, you need to ensure that your chat function is a coroutine and properly awaits any asynchronous operations within it.
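
To make the distinction concrete, here is a minimal standard-library sketch (illustrative only, not taken from the original script): a function body that contains yield makes it an async generator function, and calling it hands asyncio.run() an async generator instead of the coroutine it requires.

import asyncio

async def as_coroutine():
    # No `yield` here, so calling this returns a coroutine object.
    await asyncio.sleep(0)
    return "ok"

async def as_async_generator():
    # The `yield` makes this an async generator function; calling it
    # returns an async generator, not a coroutine.
    yield "ok"

print(asyncio.run(as_coroutine()))   # prints "ok"
# asyncio.run(as_async_generator())  # ValueError: a coroutine was expected, got <async_generator ...>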

Here's a corrected version of your chat function that should work:

async def chat(message, history=""):
    query_engine = index.as_chat_engine(chat_mode="context", system_prompt=QA_PROMPT, streaming=True, verse=True)
    async for response in query_engine.astream_chat(message):
        print(response, end="")

This version uses async for to iterate over the responses from query_engine.astream_chat(message), which is the correct way to handle async generators.

Then, you can call this coroutine from your main function like so:

def main(message):
    asyncio.run(chat(message))

This approach ensures that chat is treated as a coroutine, which asyncio.run() can execute properly, resolving the "ValueError: a coroutine was expected" issue.

logan-markewich commented 7 months ago

@uniltone I think I would change how you get the response; the bot is actually correct here:

query_engine = index.as_chat_engine(chat_mode="context",system_prompt=QA_PROMPT,streaming=True,verse=True)
response = await query_engine.astream_chat(message)
async for token in response.async_response_gen():
    print(token,end="")

Just remove the gather
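
For context (an editorial aside, not from the thread): asyncio.gather() always returns a list of results, so with the gather in place `response` was a one-element list rather than the streaming chat response, which is why calling async_response_gen() on it fails. A minimal standard-library sketch:

import asyncio

async def fetch():
    return "response"

async def main():
    gathered = await asyncio.gather(fetch())
    print(gathered)   # ['response'] -- a list, even for a single awaitable
    direct = await fetch()
    print(direct)     # 'response' -- the object itself, as in the fix above

asyncio.run(main())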

uniltone commented 7 months ago
async def chat(message,history=""):
    query_engine = index.as_chat_engine(chat_mode="context",system_prompt=QA_PROMPT,streaming=True,verbose=True)
    response = await query_engine.astream_chat(message)
    async for token in response.async_response_gen():
        print(token,end="")
def main(message):
    asyncio.run(chat(message))
main("你好吗")

Help! There are still bugs, and it's very slow.

bug:

(base) PS E:\AiCompany> & E:/anaconda/python.exe "e:/AiCompany/llama-index/gradio_demo2 astream_chat.py"
我很好，谢谢！你呢？有什么新鲜事吗？ (I'm fine, thanks! And you? Anything new?)
Cancelling an overlapped future failed
future: <_OverlappedFuture pending cb=[_ProactorReadPipeTransport._loop_reading()]>
Traceback (most recent call last):
  File "E:\anaconda\Lib\asyncio\windows_events.py", line 71, in _cancel_overlapped
    self._ov.cancel()
OSError: [WinError 6] 句柄无效。 (The handle is invalid.)
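
(Editorial note, not from the thread: the remaining OSError typically comes from the Windows Proactor event loop tearing down its pipe transports at interpreter shutdown. A commonly suggested workaround, sketched below as an assumption rather than a confirmed fix, is to force the selector-based event loop on Windows; the chat body here is a stand-in, not the real script.)

import asyncio
import sys

# Assumed workaround, not confirmed in this thread: use the selector event loop
# on Windows to avoid Proactor transport errors during interpreter shutdown.
if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

async def chat(message, history=""):
    # Stand-in body; the real script awaits query_engine.astream_chat(message)
    # and iterates response.async_response_gen() as shown above.
    await asyncio.sleep(0)
    print(message)

def main(message):
    asyncio.run(chat(message))

main("你好吗")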