run-llama / llama_parse

Parse files for optimal RAG
https://www.llamaindex.ai
MIT License

RateLimitError - OPENAI while using get_nodes_from_documents #130

Open OmarHory opened 4 months ago

OmarHory commented 4 months ago

Hello,

I am getting the following error while using this:

from llama_index.core.node_parser import MarkdownElementNodeParser
from llama_index.llms.openai import OpenAI

node_parser = MarkdownElementNodeParser(llm=OpenAI(model=MODEL_NAME), num_workers=1)
nodes = node_parser.get_nodes_from_documents(documents)

848it [00:00, 59827.92it/s] 0%| | 0/848 [00:00<?, ?it/s]Retrying llama_index.llms.openai.base.OpenAI._achat in 0.5442886034978512 seconds as it raised RateLimitError: Error code: 429 - {'error': {'message': 'Rate limit reached for gpt-3.5-turbo in organization org-XXXXXXXX on tokens per min (TPM): Limit 60000, Used 59836, Requested 657. Please try again in 493ms. Visit https://platform.openai.com/account/rate-limits to learn more.', 'type': 'tokens', 'param': None, 'code': 'rate_limit_exceeded'}}.

Thank you!!

TitasDas commented 4 months ago

Try an exponential backoff strategy: catch those rate limit errors and keep retrying with increasing delays. You can refer to the OpenAI docs for the strategies they define for handling rate limit errors.
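For example (a minimal sketch; with_backoff and call_llm are hypothetical names, and the retry parameters are arbitrary):

import random
import time

from openai import RateLimitError

# Minimal exponential backoff with jitter. call_llm stands in for
# whatever function actually hits the OpenAI API.
def with_backoff(call_llm, max_retries=6, base_delay=0.5):
    for attempt in range(max_retries):
        try:
            return call_llm()
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            # Wait base_delay * 2^attempt seconds, plus random jitter
            time.sleep(base_delay * (2 ** attempt) + random.random())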

OmarHory commented 4 months ago

Try an exponential backoff strategy: catch those rate limit errors and keep retrying with increasing delays. You can refer to the OpenAI docs for the strategies they define for handling rate limit errors.

I am not sure that's possible given the function I referenced.

TitasDas commented 4 months ago

Can you share your entire code, or enough of it that I can reproduce the problem on my end?

Alternatively, you could use a semaphore. Here is some placeholder code to get the idea across:

from threading import Semaphore, Thread

from llama_index.core.node_parser import MarkdownElementNodeParser
from llama_index.llms.openai import OpenAI

# Initialize your node_parser
node_parser = MarkdownElementNodeParser(llm=OpenAI(model=MODEL_NAME), num_workers=1)
documents = [...]  # Your documents to parse

# Initialize a semaphore with the desired number of concurrent accesses
semaphore = Semaphore(1)  # allow only one parse at a time

# Collect the nodes produced by every thread
all_nodes = []

# Wrap the get_nodes_from_documents call in the semaphore
def parse_document_with_semaphore(document):
    with semaphore:  # blocks until access is available, releases on exit
        nodes = node_parser.get_nodes_from_documents([document])
        all_nodes.extend(nodes)  # list.extend is atomic under the GIL

# Create a thread for each document to parse
threads = [Thread(target=parse_document_with_semaphore, args=(doc,)) for doc in documents]

# Start threads
for thread in threads:
    thread.start()

# Wait for all threads to complete
for thread in threads:
    thread.join()

# At this point, all documents have been processed with controlled concurrency
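
Note that with Semaphore(1) the threads end up running one at a time; a ThreadPoolExecutor with a bounded pool size gives the same controlled concurrency with less code (a minimal sketch, assuming the same node_parser and documents as above):

from concurrent.futures import ThreadPoolExecutor

# The pool size bounds concurrency the same way the semaphore does:
# max_workers=1 processes one document at a time.
with ThreadPoolExecutor(max_workers=1) as executor:
    per_doc_nodes = list(executor.map(
        lambda doc: node_parser.get_nodes_from_documents([doc]),
        documents,
    ))

# Flatten the per-document node lists into one list
all_nodes = [node for nodes in per_doc_nodes for node in nodes]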
OmarHory commented 4 months ago

Can you share your entire code, or enough of it that I can reproduce the problem on my end?

Alternatively, you could use a semaphore. Here is some placeholder code to get the idea across: [...]


Hi, thank you for your patience.

I am using this async approach to parse the data from LlamaCloud (LlamaParse):

import asyncio

from llama_parse import LlamaParse

async def parse_document(parser, document_path):
    doc = await parser.aload_data(document_path)
    return doc[0]

async def process_documents(document_paths):
    parser = LlamaParse(result_type=PARSE_TYPE)
    tasks = [parse_document(parser, doc_path) for doc_path in document_paths]
    documents = await asyncio.gather(*tasks)
    return documents

document_paths = ["books_cutted/50_Fiscal.pdf", "books_cutted/50_Social.pdf"]
documents = asyncio.run(process_documents(document_paths))

This is the logic that takes the parsed PDF (in markdown) and creates the nodes and objects out of every document. In my case, I have one PDF that is 700 pages (it could be even more):

node_parser = MarkdownElementNodeParser(llm=OpenAI(model=MODEL_NAME), num_workers=8)
nodes = node_parser.get_nodes_from_documents(documents)

So in my case, threading would be a good choice if I had a lot of documents in hand, but I have a single large document and I am not sure how to deal with that. Do you support chunking documents into 20 pages each or something like that? Or would that affect the inner connections between the Nodes?