run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Bug]: OpenSearch Vector Store has a lot of connection problems after #11513 #13692

Open rendyfebry opened 5 months ago

rendyfebry commented 5 months ago

Bug Description

I noticed that after #11513 my OpenSearch vector store connection became unstable. The recent run of bug reports related to this issue (#13358, #11661, etc.) seems to confirm it.

I have also seen nest_asyncio suggested as a fix, but even with it we still get the errors:

import nest_asyncio
nest_asyncio.apply()

I also noticed that the errors get worse when we use LlamaIndex with FastAPI.

Version

latest

Steps to Reproduce

Just use the OpenSearch vector store for long enough and you'll notice that it becomes unstable.

Relevant Logs/Tracebacks

No response

dosubot[bot] commented 5 months ago

To address the connection problems with the OpenSearch vector store after the implementation of #11513, especially when using LlamaIndex with FastAPI, you need to ensure that the event loop is managed correctly within the FastAPI context. The errors you're encountering, such as "This event loop is already running" and "ConnectionError(Timeout context manager should be used inside a task) caused by: RuntimeError(Timeout context manager should be used inside a task)," are typically related to improper handling of asynchronous operations.
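For context, the first of these errors can be reproduced with the standard library alone. The sketch below assumes that the store's synchronous methods drive their coroutines with `run_until_complete` on the current loop, which is what the error message suggests. `fake_search`, `sync_query`, and `handler` are hypothetical stand-ins, not LlamaIndex or FastAPI APIs.

```python
import asyncio


async def fake_search() -> str:
    # Hypothetical stand-in for an async OpenSearch call.
    await asyncio.sleep(0)
    return "hit"


def sync_query() -> str:
    # Hypothetical sync facade: drives the coroutine on the current loop.
    coro = fake_search()
    try:
        return asyncio.get_event_loop().run_until_complete(coro)
    finally:
        # Avoid a "coroutine was never awaited" warning if scheduling failed.
        coro.close()


async def handler() -> str:
    # Simulates an `async def` endpoint: the loop is already running here,
    # so the sync facade cannot start it a second time.
    try:
        return sync_query()
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"


result = asyncio.run(handler())
print(result)
```

Inside a FastAPI `async def` endpoint the event loop is always running, which would explain why the problem is more visible there than in a plain script.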

Here is a revised approach to handle the event loop properly:

  1. Use asyncio.run to manage the event loop for asynchronous operations.
  2. Ensure that the event loop is not already running when performing asynchronous operations.

Below is an example of how you can modify your code to handle these issues:

import asyncio
import logging
import pytest
import uuid
from typing import List, Generator

from llama_index.legacy.schema import NodeRelationship, RelatedNodeInfo, TextNode
from llama_index.legacy.vector_stores.opensearch import (
    OpensearchVectorClient,
    OpensearchVectorStore,
)
from llama_index.legacy.vector_stores.types import VectorStoreQuery

logging.basicConfig(level=logging.DEBUG)

try:
    from opensearchpy import AsyncOpenSearch

    async def check_opensearch():
        os_client = AsyncOpenSearch("localhost:9200")
        await os_client.info()
        await os_client.close()

    asyncio.run(check_opensearch())
    opensearch_not_available = False
except (ImportError, Exception):
    opensearch_not_available = True

@pytest.mark.skipif(opensearch_not_available, reason="opensearch is not available")
def test_connection() -> None:
    assert True

@pytest.fixture()
def index_name() -> str:
    """Return the index name."""
    return f"test_{uuid.uuid4().hex}"

@pytest.fixture()
def os_store(index_name: str) -> Generator[OpensearchVectorStore, None, None]:
    client = OpensearchVectorClient(
        endpoint="localhost:9200",
        index=index_name,
        dim=3,
    )

    yield OpensearchVectorStore(client)

    # teardown step
    # delete index
    asyncio.run(client._os_client.indices.delete(index=index_name))
    # close client aiohttp session
    asyncio.run(client._os_client.close())

@pytest.fixture(scope="session")
def node_embeddings() -> List[TextNode]:
    return [
        TextNode(
            text="lorem ipsum",
            id_="c330d77f-90bd-4c51-9ed2-57d8d693b3b0",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-0")},
            metadata={
                "author": "Stephen King",
                "theme": "Friendship",
            },
            embedding=[1.0, 0.0, 0.0],
        ),
        TextNode(
            text="lorem ipsum",
            id_="c3d1e1dd-8fb4-4b8f-b7ea-7fa96038d39d",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-1")},
            metadata={
                "director": "Francis Ford Coppola",
                "theme": "Mafia",
            },
            embedding=[0.0, 1.0, 0.0],
        ),
        TextNode(
            text="lorem ipsum",
            id_="c3ew11cd-8fb4-4b8f-b7ea-7fa96038d39d",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-2")},
            metadata={
                "director": "Christopher Nolan",
            },
            embedding=[0.0, 0.0, 1.0],
        ),
        TextNode(
            text="I was taught that the way of progress was neither swift nor easy.",
            id_="0b31ae71-b797-4e88-8495-031371a7752e",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-3")},
            metadata={
                "author": "Marie Curie",
            },
            embedding=[0.0, 0.0, 0.9],
        ),
        TextNode(
            text=(
                "The important thing is not to stop questioning."
                + " Curiosity has its own reason for existing."
            ),
            id_="bd2e080b-159a-4030-acc3-d98afd2ba49b",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-4")},
            metadata={
                "author": "Albert Einstein",
            },
            embedding=[0.0, 0.0, 0.5],
        ),
        TextNode(
            text=(
                "I am no bird; and no net ensnares me;"
                + " I am a free human being with an independent will."
            ),
            id_="f658de3b-8cef-4d1c-8bed-9a263c907251",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-5")},
            metadata={
                "author": "Charlotte Bronte",
            },
            embedding=[0.0, 0.0, 0.3],
        ),
    ]

def count_docs_in_index(os_store: OpensearchVectorStore) -> int:
    """Refresh indices and return the count of documents in the index."""
    asyncio.run(os_store.client._os_client.indices.refresh(index=os_store.client._index))
    count = asyncio.run(os_store.client._os_client.count(index=os_store.client._index))
    return count["count"]

@pytest.mark.skipif(opensearch_not_available, reason="opensearch is not available")
def test_functionality(
    os_store: OpensearchVectorStore, node_embeddings: List[TextNode]
) -> None:
    # add
    assert len(os_store.add(node_embeddings)) == len(node_embeddings)
    # query
    exp_node = node_embeddings[3]
    query = VectorStoreQuery(query_embedding=exp_node.embedding, similarity_top_k=1)
    query_result = os_store.query(query)
    assert query_result.nodes
    assert query_result.nodes[0].get_content() == exp_node.text
    # delete one node using its associated doc_id
    os_store.delete("test-1")
    assert count_docs_in_index(os_store) == len(node_embeddings) - 1

This approach ensures that the event loop is properly managed and avoids the "This event loop is already running" and "ConnectionError(Timeout context manager should be used inside a task) caused by: RuntimeError(Timeout context manager should be used inside a task)" errors.

If the issue persists, please provide more specific logs or tracebacks to further diagnose the problem.

#### Details

**Sources**

- [llama-index-integrations/vector_stores/llama-index-vector-stores-opensearch/tests/test_opensearch_client.py](https://github.com/run-llama/llama_index/blob/main/llama-index-integrations/vector_stores/llama-index-vector-stores-opensearch/tests/test_opensearch_client.py)
- [llama-index-integrations/vector_stores/llama-index-vector-stores-opensearch/llama_index/vector_stores/opensearch/base.py](https://github.com/run-llama/llama_index/blob/main/llama-index-integrations/vector_stores/llama-index-vector-stores-opensearch/llama_index/vector_stores/opensearch/base.py)
- [llama-index-legacy/llama_index/legacy/vector_stores/opensearch.py](https://github.com/run-llama/llama_index/blob/main/llama-index-legacy/llama_index/legacy/vector_stores/opensearch.py)
- [CHANGELOG.md](https://github.com/run-llama/llama_index/blob/main/CHANGELOG.md)

logan-markewich commented 5 months ago

It would be nice to support async in this vector store. If you have any ideas for improvements, open to PRs 😇

rendyfebry commented 5 months ago

@logan-markewich I agree on the async support, but I think it would be great if the changes were backward compatible, or at least optional, so they don't break existing implementations.

Unfortunately, I'm not really good with OpenSearch connections or Python async handling, so I can't help with this one.

der-ofenmeister commented 3 months ago

I have written a patched Opensearch class to handle the async and connection issues caused by this. Happy to send a PR, although I'm not sure how compatible it will be (it should probably be fine for llama-index v0.10+).

GeorgeDittmar commented 2 months ago

We are running into similar issues. I basically had to rewrite the Opensearch support to go back to the sync version of opensearch-py to get it to run in our FastAPI service. Could I possibly put in a PR that adds a flag to choose between async and sync?

ulan-yisaev commented 2 months ago

I also encountered similar connection issues when using OpenSearch with FastAPI. I think the patch by @der-ofenmeister or the flag to allow async/sync choice suggested by @GeorgeDittmar would be a great solution. It would be much better to avoid using workarounds like nest_asyncio.

Looking forward to the patch!
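Until a patch lands, one standard-library alternative to nest_asyncio is to keep a dedicated event loop in a background thread and submit coroutines to it with `asyncio.run_coroutine_threadsafe`. This is only a sketch of the general pattern, not something tested against the OpenSearch store; `LoopThread`, `fake_search`, and `handler` are hypothetical names, and it assumes the async client would be created and used on that background loop only.

```python
import asyncio
import threading
from typing import Any, Coroutine


class LoopThread:
    """Owns one event loop that runs forever in a daemon thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro: Coroutine[Any, Any, Any], timeout: float = 10.0) -> Any:
        # Schedule on the background loop and block the calling thread for the result.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result(timeout)

    def close(self) -> None:
        # Stop the loop from its own thread, then wait for the thread and clean up.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def fake_search(query: str) -> str:
    # Hypothetical stand-in for an async OpenSearch call.
    await asyncio.sleep(0)
    return f"hit for {query}"


runner = LoopThread()


async def handler() -> str:
    # Simulates an `async def` endpoint. The blocking wait happens in a worker
    # thread, so the server's own loop is neither blocked nor re-entered.
    return await asyncio.to_thread(runner.run, fake_search("lorem"))


result = asyncio.run(handler())
runner.close()
print(result)
```

The trade-off is one extra thread per process and a blocking wait per call, in exchange for never starting a loop that is already running.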

dosubot[bot] commented 2 months ago

To resolve the connection problems in OpenSearch Vector Store after #11513, especially when using LlamaIndex with FastAPI, you need to ensure that the OpensearchVectorClient is correctly initialized and that the asynchronous OpenSearch client is properly configured. Here is a code snippet that demonstrates how to initialize the OpensearchVectorClient and handle the connection:

from llama_index.vector_stores.opensearch import (
    OpensearchVectorStore,
    OpensearchVectorClient,
)
from fastapi import FastAPI

app = FastAPI()

# http endpoint for your cluster (opensearch required for vector index usage)
endpoint = "http://localhost:9200"
# index to demonstrate the VectorStore impl
idx = "gpt-index-demo"

# OpensearchVectorClient stores text in this field by default
text_field = "content"
# OpensearchVectorClient stores embeddings in this field by default
embedding_field = "embedding"

# Initialize the OpensearchVectorClient
client = OpensearchVectorClient(
    endpoint, idx, 1536, embedding_field=embedding_field, text_field=text_field
)

# Initialize vector store
vector_store = OpensearchVectorStore(client)

@app.get("/")
async def read_root():
    return {"message": "OpenSearch Vector Store is connected successfully"}

Ensure that you have the necessary dependencies installed:

pip install fastapi uvicorn opensearch-py llama-index-vector-stores-opensearch

Run the FastAPI application:

uvicorn your_script_name:app --reload

Additionally, you can use the following test setup to verify the connection and functionality of the OpenSearch Vector Store:

import asyncio
import logging
import pytest
import uuid
from typing import Generator

from llama_index.vector_stores.opensearch import (
    OpensearchVectorClient,
    OpensearchVectorStore,
)
from llama_index.core.vector_stores.types import VectorStoreQuery

logging.basicConfig(level=logging.DEBUG)
evt_loop = asyncio.get_event_loop()

# Defined up front so the `finally` block is safe even if the import fails
os_client = None
try:
    from opensearchpy import AsyncOpenSearch

    os_client = AsyncOpenSearch("localhost:9200")
    evt_loop.run_until_complete(os_client.info())
    opensearch_not_available = False
except (ImportError, Exception):
    opensearch_not_available = True
finally:
    # Close the client only if it was actually created
    if os_client is not None:
        evt_loop.run_until_complete(os_client.close())

@pytest.mark.skipif(opensearch_not_available, reason="opensearch is not available")
def test_connection() -> None:
    assert True

@pytest.fixture()
def index_name() -> str:
    return f"test_{uuid.uuid4().hex}"

@pytest.fixture()
def os_store(index_name: str) -> Generator[OpensearchVectorStore, None, None]:
    client = OpensearchVectorClient(
        endpoint="localhost:9200",
        index=index_name,
        dim=3,
    )
    yield OpensearchVectorStore(client)
    evt_loop.run_until_complete(client._os_client.indices.delete(index=index_name))
    evt_loop.run_until_complete(client._os_client.close())

def count_docs_in_index(os_store: OpensearchVectorStore) -> int:
    evt_loop.run_until_complete(
        os_store.client._os_client.indices.refresh(index=os_store.client._index)
    )
    count = evt_loop.run_until_complete(
        os_store.client._os_client.count(index=os_store.client._index)
    )
    return count["count"]

@pytest.mark.skipif(opensearch_not_available, reason="opensearch is not available")
def test_functionality(os_store: OpensearchVectorStore) -> None:
    # Add and query functionality tests
    pass

This setup should help resolve the connection problems by properly initializing the OpensearchVectorClient and ensuring that the asynchronous client is correctly configured. Additionally, the provided test setup can be used to verify the connection and functionality of the OpenSearch Vector Store [1][2].

GeorgeDittmar commented 2 months ago

Yeah, nest_asyncio didn't even work for us on our service, and even if it had I wouldn't trust it for a prod solution. I'll maybe start work on a flag approach unless @der-ofenmeister puts their patch in first.

ulan-yisaev commented 2 months ago

Hi @GeorgeDittmar, would you be able to share the details of the approach you mentioned? I'd love to test it out, as we're facing similar issues and looking for a more stable solution than nest_asyncio.

Thanks in advance!

ulan-yisaev commented 2 months ago

After doing some research, I think I’ve figured out how to get it running with the sync client. Here’s what I’ve come up with:

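from llama_index.vector_stores.opensearch import OpensearchVectorClient
from opensearchpy import OpenSearch

# `host` and `port` are assumed to be defined elsewhere
sync_client = OpenSearch(
    hosts=[{"host": host, "port": port}],
    # ... other connection options
)

opensearch_client = OpensearchVectorClient(
    # ... other parameters
    os_client=sync_client,
)

ulan-yisaev commented 2 months ago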

It wasn't as straightforward as expected :). I had to implement custom versions of OpensearchVectorStore and OpensearchVectorClient to switch the async operations to synchronous ones. After doing that, it seems to be working now.

GeorgeDittmar commented 2 months ago

Yeah, that's exactly what I had to do on my end, sadly. I'll try to add a param flag and get a PR in so that the right underlying opensearch-py client is called based on it.
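For readers wondering what such a flag might look like, here is a minimal sketch of the idea. It is not the actual PR: `use_async`, `VectorClient`, and the two fake clients are hypothetical names, and the real store has a much larger surface. The flag defaults to the synchronous client so that existing code keeps working, as requested earlier in the thread.

```python
import asyncio
from typing import List


class FakeSyncClient:
    """Hypothetical stand-in for a synchronous search client."""

    def search(self, query: str) -> List[str]:
        return [f"sync:{query}"]


class FakeAsyncClient:
    """Hypothetical stand-in for an asynchronous search client."""

    async def search(self, query: str) -> List[str]:
        await asyncio.sleep(0)
        return [f"async:{query}"]


class VectorClient:
    """Hypothetical wrapper; `use_async` is an assumed flag name."""

    def __init__(self, use_async: bool = False) -> None:
        self._use_async = use_async
        self._client = FakeAsyncClient() if use_async else FakeSyncClient()

    def query(self, q: str) -> List[str]:
        # The sync path never touches an event loop.
        if self._use_async:
            raise RuntimeError("client is in async mode; use `await aquery(...)`")
        return self._client.search(q)

    async def aquery(self, q: str) -> List[str]:
        if self._use_async:
            return await self._client.search(q)
        # Sync mode: run the blocking call in a worker thread
        # so the caller's event loop stays responsive.
        return await asyncio.to_thread(self._client.search, q)


sync_hits = VectorClient().query("lorem")
async_hits = asyncio.run(VectorClient(use_async=True).aquery("lorem"))
bridged_hits = asyncio.run(VectorClient().aquery("lorem"))
print(sync_hits, async_hits, bridged_hits)
```

The design choice here is that the sync methods never drive an event loop themselves, which avoids the "already running" class of errors by construction.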