run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License
36.38k stars · 5.19k forks

[Bug]: error Async Search client not initialized when running azureaisearch with use_async = True #15594

Open zhongshuai-cao opened 2 months ago

zhongshuai-cao commented 2 months ago

Bug Description

When I run

```python
vector_store = AzureAISearchVectorStore(
    search_or_index_client=index_client,
    index_name=index_name,
    index_management=IndexManagement.CREATE_IF_NOT_EXISTS,
    id_field_key="id",
    chunk_field_key="chunk",
    embedding_field_key="embedding",
    embedding_dimensionality=EMBED_SIZE,
    metadata_string_field_key="metadata",
    doc_id_field_key="doc_id",
    language_analyzer="en.lucene",
    vector_algorithm_type="exhaustiveKnn",
)

storage_context = StorageContext.from_defaults(vector_store=vector_store)

VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    show_progress=True,
    use_async=True,
)
```

I get the error below.

Version

0.10.67.post1

Steps to Reproduce

```python
from llama_index.core import StorageContext, VectorStoreIndex
from llama_index.vector_stores.azureaisearch import (
    AzureAISearchVectorStore,
    IndexManagement,
)

vector_store = AzureAISearchVectorStore(
    search_or_index_client=index_client,
    index_name=index_name,
    index_management=IndexManagement.CREATE_IF_NOT_EXISTS,
    id_field_key="id",
    chunk_field_key="chunk",
    embedding_field_key="embedding",
    embedding_dimensionality=EMBED_SIZE,
    metadata_string_field_key="metadata",
    doc_id_field_key="doc_id",
    language_analyzer="en.lucene",
    vector_algorithm_type="exhaustiveKnn",
)

storage_context = StorageContext.from_defaults(vector_store=vector_store)

VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    show_progress=True,
    use_async=True,
)
```

Relevant Logs/Tracebacks

Traceback (most recent call last):
  File "/Users/xxx/Projects/xxx/src/index_file.py", line 205, in run_batch
    VectorStoreIndex.from_documents(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/xxx/lib/python3.11/site-packages/llama_index/core/indices/base.py", line 145, in from_documents
    return cls(
           ^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/xxx/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 78, in __init__
    super().__init__(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/xxx/lib/python3.11/site-packages/llama_index/core/indices/base.py", line 94, in __init__
    index_struct = self.build_index_from_nodes(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/xxx/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 314, in build_index_from_nodes
    return self._build_index_from_nodes(nodes, **insert_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/xxx/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 283, in _build_index_from_nodes
    run_async_tasks(tasks)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/xxx/lib/python3.11/site-packages/llama_index/core/async_utils.py", line 77, in run_async_tasks
    outputs: List[Any] = asyncio_run(_gather())
                         ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/xxx/lib/python3.11/site-packages/llama_index/core/async_utils.py", line 33, in asyncio_run
    return loop.run_until_complete(coro)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/xxx/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/uambot/lib/python3.11/site-packages/llama_index/core/async_utils.py", line 75, in _gather
    return await asyncio.gather(*tasks_to_execute)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/xxx/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 198, in _async_add_nodes_to_index
    new_ids = await self._vector_store.async_add(nodes_batch, **insert_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/xxx/lib/python3.11/site-packages/llama_index/vector_stores/azureaisearch/base.py", line 768, in async_add
    raise ValueError("Async Search client not initialized")
ValueError: Async Search client not initialized
dosubot[bot] commented 2 months ago

To resolve the "Async Search client not initialized" error when running AzureAISearchVectorStore with use_async=True in version 0.10.67.post1, ensure that the search_or_index_client provided is of type AsyncSearchClient or AsyncSearchIndexClient. The error occurs because the asynchronous client is not properly initialized.

Here is a snippet to illustrate how to initialize the AzureAISearchVectorStore with an asynchronous client:

```python
from azure.search.documents.aio import SearchClient as AsyncSearchClient
from azure.search.documents.indexes.aio import SearchIndexClient as AsyncSearchIndexClient

# Initialize the async search client
async_search_client = AsyncSearchClient(endpoint, index_name, credential)
# or
async_index_client = AsyncSearchIndexClient(endpoint, credential)

# Create the AzureAISearchVectorStore with the async client
vector_store = AzureAISearchVectorStore(
    search_or_index_client=async_search_client,  # or async_index_client
    id_field_key="id",
    chunk_field_key="chunk",
    embedding_field_key="embedding",
    metadata_string_field_key="metadata",
    doc_id_field_key="doc_id",
    use_async=True
)
```

Ensure that the search_or_index_client is correctly passed as an instance of AsyncSearchClient or AsyncSearchIndexClient to avoid the initialization error. Additionally, make sure that the index_name is provided if you are using AsyncSearchIndexClient, as it is required for proper initialization [1][2].

To continue talking to Dosu, mention @dosu.

logan-markewich commented 2 months ago

Seems like the azure vector store only supports passing in a sync client OR an async client.

Probably should be updated to accept both clients? But also, you can try passing in an async client instead
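For illustration, the failure mode can be reduced to a toy sketch (this is not the actual llama_index source, just a hypothetical mock of its behavior): the vector store keeps separate sync and async client handles, and the async path raises when only the sync handle was populated — which is exactly what happens when a synchronous client is passed together with `use_async=True`.

```python
import asyncio

# Toy stand-in for AzureAISearchVectorStore's client handling (illustrative only).
class ToyVectorStore:
    def __init__(self, client, is_async):
        # Only one of the two handles gets populated, based on the client type.
        self._search_client = None if is_async else client
        self._async_search_client = client if is_async else None

    async def async_add(self, nodes):
        # Mirrors the guard that produces the error in the traceback above.
        if self._async_search_client is None:
            raise ValueError("Async Search client not initialized")
        return [f"id-{i}" for i, _ in enumerate(nodes)]

store = ToyVectorStore(client=object(), is_async=False)
try:
    asyncio.run(store.async_add(["node"]))
except ValueError as e:
    print(e)  # Async Search client not initialized
```

With `is_async=True` the same call succeeds, which is why swapping in the `.aio` client resolves the error.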

zhongshuai-cao commented 2 months ago

Hi @logan-markewich, thanks for your reply. I did try AsyncSearchClient before creating the issue, but I couldn't get it to work; I'll come back with a fuller trace.

Interestingly, this worked before: the code ran without importing anything from .aio, so I guess some update introduced better async handling. Unfortunately I didn't record the llama_index version in my environment notes, and I have since lost the working pip environment.

zhongshuai-cao commented 2 months ago

I have now tried the .aio modules and it works. Perhaps there was a glitch in my environment, since it didn't run the first time I tried.

However, I now get these warnings about unclosed client sessions, and I wonder how to handle them correctly:

@dosu

```
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x172c65e50>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x172c10d70>, 160881.12069375)]']
connector: <aiohttp.connector.TCPConnector object at 0x172c88810>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x172c3f410>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x172c103d0>, 160881.764186583)]']
connector: <aiohttp.connector.TCPConnector object at 0x172c3fe10>
```

dosubot[bot] commented 2 months ago

To handle the warnings for unclosed client sessions and connectors when using the .aio modules with AsyncSearchClient, ensure that you properly close the asynchronous clients after their usage. This can be done using the async with statement to manage the lifecycle of the asynchronous clients.

Here is an example:

```python
from azure.search.documents.aio import SearchClient as AsyncSearchClient

async def perform_search():
    async with AsyncSearchClient(endpoint, index_name, credential) as client:
        results = await client.search(search_text="example")
        # Process results
```
By using the async with statement, the client will be properly closed when the block is exited, ensuring that there are no unclosed client sessions or connectors [1].

To continue talking to Dosu, mention @dosu.

zhongshuai-cao commented 2 months ago

> To handle the warnings for unclosed client sessions and connectors when using the .aio modules with AsyncSearchClient, ensure that you properly close the asynchronous clients after their usage. This can be done using the async with statement to manage the lifecycle of the asynchronous clients.
>
> Here is an example:
>
> ```python
> from azure.search.documents.aio import SearchClient as AsyncSearchClient
>
> async def perform_search():
>     async with AsyncSearchClient(endpoint, index_name, credential) as client:
>         results = await client.search(search_text="example")
>         # Process results
> ```
>
> By using the async with statement, the client will be properly closed when the block is exited, ensuring that there are no unclosed client sessions or connectors [1].
>
> To continue talking to Dosu, mention @dosu.

@dosu here is the code I use to run indexing. Please advise how I should update it to use the logic you describe:

```python
async_index_client = AsyncSearchIndexClient(
    endpoint=search_service_endpoint,
    credential=search_service_credential,
    api_version=search_service_api_version
)

vector_store = AzureAISearchVectorStore(
    search_or_index_client=async_index_client,
    index_name=index_name,
    index_management=IndexManagement.CREATE_IF_NOT_EXISTS,
    id_field_key="id",
    chunk_field_key="chunk",
    embedding_field_key="embedding",
    embedding_dimensionality=EMBED_SIZE,
    metadata_string_field_key="metadata",
    doc_id_field_key="doc_id",
    language_analyzer="en.lucene",
    vector_algorithm_type="exhaustiveKnn",
)

storage_context = StorageContext.from_defaults(vector_store=vector_store)

for document in tqdm(documents, total=len(documents), desc="Processing files"):
    try:
        VectorStoreIndex.from_documents(
            [document],
            storage_context=storage_context,
            use_async=True
        )
```
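One way to address the unclosed-session warnings in code like the above is to close the async client explicitly once all indexing is done, rather than per search call. The sketch below uses a hypothetical stand-in class instead of the real Azure SDK client, but the pattern is the same: the `.aio` clients expose an awaitable `close()` (and support `async with`), and calling it releases the underlying aiohttp session and connector.

```python
import asyncio

# Hypothetical stand-in for AsyncSearchIndexClient; only the close() pattern
# is the point here, not the Azure API surface.
class FakeAsyncIndexClient:
    def __init__(self):
        self.closed = False

    async def close(self):
        # The real client's close() releases the aiohttp session/connector,
        # which is what silences the "Unclosed client session" warnings.
        self.closed = True

async def index_documents(client, documents):
    try:
        # ... per-document VectorStoreIndex.from_documents(...) calls go here ...
        return len(documents)
    finally:
        # Close exactly once, after the whole batch, even if indexing raised.
        await client.close()

client = FakeAsyncIndexClient()
processed = asyncio.run(index_documents(client, ["doc1", "doc2"]))
print(processed, client.closed)  # 2 True
```

Wrapping the whole loop this way keeps the client alive across documents (avoiding reconnect overhead) while still guaranteeing cleanup at the end.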