run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Bug]: Elasticsearch RuntimeError: Session and connector has to use same event loop when calling query_engine.query() the second time #9876

Closed (cocoza4 closed this 2 months ago)

cocoza4 commented 6 months ago

Bug Description

I'm unable to create a sample app that recreates this issue, but it seems to be related to https://stackoverflow.com/questions/57678844/error-using-shared-tcpconnector-in-aiohttp

However, I notice that when I recreate the vector_store instance on every request to Elasticsearch, the problem goes away.

def run(self, q: str):
    # Workaround: build a fresh vector store (and query engine) on every call.
    vector_store = ElasticsearchStore_llamaindex(
        index_name="marketing-index-llamaindex",
        es_url=ES_URL,
    )
    vector_index = VectorStoreIndex.from_vector_store(vector_store, service_context=self.service_context)
    self.query_engine = vector_index.as_query_engine()

    resp = self.query_engine.query(q)
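
The workaround above rebuilds the vector store on every call. A cheaper variation, sketched here under the assumption that the failure comes from reusing an async client bound to an earlier event loop, is to keep one resource per event loop and rebuild only when the loop changes. `PerLoopCache` and `make_client` are hypothetical names used for illustration; they are not part of LlamaIndex or the Elasticsearch client.

```python
import asyncio
import weakref
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class PerLoopCache(Generic[T]):
    """Hypothetical helper: keeps one resource per asyncio event loop."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        # Weak keys let an entry disappear once its loop is garbage collected.
        self._by_loop = weakref.WeakKeyDictionary()

    def get(self) -> T:
        """Return the resource for the running loop, creating it on first use."""
        loop = asyncio.get_running_loop()
        if loop not in self._by_loop:
            self._by_loop[loop] = self._factory()
        return self._by_loop[loop]


def make_client() -> object:
    # Stand-in for building an async client or vector store (assumption).
    return object()


cache = PerLoopCache(make_client)


async def fetch_twice() -> tuple:
    # Two lookups inside the same loop return the same cached object.
    return cache.get(), cache.get()


first_a, first_b = asyncio.run(fetch_twice())  # first event loop
second_a, _ = asyncio.run(fetch_twice())       # second loop gets a fresh resource
```

In synchronous code that drives the loop itself with run_until_complete, the cache key would have to be the loop about to be used, since get_running_loop() only works inside a coroutine.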

Version

0.9.24

Steps to Reproduce

N/A

Relevant Logs/Tracebacks

RuntimeError: Session and connector has to use same event loop
Traceback:
File "/opt/homebrew/lib/python3.10/site-packages/streamlit/runtime/scriptrunner/script_runner.py", line 534, in _run_script
    exec(code, module.__dict__)
File "/Users/coco/workspace/poc-marketing-llm/app.py", line 96, in <module>
    for chunk in st.session_state.agent.stream({"input": prompt}):
File "/opt/homebrew/lib/python3.10/site-packages/langchain/agents/agent.py", line 1451, in stream
    for step in iterator:
File "/opt/homebrew/lib/python3.10/site-packages/langchain/agents/agent_iterator.py", line 174, in __iter__
    for chunk in self.agent_executor._iter_next_step(
File "/opt/homebrew/lib/python3.10/site-packages/langchain/agents/agent.py", line 1134, in _iter_next_step
    observation = tool.run(
File "/opt/homebrew/lib/python3.10/site-packages/langchain_core/tools.py", line 365, in run
    raise e
File "/opt/homebrew/lib/python3.10/site-packages/langchain_core/tools.py", line 337, in run
    self._run(*tool_args, run_manager=run_manager, **tool_kwargs)
File "/opt/homebrew/lib/python3.10/site-packages/langchain_core/tools.py", line 516, in _run
    else self.func(*args, **kwargs)
File "/Users/coco/workspace/poc-marketing-llm/backend/tools/hybrid_rag.py", line 55, in run
    resp = self.query_engine.query(q)
File "/opt/homebrew/lib/python3.10/site-packages/llama_index/core/base_query_engine.py", line 30, in query
    return self._query(str_or_query_bundle)
File "/opt/homebrew/lib/python3.10/site-packages/llama_index/query_engine/retriever_query_engine.py", line 170, in _query
    nodes = self.retrieve(query_bundle)
File "/opt/homebrew/lib/python3.10/site-packages/llama_index/query_engine/retriever_query_engine.py", line 126, in retrieve
    nodes = self._retriever.retrieve(query_bundle)
File "/opt/homebrew/lib/python3.10/site-packages/llama_index/core/base_retriever.py", line 54, in retrieve
    nodes = self._retrieve(query_bundle)
File "/opt/homebrew/lib/python3.10/site-packages/llama_index/indices/vector_store/retrievers/retriever.py", line 88, in _retrieve
    return self._get_nodes_with_embeddings(query_bundle)
File "/opt/homebrew/lib/python3.10/site-packages/llama_index/indices/vector_store/retrievers/retriever.py", line 164, in _get_nodes_with_embeddings
    query_result = self._vector_store.query(query, **self._kwargs)
File "/opt/homebrew/lib/python3.10/site-packages/llama_index/vector_stores/elasticsearch.py", line 452, in query
    return asyncio.get_event_loop().run_until_complete(
File "/opt/homebrew/lib/python3.10/site-packages/nest_asyncio.py", line 99, in run_until_complete
    return f.result()
File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/futures.py", line 201, in result
    raise self._exception.with_traceback(self._exception_tb)
File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/tasks.py", line 232, in __step
    result = coro.send(None)
File "/opt/homebrew/lib/python3.10/site-packages/llama_index/vector_stores/elasticsearch.py", line 524, in aquery
    response = await client.search(
File "/opt/homebrew/lib/python3.10/site-packages/elasticsearch/_async/client/__init__.py", line 3735, in search
    return await self.perform_request(  # type: ignore[return-value]
File "/opt/homebrew/lib/python3.10/site-packages/elasticsearch/_async/client/_base.py", line 285, in perform_request
    meta, resp_body = await self.transport.perform_request(
File "/opt/homebrew/lib/python3.10/site-packages/elastic_transport/_async_transport.py", line 258, in perform_request
    resp = await node.perform_request(
File "/opt/homebrew/lib/python3.10/site-packages/elastic_transport/_node/_http_aiohttp.py", line 136, in perform_request
    self._create_aiohttp_session()
File "/opt/homebrew/lib/python3.10/site-packages/elastic_transport/_node/_http_aiohttp.py", line 252, in _create_aiohttp_session
    self.session = aiohttp.ClientSession(
File "/opt/homebrew/lib/python3.10/site-packages/aiohttp/client.py", line 256, in __init__
    raise RuntimeError("Session and connector has to use same event loop")

dosubot[bot] commented 6 months ago

🤖

Hello @cocoza4! 🦙

I'm Dosu, an AI assistant here to help with your LlamaIndex issues, answer your questions, and guide you through contributions while we wait for a human maintainer. Just think of me as your friendly neighborhood bot! I look forward to assisting you.

The RuntimeError "Session and connector has to use same event loop" is typically raised when you are trying to use an asynchronous session or connector that was created in a different event loop than the one you are currently in.
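
The same class of failure can be reproduced with the standard library alone. The sketch below is an analogy, not the aiohttp code path from the traceback: it creates a future on one event loop and awaits it from a task running on another, which asyncio rejects with a RuntimeError. The function name is hypothetical.

```python
import asyncio


def await_future_from_another_loop() -> str:
    """Create a future on one loop, then await it on a second loop."""
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    try:
        foreign_future = loop_a.create_future()  # bound to loop_a

        async def wait_for_it() -> None:
            await foreign_future  # this task runs on loop_b, so the loops differ

        try:
            loop_b.run_until_complete(wait_for_it())
        except RuntimeError as exc:
            return str(exc)
        return "no error"
    finally:
        loop_a.close()
        loop_b.close()


message = await_future_from_another_loop()
print(message)
```

An object tied to one loop cannot be driven from another, which is consistent with the error disappearing once the store is recreated for each request.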

In the context of LlamaIndex, this error might occur when calling query_engine.query() for the second time because the SQLJoinQueryEngine class, which is responsible for handling queries, is not properly managing its asynchronous tasks.

The _aquery method in the SQLJoinQueryEngine class is marked as an asynchronous method, but it's actually calling a synchronous method _query. This could potentially cause issues with the event loop, especially if you're calling query_engine.query() multiple times in an asynchronous context.

Here is the relevant code:

async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
    # TODO: make async
    return self._query(query_bundle)

To fix this issue, you should ensure that the _aquery method is truly asynchronous. This might involve refactoring the _query method and any methods it calls to be asynchronous as well.

Please note that without the complete code and the exact scenario in which the error occurs, this is just a probable cause and solution. The actual issue might be different based on how and where you're calling query_engine.query().

As for the function of the query_engine.query() method in the LlamaIndex codebase, it is designed to handle queries to the LlamaIndex. It accepts a QueryType object as an argument, which can either be a string or a QueryBundle object.

If the input is a string, it is converted into a QueryBundle object. The method then calls the _query() method with the QueryBundle object as an argument. The _query() method is an abstract method, meaning it is expected to be implemented by any class that inherits from BaseQueryEngine.

The query() method is also wrapped in a context manager provided by self.callback_manager.as_trace("query"). This suggests that the method is designed to be used with some form of callback system, which can be used to trace the execution of the query.

Here is the relevant code:

def query(self, str_or_query_bundle: QueryType) -> RESPONSE_TYPE:
    with self.callback_manager.as_trace("query"):
        if isinstance(str_or_query_bundle, str):
            str_or_query_bundle = QueryBundle(str_or_query_bundle)
        return self._query(str_or_query_bundle)

I hope this helps! If you have any more questions or need further clarification, feel free to ask.

-Dosu

This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

If you want to continue the conversation, start your reply with @dosu-bot.

mglslg commented 1 month ago

@dosubot I have the same problem. How can I use Elasticsearch with a synchronous client? Here is how I tried to use a synchronous client:

    from typing import Any, Dict

    import elasticsearch
    from llama_index.vector_stores.elasticsearch import ElasticsearchStore

    connection_params: Dict[str, Any] = {
        "hosts": ["https://10.206.20.47:9200"],
        "basic_auth": ("elastic", "pKgZ2rSFXlKHY8o9T2Q+"),
        "verify_certs": False
    }

    sync_es_client = elasticsearch.Elasticsearch(**connection_params)

    es_store = ElasticsearchStore(
        index_name="sk_index",
        es_client=sync_es_client
    )

Got this error:

File "C:\Users\01420524\.conda\envs\sk-gpt\lib\asyncio\futures.py", line 201, in result
  raise self._exception.with_traceback(self._exception_tb)
File "C:\Users\01420524\.conda\envs\sk-gpt\lib\asyncio\tasks.py", line 232, in __step
  result = coro.send(None)
File "C:\Users\01420524\.conda\envs\sk-gpt\lib\site-packages\llama_index\vector_stores\elasticsearch\base.py", line 383, in async_add
  await self._create_index_if_not_exists(
File "C:\Users\01420524\.conda\envs\sk-gpt\lib\site-packages\llama_index\vector_stores\elasticsearch\base.py", line 281, in _create_index_if_not_exists
  if await self.client.indices.exists(index=index_name):
TypeError: object HeadApiResponse can't be used in 'await' expression

How can I solve this?
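
For context, the TypeError in the traceback is what Python raises when code awaits a value that is not awaitable. The sketch below reproduces that mechanism with two hypothetical stand-in classes, `SyncClient` and `AsyncClient`; neither is the real Elasticsearch client, and `index_exists` only imitates the `await self.client.indices.exists(...)` call shown above.

```python
import asyncio


class SyncClient:
    """Hypothetical stand-in for a synchronous client."""

    def exists(self, index: str) -> bool:
        return True  # a plain value, which cannot be awaited


class AsyncClient:
    """Hypothetical stand-in for an asynchronous client."""

    async def exists(self, index: str) -> bool:
        return True  # calling this method returns an awaitable coroutine


async def index_exists(client, index: str) -> bool:
    # Imitates the awaited client call from the traceback.
    return await client.exists(index=index)


def try_client(client) -> str:
    """Run the awaited call and report whether it raised TypeError."""
    try:
        asyncio.run(index_exists(client, "sk_index"))
    except TypeError:
        return "TypeError"
    return "ok"


sync_outcome = try_client(SyncClient())
async_outcome = try_client(AsyncClient())
print(sync_outcome, async_outcome)
```

If the store awaits its client internally, as the traceback suggests, passing an async client such as elasticsearch.AsyncElasticsearch built from the same connection parameters may avoid this particular TypeError. Whether es_client accepts one in the installed version is worth checking against the package documentation.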