qdrant / qdrant-client

Python client for Qdrant vector search engine
https://qdrant.tech
Apache License 2.0
673 stars, 106 forks

Problem encountered when running Qdrant operations from a Celery task in FastAPI #672

Open WatermelonSumer opened 3 days ago

WatermelonSumer commented 3 days ago

I'm using async methods to perform Qdrant operations inside a Celery task, with prefer_grpc set to True, and the async Qdrant client blocks. I suspect the event loop is the cause. My code snippet:

@db_required
async def test(file_id: str):
    # some other code...
    try:
        file_obj: UploadFile = await OSSServices.read_file(file_res, file.file_name)
        collection_name = file.kb_id

        rag = RAGInterface(collection_name)
        # insert with metadata
        metadata = {
            "kb_id": file.kb_id,
            "file_name": file.file_name,
            "file_id": file.file_id,
            "created_at": file.created_at.strftime("%Y-%m-%d %H:%M:%S"),
        }
        await rag.insert(files=file_obj, kwargs=metadata)
        # vector parsing succeeded
    except Exception as e:
        logger.exception(f"[vector_create_task]:{file.file_name}\n{str(e)}")

@shared_task
def vector_create_task(file_id: str):
    """Vector parsing: submit task."""

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    try:
        async_result = loop.run_until_complete(test(file_id=file_id))
        return async_result
    finally:
        loop.close()
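
One way to probe the event-loop suspicion, offered as a sketch rather than a confirmed diagnosis: async transports are often tied to the loop they were created on, so a client built at import time or reused across tasks may not work inside a loop created later. The example uses a hypothetical stand-in class, `LoopBoundClient`, not the real `AsyncQdrantClient`, and it fails loudly where a real transport might hang. The names `check_collection` and `run_in_fresh_loop` are also illustrative.

```python
import asyncio


class LoopBoundClient:
    """Hypothetical stand-in for an async client tied to one event loop."""

    def __init__(self) -> None:
        # Remember the loop that was running when the client was constructed.
        self._loop = asyncio.get_running_loop()

    async def collection_exists(self, collection_name: str) -> bool:
        # A real transport might hang here; the stand-in raises instead.
        if asyncio.get_running_loop() is not self._loop:
            raise RuntimeError("client used from a different event loop")
        await asyncio.sleep(0)
        return True


async def check_collection(collection_name: str) -> bool:
    # Build the client inside the coroutine so it belongs to the running loop.
    client = LoopBoundClient()
    return await client.collection_exists(collection_name)


def run_in_fresh_loop(collection_name: str) -> bool:
    # asyncio.run creates a new loop, runs the coroutine, then closes the loop.
    return asyncio.run(check_collection(collection_name))
```

If the same pattern holds for the real client, constructing it inside the coroutine that the task runs, rather than at module level, would be the first thing to try.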

In my RAGInterface, I wrapped some business logic around the Qdrant API. While debugging, I found that every call to AsyncQdrantClient hangs. For example, when I execute async_client.collection_exists(collection_name), execution gets stuck in the following qdrant-client source code:

  async def collection_exists(self, collection_name: str, **kwargs: Any) -> bool:
      if self._prefer_grpc:
          return (
              await self.grpc_collections.CollectionExists(
                  grpc.CollectionExistsRequest(collection_name=collection_name),
                  timeout=self._timeout,
              )
          ).result.exists
      result: Optional[models.CollectionExistence] = (
          await self.http.collections_api.collection_exists(collection_name=collection_name)
      ).result
      assert result is not None, "Collection exists returned None"
      return result.exists

Code from: qdrant_client.async_qdrant_remote.AsyncQdrantRemote
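
To confirm where execution stalls, one option is to bound the suspect await with a deadline so that a hang turns into a visible result instead of blocking the worker. This is a minimal sketch using only `asyncio.wait_for`; `await_with_deadline`, `stuck_call`, and `quick_call` are hypothetical names, and `stuck_call` merely imitates a call that never completes.

```python
import asyncio
from typing import Awaitable, Optional, TypeVar

T = TypeVar("T")


async def await_with_deadline(awaitable: Awaitable[T], seconds: float) -> Optional[T]:
    """Return the awaitable's result, or None if it does not finish in time."""
    try:
        return await asyncio.wait_for(awaitable, timeout=seconds)
    except asyncio.TimeoutError:
        # The awaited call was cancelled after the deadline passed.
        return None


async def stuck_call() -> bool:
    # Hypothetical stand-in for a call that never completes.
    await asyncio.Event().wait()
    return True


async def quick_call() -> bool:
    # Hypothetical stand-in for a call that completes normally.
    await asyncio.sleep(0)
    return True
```

Returning None on timeout keeps the sketch short; code that can legitimately return None would need a distinct sentinel or should let the timeout exception propagate.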

I'm very confused by this problem. Can anybody help me? Thanks very much!

joein commented 44 minutes ago

Hello @WatermelonSumer

I was not able to reproduce the issue. Could you please provide a minimal reproducible example so we can help you?