Closed · kobiche closed this 1 week ago
Hey @kobiche! 👋 I'm here to help you with any bugs, questions, or contributions you have. I'm a bot designed to assist you while you wait for a human maintainer.
Yes, the `OpensearchVectorClient` in LlamaIndex runs exclusively using async code. This is evident from the use of asynchronous methods and the `AsyncOpenSearch` client throughout the implementation. For example, the `_bulk_ingest_embeddings` method is defined as an asynchronous function, and the client is initialized using `AsyncOpenSearch` [1].
To adjust the code in LlamaIndex to offer both sync and async possibilities, you can create synchronous counterparts for the asynchronous methods. Here is an example of how you can modify the `OpensearchVectorClient` class to include both sync and async methods:
```python
import uuid
from typing import Any, Dict, Iterable, List, Optional

# Import path may vary by LlamaIndex version.
from llama_index.core.schema import BaseNode


class OpensearchVectorClient:
    # Existing async methods...

    def _get_sync_opensearch_client(self, opensearch_url: str, **kwargs: Any) -> Any:
        """Get sync OpenSearch client from the opensearch_url, otherwise raise an error."""
        try:
            from opensearchpy import OpenSearch

            client = OpenSearch(opensearch_url, **kwargs)
        except ValueError as e:
            raise ValueError(
                f"OpenSearch client string provided is not in proper format. "
                f"Got error: {e} "
            )
        return client

    def _bulk_ingest_embeddings_sync(
        self,
        client: Any,
        index_name: str,
        embeddings: List[List[float]],
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        vector_field: str = "embedding",
        text_field: str = "content",
        mapping: Optional[Dict] = None,
        max_chunk_bytes: Optional[int] = 1 * 1024 * 1024,
        is_aoss: bool = False,
    ) -> List[str]:
        """Sync bulk ingest embeddings into the given index."""
        if not mapping:
            mapping = {}

        from opensearchpy.helpers import bulk

        not_found_error = self._import_not_found_error()
        requests = []
        return_ids = []
        # Create the index if it does not exist yet.
        try:
            client.indices.get(index=index_name)
        except not_found_error:
            client.indices.create(index=index_name, body=mapping)
        for i, text in enumerate(texts):
            metadata = metadatas[i] if metadatas else {}
            _id = ids[i] if ids else str(uuid.uuid4())
            request = {
                "_op_type": "index",
                "_index": index_name,
                vector_field: embeddings[i],
                text_field: text,
                "metadata": metadata,
            }
            # Amazon OpenSearch Serverless (AOSS) does not accept "_id" on index ops.
            if is_aoss:
                request["id"] = _id
            else:
                request["_id"] = _id
            requests.append(request)
            return_ids.append(_id)
        bulk(client, requests, max_chunk_bytes=max_chunk_bytes)
        if not is_aoss:
            client.indices.refresh(index=index_name)
        return return_ids

    def add_sync(
        self,
        nodes: List[BaseNode],
        **add_kwargs: Any,
    ) -> List[str]:
        """Add nodes to the index synchronously.

        Args:
            nodes: List[BaseNode]: list of nodes with embeddings.
        """
        return self._bulk_ingest_embeddings_sync(
            self._os_client,
            self._index,
            [node.embedding for node in nodes],
            [node.text for node in nodes],
            [node.metadata for node in nodes],
            [node.node_id for node in nodes],
            self._embedding_field,
            self._text_field,
        )
```
This example shows how to add synchronous methods to the `OpensearchVectorClient` class. You can follow a similar approach to add synchronous versions of other asynchronous methods as needed [1].
Additionally, there are existing synchronous entry points in the `OpensearchVectorClient` class that can be used as a reference for creating new synchronous methods. Specifically, the `__init__` method makes synchronous calls to the OpenSearch client by running asynchronous methods on the event loop with `event_loop.run_until_complete` [1].
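The `run_until_complete` pattern described above can be sketched in isolation. `HypotheticalClient` and `_acount` below are illustrative stand-ins, not part of the LlamaIndex or opensearch-py API:

```python
import asyncio


class HypotheticalClient:
    """Sketch of driving an async coroutine to completion from sync code."""

    async def _acount(self, items):
        # Stand-in for an async OpenSearch call.
        await asyncio.sleep(0)
        return len(items)

    def count(self, items):
        # Synchronous entry point: run the coroutine on a fresh event loop.
        # Note this fails if a loop is already running in this thread
        # (e.g. inside a FastAPI endpoint).
        event_loop = asyncio.new_event_loop()
        try:
            return event_loop.run_until_complete(self._acount(items))
        finally:
            event_loop.close()


print(HypotheticalClient().count(["a", "b", "c"]))  # → 3
```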
I don't have access to OpenSearch to make the changes or test them, but I welcome a PR.
I'm tempted to put in a PR to handle this. I had to rewrite the OpenSearch client and vector store classes myself to remove the async calls because they were failing under FastAPI due to nested event loops. I can try to start this sometime this week, and maybe just set a flag to choose between the sync and async opensearch-py clients.
I have most of the changes ready. I'll try to post a PR tomorrow.
fyi you are going to want async for any production-ready server 😅

Nested async can be solved by either not using async entry points, or using `nest_asyncio.apply()` and `uvicorn.run(..., loop="asyncio")`.

Caveat here is that only the ingestion pipeline (`pipeline.arun(...)`) and the vector store object used directly (`vector_store.async_add(...)`) are async when indexing data 😅
So setting the event loop flag in uvicorn still throws errors under FastAPI, and the same goes for `nest_asyncio.apply()`.

> fyi you are going to want async for any production-ready server 😅

However, I don't see the advantage of using the async code here, since each function has its own `event_loop.run_until_complete(...)`. I thought the only way to really take advantage of async is to make the whole program async.
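The nested-loop failure being discussed can be reproduced with plain `asyncio`, no FastAPI or OpenSearch involved. This standalone sketch shows the `RuntimeError` that `run_until_complete` raises when the current loop is already running, which is the failure mode hit inside async endpoints:

```python
import asyncio


async def outer():
    # Inside a coroutine, a loop is already running in this thread.
    loop = asyncio.get_running_loop()
    coro = asyncio.sleep(0)
    try:
        # Starting a nested run on the already-running loop is not allowed.
        loop.run_until_complete(coro)
    except RuntimeError as e:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(e)
    return "no error"


msg = asyncio.run(outer())
print(msg)  # RuntimeError message says the loop is already running
```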
For my 2 cents: while I agree that things like async are needed for high-throughput production systems, that doesn't mean we shouldn't also support sync use cases. Like I stated above, the current OpenSearch support fails under FastAPI due to nested async calls, and the suggestions above do not seem to resolve it, so the only other option I have available at this moment is sync with threading.
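One way to sketch the "sync with threading" workaround: run the coroutine on a fresh event loop in a worker thread, so it works even when the calling thread already has a running loop. `hypothetical_async_add` and `sync_add_via_thread` are illustrative names, not LlamaIndex APIs:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def hypothetical_async_add(texts):
    # Stand-in for something like vector_store.async_add(...).
    await asyncio.sleep(0)
    return [f"id-{i}" for i, _ in enumerate(texts)]


def sync_add_via_thread(texts):
    # asyncio.run creates (and closes) a fresh event loop in the worker
    # thread, sidestepping any loop already running in the caller's thread.
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(asyncio.run, hypothetical_async_add(texts))
        return future.result()


print(sync_add_via_thread(["a", "b"]))  # → ['id-0', 'id-1']
```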
Since my PR was merged (thanks @logan-markewich!), I will close my question.
Question Validation
Question
Like others (see #10719), I had some issues running my code using the `OpensearchVectorClient` as a database. I have seen in the code that this only runs using async code. Why? If I check the code in LangChain, they offer both sync and async possibilities. Can we adjust the code in LlamaIndex to offer both possibilities?