run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Bug]: Can't use qdrant db in QueryFusionRetriever #15621

Closed harshil1973 closed 1 week ago

harshil1973 commented 2 weeks ago

Bug Description

This is my code:

from llama_index.core import VectorStoreIndex, StorageContext
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.vector_stores.qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
# import nest_asyncio

# nest_asyncio.apply()

qdrant_client = QdrantClient(
    path = ""
)
docstore = SimpleDocumentStore()
docstore.add_documents(nodes)

vector_store = QdrantVectorStore(client=qdrant_client, collection_name="_v1")

storage_context = StorageContext.from_defaults(
    docstore=docstore, vector_store=vector_store
)

index = VectorStoreIndex(nodes=nodes, storage_context=storage_context)

from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.core.retrievers import QueryFusionRetriever

dense_retriever = VectorIndexRetriever(
    index=index,
    similarity_top_k=5
)

sparse_retriever = BM25Retriever.from_defaults(docstore=index.docstore, similarity_top_k=5)

retriever = QueryFusionRetriever(
    [
        dense_retriever,
        sparse_retriever,
    ],
    num_queries=1,
    use_async=True,
    retriever_weights=[0.5, 0.5],
    similarity_top_k=5,
    mode="relative_score",
    verbose=True,
)
import nest_asyncio
nest_asyncio.apply()
from llama_index.core.postprocessor import SentenceTransformerRerank
from llama_index.core.response.notebook_utils import display_source_node

reranker = SentenceTransformerRerank(model="cross-encoder/ms-marco-MiniLM-L-12-v2", top_n=5)

nodes = retriever.retrieve("bail")
nodes = reranker.postprocess_nodes(nodes)
for node in nodes:
    print(node.metadata['file_name'])
    #print("---")
    display_source_node(node, source_length=5000)

Note that dense_retriever works fine on its own, without QueryFusionRetriever. The same code also works when using ChromaDB instead of Qdrant.

Version

0.11.1

Steps to Reproduce

I have provided the code above.

Relevant Logs/Tracebacks

AttributeError                            Traceback (most recent call last)
Cell In[18], line 8
      4 from llama_index.core.response.notebook_utils import display_source_node
      6 reranker = SentenceTransformerRerank(model="cross-encoder/ms-marco-MiniLM-L-12-v2", top_n=5)
----> 8 nodes = retriever.retrieve("bail")
      9 nodes = reranker.postprocess_nodes(nodes)
     10 for node in nodes:

File /opt/conda/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py:261, in Dispatcher.span.<locals>.wrapper(func, instance, args, kwargs)
    253 self.span_enter(
    254     id_=id_,
    255     bound_args=bound_args,
   (...)
    258     tags=tags,
    259 )
    260 try:
--> 261     result = func(*args, **kwargs)
    262 except BaseException as e:
    263     self.event(SpanDropEvent(span_id=id_, err_str=str(e)))

File /opt/conda/lib/python3.10/site-packages/llama_index/core/base/base_retriever.py:245, in BaseRetriever.retrieve(self, str_or_query_bundle)
    240 with self.callback_manager.as_trace("query"):
    241     with self.callback_manager.event(
    242         CBEventType.RETRIEVE,
    243         payload={EventPayload.QUERY_STR: query_bundle.query_str},
    244     ) as retrieve_event:
--> 245         nodes = self._retrieve(query_bundle)
    246         nodes = self._handle_recursive_retrieval(query_bundle, nodes)
    247         retrieve_event.on_end(
    248             payload={EventPayload.NODES: nodes},
    249         )

File /opt/conda/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py:261, in Dispatcher.span.<locals>.wrapper(func, instance, args, kwargs)
    253 self.span_enter(
    254     id_=id_,
    255     bound_args=bound_args,
   (...)
    258     tags=tags,
    259 )
    260 try:
--> 261     result = func(*args, **kwargs)
    262 except BaseException as e:
    263     self.event(SpanDropEvent(span_id=id_, err_str=str(e)))

File /opt/conda/lib/python3.10/site-packages/llama_index/core/retrievers/fusion_retriever.py:261, in QueryFusionRetriever._retrieve(self, query_bundle)
    258     queries.extend(self._get_queries(query_bundle.query_str))
    260 if self.use_async:
--> 261     results = self._run_nested_async_queries(queries)
    262 else:
    263     results = self._run_sync_queries(queries)

File /opt/conda/lib/python3.10/site-packages/llama_index/core/retrievers/fusion_retriever.py:220, in QueryFusionRetriever._run_nested_async_queries(self, queries)
    217         tasks.append(retriever.aretrieve(query))
    218         task_queries.append((query.query_str, i))
--> 220 task_results = run_async_tasks(tasks)
    222 results = {}
    223 for query_tuple, query_result in zip(task_queries, task_results):

File /opt/conda/lib/python3.10/site-packages/llama_index/core/async_utils.py:77, in run_async_tasks(tasks, show_progress, progress_bar_desc)
     74 async def _gather() -> List[Any]:
     75     return await asyncio.gather(*tasks_to_execute)
---> 77 outputs: List[Any] = asyncio_run(_gather())
     78 return outputs

File /opt/conda/lib/python3.10/site-packages/llama_index/core/async_utils.py:33, in asyncio_run(coro)
     30     loop = asyncio.get_event_loop()
     32     # If we're here, there's an existing loop but it's not running
---> 33     return loop.run_until_complete(coro)
     35 except RuntimeError as e:
     36     # If we can't get the event loop, we're likely in a different thread, or its already running
     37     try:

File /opt/conda/lib/python3.10/site-packages/nest_asyncio.py:98, in _patch_loop.<locals>.run_until_complete(self, future)
     95 if not f.done():
     96     raise RuntimeError(
     97         'Event loop stopped before Future completed.')
---> 98 return f.result()

File /opt/conda/lib/python3.10/asyncio/futures.py:201, in Future.result(self)
    199 self.__log_traceback = False
    200 if self._exception is not None:
--> 201     raise self._exception.with_traceback(self._exception_tb)
    202 return self._result

File /opt/conda/lib/python3.10/asyncio/tasks.py:234, in Task.__step(***failed resolving arguments***)
    232         result = coro.send(None)
    233     else:
--> 234         result = coro.throw(exc)
    235 except StopIteration as exc:
    236     if self._must_cancel:
    237         # Task is cancelled right before coro stops.

File /opt/conda/lib/python3.10/site-packages/llama_index/core/async_utils.py:75, in run_async_tasks.<locals>._gather()
     74 async def _gather() -> List[Any]:
---> 75     return await asyncio.gather(*tasks_to_execute)

File /opt/conda/lib/python3.10/asyncio/tasks.py:304, in Task.__wakeup(self, future)
    302 def __wakeup(self, future):
    303     try:
--> 304         future.result()
    305     except BaseException as exc:
    306         # This may also be a cancellation.
    307         self.__step(exc)

File /opt/conda/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
    228 try:
    229     if exc is None:
    230         # We use the `send` method directly, because coroutines
    231         # don't have `__iter__` and `__next__` methods.
--> 232         result = coro.send(None)
    233     else:
    234         result = coro.throw(exc)

File /opt/conda/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py:291, in Dispatcher.span.<locals>.async_wrapper(func, instance, args, kwargs)
    283 self.span_enter(
    284     id_=id_,
    285     bound_args=bound_args,
   (...)
    288     tags=tags,
    289 )
    290 try:
--> 291     result = await func(*args, **kwargs)
    292 except BaseException as e:
    293     self.event(SpanDropEvent(span_id=id_, err_str=str(e)))

File /opt/conda/lib/python3.10/site-packages/llama_index/core/base/base_retriever.py:276, in BaseRetriever.aretrieve(self, str_or_query_bundle)
    271 with self.callback_manager.as_trace("query"):
    272     with self.callback_manager.event(
    273         CBEventType.RETRIEVE,
    274         payload={EventPayload.QUERY_STR: query_bundle.query_str},
    275     ) as retrieve_event:
--> 276         nodes = await self._aretrieve(query_bundle=query_bundle)
    277         nodes = await self._ahandle_recursive_retrieval(
    278             query_bundle=query_bundle, nodes=nodes
    279         )
    280         retrieve_event.on_end(
    281             payload={EventPayload.NODES: nodes},
    282         )

File /opt/conda/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py:291, in Dispatcher.span.<locals>.async_wrapper(func, instance, args, kwargs)
    283 self.span_enter(
    284     id_=id_,
    285     bound_args=bound_args,
   (...)
    288     tags=tags,
    289 )
    290 try:
--> 291     result = await func(*args, **kwargs)
    292 except BaseException as e:
    293     self.event(SpanDropEvent(span_id=id_, err_str=str(e)))

File /opt/conda/lib/python3.10/site-packages/llama_index/core/indices/vector_store/retrievers/retriever.py:112, in VectorIndexRetriever._aretrieve(self, query_bundle)
    108         embed_model = self._embed_model
    109         embedding = await embed_model.aget_agg_embedding_from_queries(
    110             query_bundle.embedding_strs
    111         )
--> 112 return await self._aget_nodes_with_embeddings(
    113     QueryBundle(query_str=query_bundle.query_str, embedding=embedding)
    114 )

File /opt/conda/lib/python3.10/site-packages/llama_index/core/indices/vector_store/retrievers/retriever.py:184, in VectorIndexRetriever._aget_nodes_with_embeddings(self, query_bundle_with_embeddings)
    180 async def _aget_nodes_with_embeddings(
    181     self, query_bundle_with_embeddings: QueryBundle
    182 ) -> List[NodeWithScore]:
    183     query = self._build_vector_store_query(query_bundle_with_embeddings)
--> 184     query_result = await self._vector_store.aquery(query, **self._kwargs)
    185     return self._build_node_list_from_query_result(query_result)

File /opt/conda/lib/python3.10/site-packages/llama_index/vector_stores/qdrant/base.py:968, in QdrantVectorStore.aquery(self, query, **kwargs)
    966     return self.parse_to_query_result(response[0])
    967 else:
--> 968     response = await self._aclient.search(
    969         collection_name=self.collection_name,
    970         query_vector=query_embedding,
    971         limit=query.similarity_top_k,
    972         query_filter=query_filter,
    973     )
    975     return self.parse_to_query_result(response)

AttributeError: 'NoneType' object has no attribute 'search'
dosubot[bot] commented 2 weeks ago

Hey @harshil1973! I'm here to help you with any issues or questions you have while waiting for a human maintainer. Let's tackle this bug together!

I found a similar closed issue that might help you resolve the AttributeError: 'NoneType' object has no attribute 'search' error when using QueryFusionRetriever with Qdrant DB. The error suggests that the _aclient attribute in your QdrantVectorStore class is None, indicating that the asynchronous client was not properly initialized.

To address this issue, ensure that the QdrantVectorStore instance is correctly initialized with either a valid client or aclient instance, or with the necessary parameters (url, api_key, and collection_name) so the class can initialize its own client instances correctly.

Here is an example of how to properly initialize the QdrantVectorStore:

from qdrant_client import QdrantClient, AsyncQdrantClient
from llama_index.vector_stores.qdrant import QdrantVectorStore

qdrant_vector_store = QdrantVectorStore(
    collection_name="your_collection_name",
    client=QdrantClient(url="your_qdrant_url", api_key="your_api_key"),
    aclient=AsyncQdrantClient(url="your_qdrant_url", api_key="your_api_key")
)

Ensure that the QdrantClient and AsyncQdrantClient are properly initialized with the correct URL and API key. If you are using the :memory: mode, note that the data between clients is not synced.

Additionally, you might want to add some debug statements to ensure that the _aclient is being initialized correctly:

if self._aclient is None:
    raise ValueError("Failed to initialize AsyncQdrantClient")

This will help you identify if the client is being set to None and why.
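From the calling code, a rough equivalent check might look like this (note that _aclient is a private attribute, so treat this as a debugging aid only, not a stable API):

# Debugging aid only: _aclient is a private attribute of QdrantVectorStore
if vector_store._aclient is None:
    print("QdrantVectorStore has no async client; async queries (use_async=True) will fail")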

For more details, you can refer to the relevant parts of the QdrantVectorStore class initialization [1].

If these steps do not resolve the issue, please provide more information about how you are initializing the QdrantVectorStore in your code.


logan-markewich commented 2 weeks ago

Pass in the async client as well

QdrantVectorStore(..., client=QdrantClient(...), aclient=AsyncQdrantClient(...))
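For reference, a minimal sketch with both clients pointed at a Qdrant server; the URL and collection name below are placeholders, not taken from the original report:

from qdrant_client import QdrantClient, AsyncQdrantClient
from llama_index.vector_stores.qdrant import QdrantVectorStore

# Both clients point at the same Qdrant server, so sync and async queries see the same data
client = QdrantClient(url="http://localhost:6333")
aclient = AsyncQdrantClient(url="http://localhost:6333")

vector_store = QdrantVectorStore(
    client=client,
    aclient=aclient,
    collection_name="_v1",
)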

harshil1973 commented 2 weeks ago

@logan-markewich Is there any way I can use a local vector database with both clients, like below? Doing so gives an error like: Storage folder /kaggle/working/ is already accessed by another instance of Qdrant client. If you require concurrent access, use Qdrant server instead.

qdrant_client = QdrantClient(
    path="/kaggle/working/"
)
qdrant_aclient = AsyncQdrantClient(
    path="/kaggle/working/"
)
logan-markewich commented 2 weeks ago

Not sure. It seems to be a limitation of Qdrant (I always use the Docker server image for Qdrant and it works well).

Workarounds would be: using another vector DB, using the Docker image for the Qdrant server, setting use_async=False in the retriever, or writing your own retriever that skips async vector store operations (it would actually be fun to write this in a workflow).
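A minimal sketch of the use_async=False option, reusing dense_retriever and sparse_retriever from the original snippet with the other arguments unchanged:

retriever = QueryFusionRetriever(
    [
        dense_retriever,
        sparse_retriever,
    ],
    num_queries=1,
    use_async=False,  # run per-retriever queries synchronously, so no async Qdrant client is needed
    retriever_weights=[0.5, 0.5],
    similarity_top_k=5,
    mode="relative_score",
    verbose=True,
)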

harshil1973 commented 1 week ago

I am closing this issue for now