run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License
33.38k stars 4.67k forks source link

[Bug]: QueryFusionRetriever doesn't work asynchronously with custom http_client #14544

Open shenhai-ran opened 4 days ago

shenhai-ran commented 4 days ago

Bug Description

I don't know if this is a bug or I use it wrong. I have a local self-signed certificate, so I have to modify the http_client in llm. Later if I use QueryFusionRetriever with use_async=True I got error as

TypeError: Invalid `http_client` argument; Expected an instance of `httpx.AsyncClient` but got <class 'httpx.Client'>

I don't understand why http_client expects an instance of httpx.AsyncClient

I use the example at Simple Fusion Retriever, and my code example is bellow.

I tried to play with http_client and async_http_client in OpenAI object, but none of them works. Only setting use_async=False works

Version

0.10.37

Steps to Reproduce

from llama_index.core import SimpleDirectoryReader
from llama_index.core import VectorStoreIndex
from llama_index.embeddings.openai import OpenAIEmbedding
import httpx
from llama_index.core.retrievers import QueryFusionRetriever
from llama_index.llms.openai import OpenAI

documents_1 = SimpleDirectoryReader(
    input_files=["<path/to/file1>"]
).load_data()
documents_2 = SimpleDirectoryReader(
    input_files=["<path/to/file2>""]
).load_data()

http_client = httpx.Client(verify="<my/certificate>")
async_http_client = httpx.AsyncClient(verify="<my/certificate>")
llm = OpenAI(
    http_client=http_client,
    # async_http_client=async_http_client,
)
embed_model = OpenAIEmbedding(
    model="text-embedding-3-small",
    dimensions=512,
    http_client=http_client,
)

index_1 = VectorStoreIndex.from_documents(
    documents_1, embed_model=embed_model, show_progress=True
)
index_2 = VectorStoreIndex.from_documents(
    documents_2, embed_model=embed_model, show_progress=True
)

retriever = QueryFusionRetriever(
    [index_1.as_retriever(), index_2.as_retriever()],
    similarity_top_k=2,
    num_queries=4,  # set this to 1 to disable query generation
    use_async=True,
    verbose=True,
    llm=llm,
    # query_gen_prompt="...",  # we could override the query generation prompt here
)

nodes_with_scores = retriever.retrieve("How do I setup a chroma vector store?")

for node in nodes_with_scores:
    print(f"Score: {node.score:.2f} - {node.text[:100]}...")

Error message as follow:

nodes_with_scores = retriever.retrieve("How do I setup a chroma vector store?")
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 274, in wrapper
    result = func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\base\base_retriever.py", line 244, in retrieve
    nodes = self._retrieve(query_bundle)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\retrievers\fusion_retriever.py", line 261, in _retrieve       
    results = self._run_nested_async_queries(queries)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\retrievers\fusion_retriever.py", line 220, in _run_nested_async_queries
    task_results = run_async_tasks(tasks)
                   ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\async_utils.py", line 66, in run_async_tasks
    outputs: List[Any] = asyncio_run(_gather())
                         ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\async_utils.py", line 30, in asyncio_run
    return loop.run_until_complete(coro)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\asyncio\base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\async_utils.py", line 64, in _gather
    return await asyncio.gather(*tasks_to_execute)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 307, in async_wrapper    
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\base\base_retriever.py", line 276, in aretrieve
    nodes = await self._aretrieve(query_bundle=query_bundle)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 307, in async_wrapper    
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 109, in _aretrieve
    embedding = await embed_model.aget_agg_embedding_from_queries(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\base\embeddings\base.py", line 197, in aget_agg_embedding_from_queries
    query_embeddings = [await self.aget_query_embedding(query) for query in queries]
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\base\embeddings\base.py", line 197, in <listcomp>
    query_embeddings = [await self.aget_query_embedding(query) for query in queries]
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 307, in async_wrapper    
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\base\embeddings\base.py", line 165, in aget_query_embedding   
    query_embedding = await self._aget_query_embedding(query)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\embeddings\openai\base.py", line 393, in _aget_query_embedding     
    aclient = self._get_aclient()
              ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\embeddings\openai\base.py", line 364, in _get_aclient
    self._aclient = AsyncOpenAI(**self._get_credential_kwargs())
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\openai\_client.py", line 334, in __init__
    super().__init__(
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\openai\_base_client.py", line 1383, in __init__
    raise TypeError(
TypeError: Invalid `http_client` argument; Expected an instance of `httpx.AsyncClient` but got <class 'httpx.Client'>

Relevant Logs/Tracbacks

No response

dosubot[bot] commented 4 days ago

The QueryFusionRetriever expects an instance of httpx.AsyncClient when using use_async=True because asynchronous operations require an asynchronous HTTP client to handle non-blocking requests. This ensures that the asynchronous nature of the operations is maintained, allowing for concurrent execution without blocking the main thread.

To resolve the TypeError when using a custom http_client with a self-signed certificate, you need to ensure that the custom http_client is an instance of httpx.AsyncClient if use_async=True. Additionally, you should configure the httpx.AsyncClient to trust the self-signed certificate.

Here is an example of how you can create a custom httpx.AsyncClient with a self-signed certificate:

import httpx
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI

# Create a custom AsyncClient with a self-signed certificate
custom_async_http_client = httpx.AsyncClient(verify="<my/certificate>")

# Pass the custom AsyncClient to OpenAI and OpenAIEmbedding
llm = OpenAI(
    async_http_client=custom_async_http_client,
)

embed_model = OpenAIEmbedding(
    model="text-embedding-3-small",
    dimensions=512,
    http_client=custom_async_http_client,
)

# Use the rest of your code as is
documents_1 = SimpleDirectoryReader(
    input_files=["<path/to/file1>"]
).load_data()
documents_2 = SimpleDirectoryReader(
    input_files=["<path/to/file2>"]
).load_data()

index_1 = VectorStoreIndex.from_documents(
    documents_1, embed_model=embed_model, show_progress=True
)
index_2 = VectorStoreIndex.from_documents(
    documents_2, embed_model=embed_model, show_progress=True
)

retriever = QueryFusionRetriever(
    [index_1.as_retriever(), index_2.as_retriever()],
    similarity_top_k=2,
    num_queries=4,  # set this to 1 to disable query generation
    use_async=True,
    verbose=True,
    llm=llm,
    # query_gen_prompt="...",  # we could override the query generation prompt here
)

nodes_with_scores = await retriever.retrieve("How do I setup a chroma vector store?")

for node in nodes_with_scores:
    print(f"Score: {node.score:.2f} - {node.text[:100]}...")

In this example, verify="<my/certificate>" is used to specify the path to your self-signed certificate. Make sure to use this approach only in a secure and controlled environment, as disabling SSL verification can expose you to security risks [1].

To continue talking to Dosu, mention @dosu.

logan-markewich commented 4 days ago

You need to pass the sync and async custom client to both the llm and embeddings

You'll see in your traceback it's pointing towards embeddings. (Although looking at it now, the OpenAIEmbedding class doesn't take an async http client yet, so that would have to be added)

shenhai-ran commented 4 days ago

@logan-markewich Thanks for your feedback, I have tired to include the sync and async custom client to both the llm and embeddings as you suggested, but I still get the same error...

If you don't use self signed certificate, can you try in the client with verify=False? Do you get result without similar errors?

Thanks