elastic / elasticsearch-py

Official Python client for Elasticsearch
https://ela.st/es-python
Apache License 2.0

[Bug]: ElasticSearch : Timeout context manager should be used inside a task #2614

Open FlorentGrenier opened 1 month ago

FlorentGrenier commented 1 month ago

Bug Description

I'm developing a chatbot, and the bug appears when a second request is sent.

I opened an issue on the GitHub repository of the llama_index library, but the bug apparently comes from the Elasticsearch library instead.

I also opened a post on the Elasticsearch forum.

Version

elasticsearch : 8.14.0 elastic-transport : 8.13.1

Steps to Reproduce

With llama_index, send a second query to the RetrieverQueryEngine, built from a VectorIndexRetriever, a VectorStoreIndex and an ElasticSearchVectorStore.

Relevant Logs/Tracebacks

Traceback (most recent call last):
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\streamlit\runtime\scriptrunner\script_runner.py", line 589, in _run_script
    exec(code, module.__dict__)
  File "C:\data\git\.......\streamlit_app.py", line 52, in <module>
    response = response_generator.chat(user_query=prompt)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\data\git\.......\src\components\response_synthesis.py", line 84, in chat
    content = self.build_context_prompt(self.retriever(user_query=user_query))
                                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\data\git\.......\src\components\response_synthesis.py", line 61, in retriever
    retrieved_nodes = retriever.retrieve(user_query)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 230, in wrapper
    result = func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\base\base_retriever.py", line 243, in retrieve
    nodes = self._retrieve(query_bundle)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 230, in wrapper
    result = func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 101, in _retrieve
    return self._get_nodes_with_embeddings(query_bundle)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 177, in _get_nodes_with_embeddings
    query_result = self._vector_store.query(query, **self._kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\vector_stores\elasticsearch\base.py", line 412, in query
    return asyncio.get_event_loop().run_until_complete(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 98, in run_until_complete
    return f.result()
           ^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\futures.py", line 203, in result     
    raise self._exception.with_traceback(self._exception_tb)
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\tasks.py", line 314, in __step_run_and_handle_result
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\vector_stores\elasticsearch\base.py", line 452, in aquery
    hits = await self._store.search(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\helpers\vectorstore\_async\vectorstore.py", line 277, in search
    response = await self.client.search(
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\__init__.py", line 4121, in search
    return await self.perform_request(  # type: ignore[return-value]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 271, in perform_request
    response = await self._perform_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 316, in _perform_request
    meta, resp_body = await self.transport.perform_request(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_async_transport.py", line 264, in perform_request
    resp = await node.perform_request(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_node\_http_aiohttp.py", line 179, in perform_request
    async with self.session.request(
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
    self._resp = await self._coro
                 ^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 507, in _request
    with timer:
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
    raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task
ZiTao-Li commented 1 month ago

Is there any solution or conclusion here? I ran into the same problem when using llama-index.

picografix commented 1 month ago

Ran into the same issue while using llama-index.

picografix commented 1 month ago

@FlorentGrenier try running the application using uvicorn/gunicorn rather than python app.py. It still gives the error, but it works.

FlorentGrenier commented 1 month ago

@carlyrichmond any solution?

carlyrichmond commented 1 month ago

Hi @FlorentGrenier,

I'm not a Python expert, so I'll defer to our engineers on this one. I do see one commenter suggesting this is an issue with the elasticsearch-py library. However, looking at the issue you raised on the LlamaIndex repo, there are some suggestions shared for you to check in this comment. Can you confirm those have been actioned?

miguelgrinberg commented 1 month ago

@FlorentGrenier Hi! I'm not super familiar with the LlamaIndex integration for Elasticsearch, but the issue that you have occurs because objects associated with two different async loops are getting mixed up.

The reason a second loop is being used is that you are invoking a synchronous retriever interface. That forces LlamaIndex to internally create a second async loop to run the Elasticsearch code on. Once again I apologize about not being super familiar with LlamaIndex, but it does look like most query functions have an async version, which usually starts with "a", such as aretrieve(), aquery() and so on. If you were to use async versions of the Elasticsearch integration functions I think it would eliminate the need to create a second loop. If possible, give that a try and report back if that helps or we need to keep looking.
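To make the "two loops" point concrete, here is a minimal standard-library sketch. It assumes (my reading of the traceback, not something confirmed in this thread) that the aiohttp check behind this error amounts to asking asyncio.current_task() for the task running on the loop the session is bound to. The helper names task_seen_from and compare_loops are hypothetical.

```python
import asyncio


async def task_seen_from(loop):
    # Ask asyncio which task is currently running on `loop`.
    # This is None when the caller is actually running on a different loop.
    return asyncio.current_task(loop=loop)


def compare_loops():
    loop_a = asyncio.new_event_loop()  # stands in for the loop the client is bound to
    loop_b = asyncio.new_event_loop()  # stands in for a second, unrelated loop
    try:
        # Same loop: the running task is found.
        same_loop = loop_a.run_until_complete(task_seen_from(loop_a))
        # Different loop: no task is running on loop_a, so the lookup finds nothing.
        other_loop = loop_b.run_until_complete(task_seen_from(loop_a))
    finally:
        loop_a.close()
        loop_b.close()
    return same_loop is not None, other_loop is None
```

If the assumption holds, the second case is the situation in which a "should be used inside a task" style check would fail, even though the code is in fact running inside a task.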

FlorentGrenier commented 1 month ago

Hi @carlyrichmond, yes, I applied the suggested solution, but it didn't solve my problem.

Hi @miguelgrinberg, LlamaIndex does indeed offer asynchronous query functions, and following a solution provided in the LlamaIndex issue, I'm using the achat() method, but unfortunately this doesn't solve the problem.

Here's some more code for better context:

Streamlit part

if st.session_state.messages[-1]["role"] == "user":
    with st.chat_message("assistant"):
        with st.spinner("Searching my documents..."):
            response =  asyncio.run(response_generator.chat_async(user_query=prompt, stream=False))
            response_chat = st.write(response.response)
            source_badges = generate_source_badges(Utils.get_sources(response.source_nodes))
            st.markdown(source_badges)
            st.session_state.messages.append({"role": "assistant", "content": response_chat, "source": source_badges})

Response synthesis part


def load_es_index(self) :
        async_client = AsyncElasticsearch(
            hosts=[Configs.elastic_url],
            basic_auth=(Configs.elastic_username, Configs.elastic_password)
            )

        vector_store = ElasticsearchStore(
                                index_name=Configs.index_name,
                                es_url=Configs.elastic_url,
                                es_client=async_client
                            )

        index = VectorStoreIndex.from_vector_store(vector_store=vector_store,embed_model=Settings.embed_model)
        self.index = index

def chat_async(self, user_query, similarity_top_k, stream ):
        memory = ChatMemoryBuffer.from_defaults(token_limit=3000)

        system_mesage = [
            ChatMessage(role=MessageRole.SYSTEM, content= Utils.DEFAULT_SYSTEM_PROMPT + Utils.DEFAULT_PREFIX_PROMPT)
        ] 

        chat_engine = self.index.as_chat_engine(
            chat_mode="context", 
            memory=memory,
            llm=Settings.llm,
            system_prompt = system_mesage,
            context_template=Utils.DEFAULT_CONTEXT_PROMPT,
            verbose=True
        )
        # achat() from llama-index
        response = chat_engine.achat(user_query)
        return response
miguelgrinberg commented 1 month ago

@FlorentGrenier your chat_async() function should be defined as async def. And the call to chat_engine.achat() should be preceded with await. You will likely have additional changes to make as a result of changing the chat_async() function from sync to async.
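A minimal sketch of the difference, using a hypothetical stand-in class (FakeChatEngine) rather than the real LlamaIndex chat engine. It only illustrates what happens when a coroutine method is called without await versus with it.

```python
import asyncio
import inspect


class FakeChatEngine:
    """Hypothetical stand-in for a chat engine with an async achat() method."""

    async def achat(self, message: str) -> str:
        await asyncio.sleep(0)  # placeholder for real asynchronous work
        return f"echo: {message}"


def chat_without_await(engine, user_query):
    # A plain `def` that calls achat() hands back an un-run coroutine object,
    # not a response.
    return engine.achat(user_query)


async def chat_async(engine, user_query):
    # `async def` plus `await` runs the coroutine and returns its result.
    return await engine.achat(user_query)
```

With the plain def version, the caller receives a coroutine object and has to run it somewhere, which is typically where an extra asyncio.run() and a second loop creep in.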

FlorentGrenier commented 1 month ago

@miguelgrinberg I've already tried this but nothing changes

miguelgrinberg commented 1 month ago

@FlorentGrenier Please provide an updated stack trace if you are still getting errors.

FlorentGrenier commented 1 month ago

@miguelgrinberg Here:

Traceback (most recent call last):
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\streamlit\runtime\scriptrunner\script_runner.py", line 589, in _run_script
    exec(code, module.__dict__)
  File "C:\data\git\***\streamlit_app.py", line 66, in <module>
    response =  asyncio.run(response_generator.chat_async(user_query=prompt, stream=False))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 30, in run  
    return loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 98, in run_until_complete
    return f.result()
           ^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\futures.py", line 203, in result
    raise self._exception.with_traceback(self._exception_tb)
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\tasks.py", line 314, in __step_run_and_handle_result
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
  File "C:\data\git\***\src\components\response_synthesis.py", line 177, in chat_async
    response = await chat_engine.achat(user_query)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\callbacks\utils.py", line 56, in async_wrapper
    return await func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\chat_engine\context.py", line 236, in achat
    context_str_template, nodes = await self._agenerate_context(message)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\chat_engine\context.py", line 124, in _agenerate_context
    nodes = await self._retriever.aretrieve(message)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\base\base_retriever.py", line 274, in aretrieve
    nodes = await self._aretrieve(query_bundle=query_bundle)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 112, in _aretrieve
    return await self._aget_nodes_with_embeddings(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 184, in _aget_nodes_with_embeddings
    query_result = await self._vector_store.aquery(query, **self._kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\vector_stores\elasticsearch\base.py", line 452, in aquery
    hits = await self._store.search(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\helpers\vectorstore\_async\vectorstore.py", line 277, in search
    response = await self.client.search(
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\__init__.py", line 4121, in search
    return await self.perform_request(  # type: ignore[return-value]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 271, in perform_request
    response = await self._perform_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 316, in _perform_request
    meta, resp_body = await self.transport.perform_request(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_async_transport.py", line 264, in perform_request
    resp = await node.perform_request(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_node\_http_aiohttp.py", line 179, in perform_request
    async with self.session.request(
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
    self._resp = await self._coro
                 ^^^^^^^^^^^^^^^^
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 507, in _request
    with timer:
  File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
    raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task
miguelgrinberg commented 1 month ago

The streamlit app is already asynchronous. With the changes you've made, you just moved the place in which the 2nd loop is created higher in the stack trace. You are now doing it yourself in this line:

response =  asyncio.run(response_generator.chat_async(user_query=prompt, stream=False))

What you need to do to avoid this issue is to use a fully async application. That likely means converting the function that has the line of code above to async, so that you can just use await response_generator.chat_async(...). You may also need to convert to async other functions that call this one, all the way up the call stack until you reach streamlit. What you want is to use the streamlit async loop, not your own secondary one.
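The effect of starting a fresh loop per request can be sketched with the standard library alone. LoopBoundClient below is a hypothetical stand-in for any client that binds itself to the loop it first runs on; it is not the Elasticsearch client, and whether the real client behaves exactly like this is an assumption.

```python
import asyncio


class LoopBoundClient:
    """Hypothetical client that remembers the event loop it was first used on."""

    def __init__(self):
        self._loop = None

    async def request(self) -> str:
        running = asyncio.get_running_loop()
        if self._loop is None:
            self._loop = running  # bind to the first loop seen
        return "ok" if self._loop is running else "wrong loop"


shared_client = LoopBoundClient()  # plays the role of a long-lived, cached object


def handle_request_with_own_loop() -> str:
    # asyncio.run() creates a brand-new loop on every call.
    return asyncio.run(shared_client.request())


async def handle_requests_on_one_loop(count: int) -> list:
    # All requests are awaited on the single loop that is already running.
    client = LoopBoundClient()
    return [await client.request() for _ in range(count)]
```

In this sketch the first call to handle_request_with_own_loop() succeeds and the second reports a loop mismatch, while awaiting every request on one loop stays consistent. That matches the pattern reported above, where the error only shows up on the second query.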

FlorentGrenier commented 1 month ago

I tried this (the complete code of my Streamlit app):

import streamlit as st
#sys.path.append('C:/data/git/***/src')
from src.components.response_synthesis import ResponseSynthesis
from src.utils.utils import Utils
import time
from streamlit_pills import pills
import asyncio
import nest_asyncio
nest_asyncio.apply()

st.set_page_config(page_title="AthenAI",  page_icon='🤖', layout="centered", initial_sidebar_state="auto", menu_items=None)

@st.cache_resource(show_spinner=False)
def load_data():
    with st.spinner(text="Loading documents. This should take 1 to 2 minutes."):
        response_generator = ResponseSynthesis(local_index=False)
        return response_generator

def generate_source_badges(sources):
    badges = ""
    for file_name, source_info in sources.items():
        badges += f" **File**: `{file_name}` | **Pages**: `{source_info['page']}` | **Last modified**: `{source_info['last_modified']}`  \n"
    return badges

def response_generator_stream(response_chat):
    for word in response_chat.split():
        yield word + " "
        time.sleep(0.05)

async def main():
    response_generator = load_data()

    st.title("AthenAI 💬")

    with st.expander("Information"):
        st.info("Demo version", icon="ℹ️")
        st.markdown('''
                    Its knowledge base consists of 4 documents:
                    - A document on the apprenticeship contract
                    - A CFA guide
                    - A document on ParcourSup
                    - A document on electronic signatures
                ''')

    if "messages" not in st.session_state.keys():
        st.session_state.messages = [
            {"role": "assistant", "content": "Ask me a question about Val's documentation!"}
        ]

    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
            if "source" in message.keys():
                st.markdown(message["source"])

    if prompt := st.chat_input("Your question"):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

    if st.session_state.messages[-1]["role"] == "user":
        with st.chat_message("assistant"):
            with st.spinner("Searching my documents..."):
                response = await response_generator.chat_async(user_query=prompt, stream=False)

                response_chat = response.response
                st.write(response_chat)

                source_badges = generate_source_badges(Utils.get_sources(response.source_nodes))
                st.markdown(source_badges)

                st.session_state.messages.append({"role": "assistant", "content": response_chat, "source": source_badges})

if __name__ == '__main__':
    asyncio.run(main())

Trace :

RuntimeError: Timeout context manager should be used inside a task
Traceback:
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\streamlit\runtime\scriptrunner\script_runner.py", line 589, in _run_script
    exec(code, module.__dict__)
File "C:\data\git\***\streamlit_app.py", line 78, in <module>
    asyncio.run(main())
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 30, in run
    return loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 98, in run_until_complete
    return f.result()
           ^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\futures.py", line 203, in result
    raise self._exception.with_traceback(self._exception_tb)
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\tasks.py", line 314, in __step_run_and_handle_result
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
File "C:\data\git\***\streamlit_app.py", line 66, in main
    response = await response_generator.chat_async(user_query=prompt, stream=False)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\data\git\***\src\components\response_synthesis.py", line 177, in chat_async
    response = await chat_engine.achat(user_query)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\callbacks\utils.py", line 56, in async_wrapper
    return await func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\chat_engine\context.py", line 236, in achat
    context_str_template, nodes = await self._agenerate_context(message)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\chat_engine\context.py", line 124, in _agenerate_context
    nodes = await self._retriever.aretrieve(message)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\base\base_retriever.py", line 274, in aretrieve
    nodes = await self._aretrieve(query_bundle=query_bundle)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 112, in _aretrieve
    return await self._aget_nodes_with_embeddings(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 184, in _aget_nodes_with_embeddings
    query_result = await self._vector_store.aquery(query, **self._kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\vector_stores\elasticsearch\base.py", line 452, in aquery
    hits = await self._store.search(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\helpers\vectorstore\_async\vectorstore.py", line 277, in search
    response = await self.client.search(
               ^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\__init__.py", line 4121, in search
    return await self.perform_request(  # type: ignore[return-value]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 271, in perform_request
    response = await self._perform_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 316, in _perform_request
    meta, resp_body = await self.transport.perform_request(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_async_transport.py", line 264, in perform_request
    resp = await node.perform_request(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_node\_http_aiohttp.py", line 179, in perform_request
    async with self.session.request(
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
    self._resp = await self._coro
                 ^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 507, in _request
    with timer:
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
    raise RuntimeError(
miguelgrinberg commented 1 month ago

@FlorentGrenier remove all references to nest_asyncio from your application and try again.

FlorentGrenier commented 1 month ago

Done, but still the same error with the same trace... 😅

miguelgrinberg commented 1 month ago

It can't be the same stack trace if you removed nest_asyncio... Maybe similar, but not identical. Can you please share it? I don't have a test application to use to debug this, so I need to see the stack trace.
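One quick way to check this without reading a whole trace is to look at which module currently provides asyncio.run. This is a sketch under an assumption: that nest_asyncio works by replacing asyncio.run with its own function, as the nest_asyncio.py frames in the traces above suggest. The helper names are hypothetical.

```python
import asyncio


def asyncio_run_provider() -> str:
    # Name of the module that defines the function currently bound to asyncio.run.
    # In an unpatched interpreter this is the standard library's asyncio.runners.
    return getattr(asyncio.run, "__module__", "<unknown>")


def asyncio_run_looks_patched() -> bool:
    # Anything outside the asyncio package suggests a third-party patch is active.
    return not asyncio_run_provider().startswith("asyncio")
```

Calling asyncio_run_looks_patched() at the top of the Streamlit script, after all imports, would show whether some imported package still applies the patch even though the app itself no longer does.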

FlorentGrenier commented 1 month ago
Traceback (most recent call last):
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\streamlit\runtime\scriptrunner\script_runner.py", line 589, in _run_script
    exec(code, module.__dict__)
  File "C:\data\git\AthenIA\streamlit_app.py", line 76, in <module>
    asyncio.run(main())
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 30, in run  
    return loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 98, in run_until_complete
    return f.result()
           ^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\futures.py", line 203, in result
    raise self._exception.with_traceback(self._exception_tb)
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\tasks.py", line 314, in __step_run_and_handle_result
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
  File "C:\data\git\AthenIA\streamlit_app.py", line 64, in main
    response = await response_generator.chat_async(user_query=prompt, stream=False)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\data\git\AthenIA\src\components\response_synthesis.py", line 175, in chat_async
    response = await chat_engine.achat(user_query)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\callbacks\utils.py", line 56, in async_wrapper
    return await func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\chat_engine\context.py", line 236, in achat
    context_str_template, nodes = await self._agenerate_context(message)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\chat_engine\context.py", line 124, in _agenerate_context
    nodes = await self._retriever.aretrieve(message)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\base\base_retriever.py", line 274, in aretrieve
    nodes = await self._aretrieve(query_bundle=query_bundle)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 112, in _aretrieve
    return await self._aget_nodes_with_embeddings(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 184, in _aget_nodes_with_embeddings
    query_result = await self._vector_store.aquery(query, **self._kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\vector_stores\elasticsearch\base.py", line 452, in aquery
    hits = await self._store.search(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\helpers\vectorstore\_async\vectorstore.py", line 277, in search
    response = await self.client.search(
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\__init__.py", line 4121, in search
    return await self.perform_request(  # type: ignore[return-value]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 271, in perform_request
    response = await self._perform_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 316, in _perform_request
    meta, resp_body = await self.transport.perform_request(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_async_transport.py", line 264, in perform_request
    resp = await node.perform_request(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_node\_http_aiohttp.py", line 179, in perform_request
    async with self.session.request(
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
    self._resp = await self._coro
                 ^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 507, in _request
    with timer:
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
    raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task
miguelgrinberg commented 1 month ago

You are still using nest_asyncio:

Traceback (most recent call last):
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\streamlit\runtime\scriptrunner\script_runner.py", line 589, in _run_script
    exec(code, module.__dict__)
  File "C:\data\git\AthenIA\streamlit_app.py", line 76, in <module>
    asyncio.run(main())
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 30, in run  
    return loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 98, in run_until_complete
    return f.result()
           ^^^^^^^^^^
FlorentGrenier commented 1 month ago

I'm really sorry, but I don't understand where...

streamlit_app.py

import streamlit as st
#sys.path.append('C:/data/git/***/src')
from src.components.response_synthesis import ResponseSynthesis
from src.utils.utils import Utils
import time
from streamlit_pills import pills
import asyncio

st.set_page_config(page_title="AthenAI",  page_icon='🤖', layout="centered", initial_sidebar_state="auto", menu_items=None)

@st.cache_resource(show_spinner=False)
def load_data():
    with st.spinner(text="Loading documents. This should take 1 to 2 minutes."):
        response_generator = ResponseSynthesis(local_index=False)
        return response_generator

def generate_source_badges(sources):
    badges = ""
    for file_name, source_info in sources.items():
        badges += f" **File**: `{file_name}` | **Pages**: `{source_info['page']}` | **Last modified**: `{source_info['last_modified']}`  \n"
    return badges

def response_generator_stream(response_chat):
    for word in response_chat.split():
        yield word + " "
        time.sleep(0.05)

async def main():
    response_generator = load_data()

    st.title("AthenAI 💬")

    with st.expander("Information"):
        st.info("Demo version", icon="ℹ️")
        st.markdown('''
                    Its knowledge base consists of 4 documents:
                    - A document on the apprenticeship contract
                    - A CFA guide
                    - A document on ParcourSup
                    - A document on electronic signatures
                ''')

    if "messages" not in st.session_state.keys():
        st.session_state.messages = [
            {"role": "assistant", "content": "Ask me a question about the Val documentation!"}
        ]

    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
            if "source" in message.keys():
                st.markdown(message["source"])

    if prompt := st.chat_input("Your question"):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

    if st.session_state.messages[-1]["role"] == "user":
        with st.chat_message("assistant"):
            with st.spinner("Searching my documents..."):
                response = await response_generator.chat_async(user_query=prompt, stream=False)

                response_chat = response.response
                st.write(response_chat)

                source_badges = generate_source_badges(Utils.get_sources(response.source_nodes))
                st.markdown(source_badges)

                st.session_state.messages.append({"role": "assistant", "content": response_chat, "source": source_badges})

if __name__ == '__main__':
    asyncio.run(main())

response_synthesis.py



from elasticsearch import  AsyncElasticsearch
from llama_index.core.chat_engine.context import ContextChatEngine
from llama_index.core.chat_engine.simple import SimpleChatEngine

from llama_index.vector_stores.elasticsearch import ElasticsearchStore
from llama_index.core.prompts.prompt_type import PromptType
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.ollama import Ollama
from llama_index.core import (
    VectorStoreIndex, PromptTemplate, StorageContext,
    load_index_from_storage, Settings, get_response_synthesizer
)
from llama_index.core.tools import FunctionTool
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.llms import LLM, ChatMessage, MessageRole

from typing import List, Tuple
from llama_index.core.base.response.schema import RESPONSE_TYPE, NodeWithScore

from src.variables.config import Configs
from src.utils.utils import Utils

import os
import asyncio

class ResponseSynthesis:
    def __init__(self, local_index: bool) -> None :
        Settings.embed_model = HuggingFaceEmbedding(model_name=Configs.embed_model, embed_batch_size=Configs.batch_size)
        Settings.llm = Ollama(model=Configs.model, request_timeout=1000)

        if local_index:
            self.load_local_index()
        else:
            self.load_es_index()

    def load_es_index(self) -> None :
        async_client = AsyncElasticsearch(
            hosts=[Configs.elastic_url],
            basic_auth=(Configs.elastic_username, Configs.elastic_password)
            )

        try:
            vector_store = ElasticsearchStore(
                index_name=Configs.index_name,
                es_url=Configs.elastic_url,
                es_client=async_client
            )
            self.index = VectorStoreIndex.from_vector_store(vector_store=vector_store, embed_model=Settings.embed_model)
        finally:
            async_client.transport.close()

    def load_local_index(self) -> None:
        if os.path.exists(Configs.local_index_dir):
            storage_context = StorageContext.from_defaults(persist_dir=Configs.local_index_dir)
            index = load_index_from_storage(storage_context)
            self.index = index
        else:
            raise FileNotFoundError(f"The index storage directory '{Configs.local_index_dir}' does not exist. Please run the indexing script first.")

    def query(self, user_query: str, similarity_top_k: int=Configs.similarity_top_k) -> RESPONSE_TYPE:
        query_engine = self.index.as_query_engine(similarity_top_k=similarity_top_k, llm=Settings.llm)
        query_engine.update_prompts(
            {"response_synthesizer:text_qa_template": self.build_context_prompt()}
        )
        query_response = query_engine.query(user_query)
        return query_response

    def retriever(self, user_query: str, similarity_top_k: int = Configs.similarity_top_k)  -> List[NodeWithScore] :
        retriever = self.index.as_retriever(similarity_top_k=similarity_top_k)
        retrieved_nodes = retriever.retrieve(user_query)
        return retrieved_nodes

    def chat(self, user_query: str, similarity_top_k: int = Configs.similarity_top_k, stream: bool = True):

        memory = ChatMemoryBuffer.from_defaults(token_limit=3000)

        system_mesage = [
            ChatMessage(role=MessageRole.SYSTEM, content= Utils.DEFAULT_SYSTEM_PROMPT + Utils.DEFAULT_PREFIX_PROMPT)
        ] 

        chat_engine = self.index.as_chat_engine(
            chat_mode="context", 
            memory=memory,
            llm=Settings.llm,
            system_prompt = system_mesage,
            context_template=Utils.DEFAULT_CONTEXT_PROMPT,
            verbose=True
        )

        if stream : 
            response = chat_engine.stream_chat(user_query)
        else : 
            response = chat_engine.chat(user_query)

        return response

    async def chat_async(self, user_query: str, similarity_top_k: int = Configs.similarity_top_k, stream: bool = True):
        memory = ChatMemoryBuffer.from_defaults(token_limit=3000)

        system_mesage = [
            ChatMessage(role=MessageRole.SYSTEM, content= Utils.DEFAULT_SYSTEM_PROMPT + Utils.DEFAULT_PREFIX_PROMPT)
        ] 

       # my_retriever = self.index.as_retriever(similarity_top_k=similarity_top_k)

        chat_engine = self.index.as_chat_engine(
            chat_mode="context", 
            memory=memory,
            llm=Settings.llm,
            system_prompt = system_mesage,
            context_template=Utils.DEFAULT_CONTEXT_PROMPT,
            verbose=True
        )

        response = await chat_engine.achat(user_query)
        return response
miguelgrinberg commented 1 month ago

Okay. I'm not exactly sure, but you do have some sync code, and maybe that is causing a second loop to run.

In any case, here is another possible workaround. The issues you are having are caused by aiohttp (the low-level HTTP client) not liking that there are two active loops in the same app. You can try switching from aiohttp to httpx as the HTTP client to see if that avoids the issue. In load_es_index(), try this change:

        async_client = AsyncElasticsearch(
            hosts=[Configs.elastic_url],
            basic_auth=(Configs.elastic_username, Configs.elastic_password),
            node_class='httpxasync'   # <--- add this argument
            )

Also make sure you have the httpx package installed in your virtualenv.
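
A quick pre-flight check can make a missing httpx package obvious before the client is built. The sketch below uses only the standard library; the helper name pick_node_class is hypothetical (it is not part of elasticsearch-py), and falling back to "aiohttp" assumes that this is the name of the aiohttp-based node.

```python
import importlib.util


def pick_node_class(module_name: str = "httpx") -> str:
    """Return "httpxasync" when `module_name` is importable, else "aiohttp".

    Hypothetical helper for illustration; not part of elasticsearch-py.
    """
    # find_spec returns None when a top-level module cannot be located.
    if importlib.util.find_spec(module_name) is not None:
        return "httpxasync"
    return "aiohttp"


# The result would then be passed as node_class=... to AsyncElasticsearch.
node_class = pick_node_class()
print(node_class)
```

Note that falling back to "aiohttp" would bring back the original behaviour, so in practice it may be preferable to fail loudly when httpx is missing.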

If that does not work, I think I'm going to have to invest some time and build a small streamlit + llama-index app to test this myself, so let me know. It may take me a couple of weeks, but I can look into it if httpx does not solve your issue.
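
The "two active loops" explanation can be illustrated with the standard library alone. It appears that aiohttp's timeout helper asks asyncio for the current task of the loop its session is bound to and raises "Timeout context manager should be used inside a task" when there is none. The sketch below is not aiohttp's code, only an illustration under that assumption: it shows that asyncio.current_task() returns None when queried for a loop other than the one that is running. The names probe and session_loop are made up for the example.

```python
import asyncio


async def probe(session_loop: asyncio.AbstractEventLoop) -> tuple:
    # Task executing this coroutine in the loop started by asyncio.run().
    own_task = asyncio.current_task()
    # No task is running in session_loop, so this lookup yields None.
    foreign_task = asyncio.current_task(loop=session_loop)
    return own_task is not None, foreign_task is None


# Stand-in for the loop a client session was created on (assumption).
session_loop = asyncio.new_event_loop()
try:
    result = asyncio.run(probe(session_loop))
finally:
    session_loop.close()

print(result)  # (True, True)
```

A client created while one loop is current and then used from a different loop would hit the second case.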

FlorentGrenier commented 1 month ago

Hey! I tested your solution with httpx and it solves my problem, thank you!