run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai

[Bug]: OpenSearch ConnectionError(Timeout context manager should be used inside a task) #13358

Open chrfthr opened 6 months ago

chrfthr commented 6 months ago

Bug Description

After upgrading from 0.9.3, I get a connection error when querying my OpenSearch vector store. I'm not sure whether I should post this here or open an opensearch-py issue.

I have llama-index installed in the following conda environment with Python 3.10.0:

channels:

Version

0.13.34

Steps to Reproduce

Query a RetrieverQueryEngine built from a VectorIndexRetriever, a VectorStoreIndex and an OpensearchVectorStore. I implemented a custom version of the VectorIndexRetriever, but this should not be relevant as the error also appeared when using the library retriever. The custom vector retriever is also wrapped in a custom retriever.
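For readers trying to reproduce this, here is a minimal sketch of the setup described above (a hedged reconstruction, not the reporter's actual code; the endpoint, index name, and embedding dimension are placeholders, and a configured LLM/embedding model is assumed):

from llama_index.core import VectorStoreIndex
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.vector_stores.opensearch import (
    OpensearchVectorClient,
    OpensearchVectorStore,
)

# Placeholder connection details; dim must match the embedding model.
client = OpensearchVectorClient(
    endpoint="http://localhost:9200",
    index="my_index",
    dim=1536,
)
vector_store = OpensearchVectorStore(client)
index = VectorStoreIndex.from_vector_store(vector_store)
retriever = VectorIndexRetriever(index=index, similarity_top_k=5)
query_engine = RetrieverQueryEngine.from_args(retriever)

# The synchronous query() bridges into asyncio internally; in the
# environment described here it raises the ConnectionError below.
response = query_engine.query("test question")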

Relevant Logs/Tracebacks

File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/streamlit/runtime/scriptrunner/script_runner.py", line 600, in _run_script
    exec(code, module.__dict__)
File "/home/christian/llmproject/src/frontend/web.py", line 151, in <module>
    response, context = generate_response(prompt, reasoning, keywords, toggle, file_name, options)
File "/home/christian/llmproject/src/frontend/web.py", line 21, in generate_response
    res, context = agent.query(index_name, prompt, reasoning, keywords, toggle, file_name, options)
File "/home/christian/llmproject/src/retrieval/Agent.py", line 174, in query
    response = query_engine.query(query_bundle)
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py", line 274, in wrapper
    result = func(*args, **kwargs)
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/llama_index/core/base/base_query_engine.py", line 53, in query
    query_result = self._query(str_or_query_bundle)
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py", line 274, in wrapper
    result = func(*args, **kwargs)
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/llama_index/core/query_engine/retriever_query_engine.py", line 189, in _query
    nodes = self.retrieve(query_bundle)
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/llama_index/core/query_engine/retriever_query_engine.py", line 144, in retrieve
    nodes = self._retriever.retrieve(query_bundle)
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py", line 274, in wrapper
    result = func(*args, **kwargs)
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/llama_index/core/base/base_retriever.py", line 244, in retrieve
    nodes = self._retrieve(query_bundle)
File "/home/christian/llmproject/src/retrieval/retrievers/CustomRetriever.py", line 62, in _retrieve
    vector_nodes = self._vector_retriever.retrieve(retrieval_bundle)
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py", line 274, in wrapper
    result = func(*args, **kwargs)
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/llama_index/core/base/base_retriever.py", line 244, in retrieve
    nodes = self._retrieve(query_bundle)
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py", line 274, in wrapper
    result = func(*args, **kwargs)
File "/home/christian/llmproject/src/retrieval/retrievers/CustomVectorIndexRetriever.py", line 100, in _retrieve
    return self._get_nodes_with_embeddings(query_bundle)
File "/home/christian/llmproject/src/retrieval/retrievers/CustomVectorIndexRetriever.py", line 189, in _get_nodes_with_embeddings
    query_result = self._vector_store.query(query, **self._kwargs)
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/llama_index/vector_stores/opensearch/base.py", line 525, in query
    return asyncio.get_event_loop().run_until_complete(self.aquery(query, **kwargs))
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/nest_asyncio.py", line 98, in run_until_complete
    return f.result()
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/asyncio/futures.py", line 201, in result
    raise self._exception
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/asyncio/tasks.py", line 232, in __step
    result = coro.send(None)
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/llama_index/vector_stores/opensearch/base.py", line 539, in aquery
    return await self._client.aquery(
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/llama_index/vector_stores/opensearch/base.py", line 399, in aquery
    res = await self._os_client.search(
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/opensearchpy/_async/client/__init__.py", line 1748, in search
    return await self.transport.perform_request(
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/opensearchpy/_async/transport.py", line 425, in perform_request
    raise e
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/opensearchpy/_async/transport.py", line 389, in perform_request
    status, headers_response, data = await connection.perform_request(
File "/home/christian/miniforge3/envs/streamlit/lib/python3.10/site-packages/opensearchpy/_async/http_aiohttp.py", line 329, in perform_request
    raise ConnectionError("N/A", str(e), e)
dosubot[bot] commented 6 months ago

The error you're encountering, "ConnectionError(Timeout context manager should be used inside a task)," suggests that the asynchronous operation with OpenSearch is being executed outside of an async function or context. To resolve this, ensure your code interacts with OpenSearch within an async context. Here's a structured approach:

  1. Async Function: Wrap your OpenSearch query logic within an async function. This ensures that the asynchronous code is executed within the correct context.

  2. Await Async Operations: Use await for asynchronous calls, such as querying OpenSearch, to wait for their completion within the async function.

  3. Async Context Manager: If using a context manager (e.g., for managing connections), use async with to ensure it's handled asynchronously.

Here's an example to guide you:

import asyncio
from opensearchpy import AsyncOpenSearch

async def query_opensearch():
    # Replace with your endpoint details
    client = AsyncOpenSearch(
        hosts=["http://localhost:9200"],
        http_compress=True,  # Optional
    )
    try:
        # Awaiting inside an async function keeps the request on the
        # running event loop's task, as aiohttp's timeout requires.
        response = await client.search(
            index="index_name", body={"query": {"match_all": {}}}
        )
        print(response)
    finally:
        await client.close()  # Release the underlying aiohttp session

if __name__ == "__main__":
    asyncio.run(query_opensearch())

This code demonstrates the correct way to perform asynchronous operations with OpenSearch, ensuring they're executed within an async context to avoid the "ConnectionError" you're experiencing.

For further troubleshooting, especially if adapting the code doesn't resolve the issue, providing more context about your execution environment (e.g., script, Jupyter notebook) could help identify additional factors contributing to the problem.
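Within llama-index itself, one way to stay on the async path end to end is to call the engine's aquery() instead of the synchronous query() wrapper. A hedged sketch, reusing query_engine from the setup sketched earlier:

import asyncio

async def run_query(query_engine, question: str):
    # aquery() keeps the whole call chain on one running event loop, so
    # the OpenSearch client's aiohttp timeout context runs inside a task.
    return await query_engine.aquery(question)

# Caveat: each asyncio.run() creates a fresh event loop; if the client
# caches an aiohttp session on the first loop, reuse across calls may
# still fail. This is a sketch, not a verified fix.
response = asyncio.run(run_query(query_engine, "test question"))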

#### Details

**Sources**

- [[Question]: vector_store.add(nodes) gives me error = opensearchpy.exceptions.ConnectionTimeout: ConnectionTimeout caused by - TimeoutError()](https://github.com/run-llama/llama_index/issues/13110)
- [[Bug]: *** RuntimeError: Timeout context manager should be used inside a task](https://github.com/run-llama/llama_index/issues/9520)
- [llama-index-core/llama_index/core/instrumentation/dispatcher.py](https://github.com/run-llama/llama_index/blob/main/llama-index-core/llama_index/core/instrumentation/dispatcher.py)
- [llama-index-integrations/vector_stores/llama-index-vector-stores-opensearch/llama_index/vector_stores/opensearch/base.py](https://github.com/run-llama/llama_index/blob/main/llama-index-integrations/vector_stores/llama-index-vector-stores-opensearch/llama_index/vector_stores/opensearch/base.py)
- [llama-index-integrations/indices/llama-index-indices-managed-vectara/llama_index/indices/managed/vectara/base.py](https://github.com/run-llama/llama_index/blob/main/llama-index-integrations/indices/llama-index-indices-managed-vectara/llama_index/indices/managed/vectara/base.py)
- [llama-index-integrations/vector_stores/llama-index-vector-stores-opensearch/pyproject.toml](https://github.com/run-llama/llama_index/blob/main/llama-index-integrations/vector_stores/llama-index-vector-stores-opensearch/pyproject.toml)
- [llama-index-integrations/retrievers/llama-index-retrievers-pathway/tests/BUILD](https://github.com/run-llama/llama_index/blob/main/llama-index-integrations/retrievers/llama-index-retrievers-pathway/tests/BUILD)
**About Dosu**

This response is meant to be useful and save you time. It is **not meant to be a precise solution**, but rather a starting point for your own research. Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

To continue the conversation, mention @dosu.

logan-markewich commented 6 months ago

Is this a timeout? Your traceback just shows a generic connection error.

chrfthr commented 6 months ago

> Is this a timeout? Your traceback just shows a generic connection error.

Connection issues should not be a problem. Everything is run locally at this stage, and I check the health of the OpenSearch cluster before doing anything:

res = requests.get(endpoint + '/_cluster/health?wait_for_status=yellow&timeout=30s')

It looks more like some sort of async error in the library.
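For context, the frame from llama_index/vector_stores/opensearch/base.py in the traceback above shows how the synchronous entry point bridges into asyncio, which supports the "async error in the library" reading. Paraphrased from that frame (the comments are interpretation, not library source):

import asyncio

def query(self, query, **kwargs):
    # Drives the async aquery() on whatever loop get_event_loop() returns.
    # Under Streamlit, with nest_asyncio patched in (see the traceback),
    # aiohttp's timeout context can be entered outside a running task on
    # that loop, producing "Timeout context manager should be used inside
    # a task".
    return asyncio.get_event_loop().run_until_complete(
        self.aquery(query, **kwargs)
    )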

GaLebel commented 5 months ago

Hey @chrfthr,

Did you manage to solve the issue by any chance? I'm facing the exact same issue. Basically, everything works fine until I try to bring my RAG into Streamlit. This happens regardless of whether streaming is set to True or False, so I cannot integrate my RAG implementation into a Streamlit frontend at all. It also happens with Gradio.

Traceback:

File "/home/user-hehe/.local/lib/python3.10/site-packages/streamlit/runtime/scriptrunner/script_runner.py", line 589, in _run_script
    exec(code, module.__dict__)
File "/home/user-hehe/Projects/LI/2024-user-hehe-/server/streamlit/pages/new_rag_page.py", line 106, in <module>
    response = st.session_state.indexer.streaming_query(query = question, k = 5, mmr = True)
File "/home/user-hehe/Projects/LI/2024-user-hehe-/pipeline/opensearchllamaindexer/OpenSearchLlamaIndexer.py", line 497, in streaming_query
    response = query_engine.query(query)
File "/home/user-hehe/.local/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py", line 230, in wrapper
    result = func(*args, **kwargs)
File "/home/user-hehe/.local/lib/python3.10/site-packages/llama_index/core/base/base_query_engine.py", line 52, in query
    query_result = self._query(str_or_query_bundle)
File "/home/user-hehe/.local/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py", line 230, in wrapper
    result = func(*args, **kwargs)
File "/home/user-hehe/.local/lib/python3.10/site-packages/llama_index/core/query_engine/retriever_query_engine.py", line 189, in _query
    nodes = self.retrieve(query_bundle)
File "/home/user-hehe/.local/lib/python3.10/site-packages/llama_index/core/query_engine/retriever_query_engine.py", line 144, in retrieve
    nodes = self._retriever.retrieve(query_bundle)
File "/home/user-hehe/.local/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py", line 230, in wrapper
    result = func(*args, **kwargs)
File "/home/user-hehe/.local/lib/python3.10/site-packages/llama_index/core/base/base_retriever.py", line 243, in retrieve
    nodes = self._retrieve(query_bundle)
File "/home/user-hehe/.local/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py", line 230, in wrapper
    result = func(*args, **kwargs)
File "/home/user-hehe/.local/lib/python3.10/site-packages/llama_index/core/indices/vector_store/retrievers/retriever.py", line 101, in _retrieve
    return self._get_nodes_with_embeddings(query_bundle)
File "/home/user-hehe/.local/lib/python3.10/site-packages/llama_index/core/indices/vector_store/retrievers/retriever.py", line 177, in _get_nodes_with_embeddings
    query_result = self._vector_store.query(query, **self._kwargs)
File "/home/user-hehe/.local/lib/python3.10/site-packages/llama_index/vector_stores/opensearch/base.py", line 585, in query
    return asyncio.get_event_loop().run_until_complete(self.aquery(query, **kwargs))
File "/home/user-hehe/.local/lib/python3.10/site-packages/nest_asyncio.py", line 98, in run_until_complete
    return f.result()
File "/usr/lib/python3.10/asyncio/futures.py", line 201, in result
    raise self._exception.with_traceback(self._exception_tb)
File "/usr/lib/python3.10/asyncio/tasks.py", line 232, in __step
    result = coro.send(None)
File "/home/user-hehe/.local/lib/python3.10/site-packages/llama_index/vector_stores/opensearch/base.py", line 599, in aquery
    return await self._client.aquery(
File "/home/user-hehe/.local/lib/python3.10/site-packages/llama_index/vector_stores/opensearch/base.py", line 425, in aquery
    res = await self._os_client.search(
File "/home/user-hehe/.local/lib/python3.10/site-packages/opensearchpy/_async/client/__init__.py", line 2359, in search
    return await self.transport.perform_request(
File "/home/user-hehe/.local/lib/python3.10/site-packages/opensearchpy/_async/transport.py", line 421, in perform_request
    raise e
File "/home/user-hehe/.local/lib/python3.10/site-packages/opensearchpy/_async/transport.py", line 385, in perform_request
    status, headers_response, data = await connection.perform_request(
File "/home/user-hehe/.local/lib/python3.10/site-packages/opensearchpy/_async/http_aiohttp.py", line 329, in perform_request
    raise ConnectionError("N/A", str(e), e)
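A workaround sometimes reported for this class of error in Streamlit apps is to run the blocking query on a worker thread that owns its own event loop, so the library's internal run_until_complete() and aiohttp's timeout context both run on a loop that thread created. A hedged sketch under those assumptions (query_engine is a placeholder; this is not a verified fix):

import asyncio
import threading

def query_in_fresh_loop(query_engine, question: str):
    """Run the sync query on a worker thread with a dedicated event loop."""
    result = {}

    def _target():
        # Give this thread its own loop so llama-index's internal
        # get_event_loop().run_until_complete() finds a loop it owns.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            result["value"] = query_engine.query(question)
        finally:
            loop.close()

    t = threading.Thread(target=_target)
    t.start()
    t.join()
    return result["value"]

# Usage: response = query_in_fresh_loop(query_engine, "test question")
# Caveat: if the OpenSearch client has already cached an aiohttp session
# on an earlier loop, a fresh loop may not help; constructing the client
# inside the worker thread avoids that.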