run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License
33.3k stars 4.66k forks source link

[Bug]: Not able to create asynchronous chromadb. #14515

Open rvssridatta opened 2 days ago

rvssridatta commented 2 days ago

Bug Description

I like to create a status_checker api endpoint in fastapi to track the creation of chromadb embeddings. Also I like to create these emebeddings in async mode. Below mentioned the code, but it is giving error. Please do the needful.

Version

llama-index 0.10.12

Steps to Reproduce

import chromadb from llama_index.vector_stores.chroma import ChromaVectorStore from llama_index.core import VectorStoreIndex, StorageContext from llama_index.core import SimpleDirectoryReader

@app.post("/upload") async def upload_file(file: UploadFile = File(...)):

try:

    # Ensure 'docs' directory exists
    if not os.path.exists("docs"):
        os.makedirs("docs")

    # Write the file to server with its original filename
    file_path = os.path.join("docs", file.filename)
    with open(file_path, "wb") as f:
        f.write(await file.read())
    from rag_define import define_rag
    asyncio.create_task(define_rag())

    return JSONResponse(content={"message": "File uploaded successfully"})

except Exception as e:
    return JSONResponse(content={"error": str(e)}, status_code=500)

@app.post("/status") async def status_checker(): return global_variable.upload_in_progress

async def define_rag(): documents = SimpleDirectoryReader(input_dir="./docs",required_exts = [".docx",".doc",".pdf",".txt"]).load_data() if os.path.exists('./chroma_db'): print("*****utilizing pre generated embeddings from chromadb folder") chroma_client = chromadb.PersistentClient(path="./chroma_db") chroma_collection = chroma_client.get_or_create_collection("quickstart") vector_store = ChromaVectorStore(chroma_collection=chroma_collection) vector_index_chunk = VectorStoreIndex.from_vector_store(vector_store, embed_model=global_variable.embed_model) else: chroma_client = chromadb.PersistentClient(path="./chroma_db") chroma_collection = chroma_client.get_or_create_collection("quickstart") vector_store = ChromaVectorStore(chroma_collection=chroma_collection) storage_context = StorageContext.from_defaults(vector_store=vector_store)

index = VectorStoreIndex.from_documents(documents,storage_context=storage_context)

  vector_index_chunk = await VectorStoreIndex(all_nodes, embed_model=global_variable.embed_model, 

storage_context=storage_context)#, use_async=True, show_progress=True)

vector_retriever_chunk = vector_index_chunk.as_retriever(similarity_top_k=5) global_variable.retriever_chunk = RecursiveRetriever( "vector", retriever_dict={"vector": vector_retriever_chunk}, node_dict=all_nodes_dict, verbose=True, ) print("Vector store creation done") global_variable.upload_in_progress = 1 global_variable.query_engine_chunk = RetrieverQueryEngine.from_args(global_variable.retriever_chunk, llm=global_variable.llm, text_qa_template=global_variable.text_qa_template)

Relevant Logs/Tracbacks

vector_index_chunk = await VectorStoreIndex(all_nodes, embed_model=global_variable.embed_model, storage_context=storage_context, use_async=True, show_progress=True)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 74, in __init__
    super().__init__(
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/base.py", line 91, in __init__
    index_struct = self.build_index_from_nodes(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 307, in build_index_from_nodes
    return self._build_index_from_nodes(nodes, **insert_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 277, in _build_index_from_nodes
    run_async_tasks(tasks)
  File "/homelib/python3.11/site-packages/llama_index/core/async_utils.py", line 49, in run_async_tasks
    outputs: List[Any] = asyncio.run(_gather())
                         ^^^^^^^^^^^^^^^^^^^^^^
  File "/homelib/python3.11/asyncio/runners.py", line 186, in run
    raise RuntimeError(
RuntimeError: asyncio.run() cannot be called from a running event loop
/home/miniconda3/envs/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py:-1: RuntimeWarning: coroutine 'run_async_tasks.<locals>._gather' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/home/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py:-1: RuntimeWarning: coroutine 'VectorStoreIndex._async_add_nodes_to_index' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
dosubot[bot] commented 2 days ago

To resolve the issue of calling asyncio.run() from a running event loop and properly create asynchronous ChromaDB embeddings in your FastAPI application, you should avoid using asyncio.run() within an already running event loop. Instead, you can use await directly or use asyncio.create_task() to schedule the coroutine.

Here is an example of how you can modify your code to handle asynchronous embeddings properly:

import os
import asyncio
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import chromadb
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import VectorStoreIndex, StorageContext, SimpleDirectoryReader
from llama_index.core.retrievers import RecursiveRetriever
from llama_index.core.query_engine import RetrieverQueryEngine

app = FastAPI()

# Global variable to track upload status
class GlobalVariable:
    upload_in_progress = 0
    retriever_chunk = None
    query_engine_chunk = None
    embed_model = None  # Initialize with your embedding model
    llm = None  # Initialize with your LLM
    text_qa_template = None  # Initialize with your text QA template

global_variable = GlobalVariable()

@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    try:
        # Ensure 'docs' directory exists
        if not os.path.exists("docs"):
            os.makedirs("docs")

        # Write the file to server with its original filename
        file_path = os.path.join("docs", file.filename)
        with open(file_path, "wb") as f:
            f.write(await file.read())

        # Start the asynchronous task to define RAG
        asyncio.create_task(define_rag())

        return JSONResponse(content={"message": "File uploaded successfully"})

    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)

@app.post("/status")
async def status_checker():
    return {"upload_in_progress": global_variable.upload_in_progress}

async def define_rag():
    documents = SimpleDirectoryReader(input_dir="./docs", required_exts=[".docx", ".doc", ".pdf", ".txt"]).load_data()
    if os.path.exists('./chroma_db'):
        print("*************************utilizing pre generated embeddings from chromadb folder")
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        vector_index_chunk = VectorStoreIndex.from_vector_store(vector_store, embed_model=global_variable.embed_model)
    else:
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        vector_index_chunk = await VectorStoreIndex.from_documents(documents, embed_model=global_variable.embed_model, storage_context=storage_context, use_async=True, show_progress=True)

    vector_retriever_chunk = vector_index_chunk.as_retriever(similarity_top_k=5)
    global_variable.retriever_chunk = RecursiveRetriever(
        "vector",
        retriever_dict={"vector": vector_retriever_chunk},
        node_dict={node.node_id: node for node in documents},
        verbose=True,
    )
    print("Vector store creation done")
    global_variable.upload_in_progress = 1
    global_variable.query_engine_chunk = RetrieverQueryEngine.from_args(global_variable.retriever_chunk, llm=global_variable.llm, text_qa_template=global_variable.text_qa_template)

In this example, the define_rag function is properly defined as an asynchronous function, and the VectorStoreIndex is created using the await keyword to handle asynchronous operations. This approach ensures that the embeddings are created asynchronously and efficiently without causing runtime errors related to the event loop [1][2].

To continue talking to Dosu, mention @dosu.

rvssridatta commented 1 day ago

Followed same code suggested by @dosu and executed still getting the same error. Please do the needful

dosubot[bot] commented 1 day ago

The error persists because asyncio.run() is being called within an already running event loop. To fix this, ensure that all asynchronous operations are properly awaited within the running event loop.

Here's the corrected define_rag function:

async def define_rag():
    documents = SimpleDirectoryReader(input_dir="./docs", required_exts=[".docx", ".doc", ".pdf", ".txt"]).load_data()
    if os.path.exists('./chroma_db'):
        print("*************************utilizing pre generated embeddings from chromadb folder")
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        vector_index_chunk = VectorStoreIndex.from_vector_store(vector_store, embed_model=global_variable.embed_model)
    else:
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        vector_index_chunk = await VectorStoreIndex.from_documents(documents, embed_model=global_variable.embed_model, storage_context=storage_context, use_async=True, show_progress=True)

    vector_retriever_chunk = vector_index_chunk.as_retriever(similarity_top_k=5)
    global_variable.retriever_chunk = RecursiveRetriever(
        "vector",
        retriever_dict={"vector": vector_retriever_chunk},
        node_dict={node.node_id: node for node in documents},
        verbose=True,
    )
    print("Vector store creation done")
    global_variable.upload_in_progress = 1
    global_variable.query_engine_chunk = RetrieverQueryEngine.from_args(global_variable.retriever_chunk, llm=global_variable.llm, text_qa_template=global_variable.text_qa_template)

This modification ensures that the asynchronous tasks are awaited properly within the running event loop, avoiding the RuntimeError you encountered [1].

To continue talking to Dosu, mention @dosu.