**Open** · upchunk opened this issue 1 month ago
Hey @upchunk, I'm here to help you with any bugs, questions, or becoming a contributor. Let's tackle this together!
To resolve the `RuntimeError: no running event loop` error when running an ingestion pipeline on FastAPI with the llama_index library's `asyncio_run` function, you need to ensure that an event loop is running. Here is an example of how you can modify your code:
```python
import asyncio

from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.openai import OpenAIEmbedding


async def run_pipeline(documents):
    pipeline = IngestionPipeline(
        transformations=[
            SentenceSplitter(chunk_size=512, chunk_overlap=20),
            OpenAIEmbedding(),
        ],
    )
    nodes = await pipeline.arun(documents=documents)
    return nodes


# Ensure an event loop is running
documents = [...]  # Your documents here
asyncio.run(run_pipeline(documents))
```
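For context, the error in the traceback comes straight from the standard library: `asyncio.get_running_loop()` raises it whenever it is called while no loop is running. A minimal stdlib-only sketch (no llama_index involved) that reproduces the message:

```python
import asyncio


def loop_status():
    # Outside any coroutine there is no running loop, so
    # asyncio.get_running_loop() raises the error from the traceback.
    try:
        asyncio.get_running_loop()
        return "running"
    except RuntimeError as exc:
        return str(exc)


async def loop_status_async():
    # Inside a coroutine driven by asyncio.run(), a loop IS running.
    asyncio.get_running_loop()
    return "running"


print(loop_status())                     # no running event loop
print(asyncio.run(loop_status_async()))  # running
```

Any helper that calls `asyncio.get_running_loop()` from a plain (synchronous) call site will hit this error, which is why the fix is to make sure the call happens inside a running loop.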
Alternatively, you can use the `asyncio_run` utility function provided by the llama_index library, which handles the event loop more effectively:
```python
from llama_index.core.async_utils import asyncio_run
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.openai import OpenAIEmbedding


async def run_pipeline(documents):
    pipeline = IngestionPipeline(
        transformations=[
            SentenceSplitter(chunk_size=512, chunk_overlap=20),
            OpenAIEmbedding(),
        ],
    )
    nodes = await pipeline.arun(documents=documents)
    return nodes


# Ensure an event loop is running using llama_index's asyncio_run
documents = [...]  # Your documents here
asyncio_run(run_pipeline(documents))
```
These approaches should help manage the event loop and resolve the `RuntimeError: no running event loop` error [1][2][3].
To continue talking to Dosu, mention @dosu.
I have tried it with and without `asyncio_run`, and the problem still occurs with both methods. I think the problem is in the `asyncio_run` helper itself.
If the problem persists even with the `asyncio_run` helper, it might be due to how the event loop is being managed in your FastAPI application. Here are a few steps to ensure proper event loop management and avoid the `RuntimeError: no running event loop` error:
1. **Ensure the event loop is running:** Make sure that the event loop is properly managed within FastAPI. FastAPI should handle the event loop for you, but if you're running tasks manually, ensure they are correctly awaited.
2. **Use `nest_asyncio`:** If you're encountering nested event loops, especially in environments like Jupyter notebooks, you can use `nest_asyncio` to allow nested async operations:
   ```python
   import nest_asyncio
   nest_asyncio.apply()
   ```
3. **Check for event loop closure:** Ensure that the event loop is not being closed prematurely. This can happen if there are unhandled exceptions or if the loop is explicitly closed.
4. **Example integration with FastAPI:** Here is an example of how to integrate the `IngestionPipeline` with FastAPI, ensuring the event loop is managed correctly:
```python
from fastapi import FastAPI, HTTPException
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.openai import OpenAIEmbedding

app = FastAPI()


async def run_pipeline(documents):
    pipeline = IngestionPipeline(
        transformations=[
            SentenceSplitter(chunk_size=512, chunk_overlap=20),
            OpenAIEmbedding(),
        ],
    )
    nodes = await pipeline.arun(documents=documents)
    return nodes


@app.post("/ingest")
async def ingest_documents(documents: list):
    try:
        nodes = await run_pipeline(documents)
        return {"status": "success", "nodes": nodes}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import nest_asyncio

    nest_asyncio.apply()

    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
```
This setup ensures that the event loop is properly managed and that `nest_asyncio` is applied to handle nested async operations. If the issue persists, check whether any other part of your code is closing the event loop or leaving it in an invalid state.
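The nested-loop situation that `nest_asyncio` works around can be reproduced with the standard library alone: without the patch, re-entering `asyncio.run()` from inside an already-running loop is rejected with a `RuntimeError`.

```python
import asyncio


async def inner():
    return 42


async def outer():
    # Without nest_asyncio.apply(), asyncio refuses to start a second
    # asyncio.run() from inside a running loop; nest_asyncio patches the
    # loop so that this kind of nested call can succeed instead.
    try:
        asyncio.run(inner())
        return "nested run succeeded"
    except RuntimeError as exc:
        return str(exc)


print(asyncio.run(outer()))
# RuntimeError message: asyncio.run() cannot be called from a running event loop
```

Note this is a different message from `no running event loop`; seeing one or the other tells you whether the failing call happened inside or outside a loop.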
@logan-markewich, this one's a bit out of my wheelhouse, could you step in and provide your guidance?
@upchunk weird, this code is in a try/except, how was it not caught? 🙃
In any case, if you are using FastAPI, you should be using the async entry points to the pipeline if you aren't already: `pipeline.arun(documents=documents)`
I am not able to replicate, actually:
```python
from fastapi import FastAPI
from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.extractors import KeywordExtractor

text = Document.example().text * 1000

app = FastAPI()


@app.get("/")
async def hello():
    return {"Hello": "World"}


@app.get("/test")
async def run_test():
    pipeline = IngestionPipeline(
        transformations=[
            SentenceSplitter(),
            KeywordExtractor(),
            OpenAIEmbedding(),
        ]
    )
    await pipeline.arun(documents=[Document(text=text), Document(text=text)])
    return "Complete"


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
```
I make the requests to the endpoint using aiohttp, with 30,000 records sourced from PDFs and stored on a cloud MongoDB server. Each record contains text of 1,000 to 2,000+ words, and the requests are limited by `asyncio.Semaphore(10)` to prevent hitting the LLM rate limit.
This takes 20 hours in total without async, and more when using async.
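The throttling pattern described above can be sketched with the stdlib alone; here a placeholder coroutine stands in for the real aiohttp request (`ingest_one`, `ingest_all`, and `doc_ids` are hypothetical names, not from the original code):

```python
import asyncio


async def ingest_one(sem, doc_id):
    # Placeholder for the real aiohttp POST to the ingestion endpoint.
    async with sem:  # at most `limit` requests in flight at once
        await asyncio.sleep(0)  # simulate network I/O
        return doc_id


async def ingest_all(doc_ids, limit=10):
    # The semaphore caps concurrency, mirroring asyncio.Semaphore(10)
    # from the setup described above; gather preserves input order.
    sem = asyncio.Semaphore(limit)
    return await asyncio.gather(*(ingest_one(sem, d) for d in doc_ids))


results = asyncio.run(ingest_all(range(100)))
```

With this shape, the client stays async end to end, so the server-side `arun` calls are the only place a loop needs to exist.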
**Bug Description**

Getting a `RuntimeError: no running event loop` error when running an ingestion pipeline on FastAPI. The error comes from the llama_index `asyncio_run` function (`from llama_index.core.async_utils import asyncio_run`).

**Version**

0.10.41

**Steps to Reproduce**

**Relevant Logs/Tracebacks**