run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Question]: How can a VectorStoreIndex be generated via parallel processing? #15655

Open compilebunny opened 2 weeks ago

compilebunny commented 2 weeks ago

Question

I am working on a RAG project that involves ingesting large numbers of PDF documents. SimpleDirectoryReader is slow but acceptable; however, ingestion with Document and VectorStoreIndex.from_documents is unacceptably slow when running on a single CPU.

Is there a way to get these processes running across multiple CPUs or the GPU?

Sample code below

  from llama_index.core import Document, SimpleDirectoryReader, VectorStoreIndex

  # document_dir and saved_context are defined elsewhere
  documents = SimpleDirectoryReader(
    input_dir=document_dir,
    recursive=True,
  ).load_data()

  # Collapse everything into a single Document before indexing
  documents = Document(text="\n\n".join([doc.text for doc in documents]))

  index = VectorStoreIndex.from_documents(
    [documents], service_context=saved_context
  )

dosubot[bot] commented 2 weeks ago

Hey @compilebunny! I'm here to help you out with any bugs, questions, or contributions you have. Let's tackle this issue together while we wait for the human maintainers.

I found a similar issue that addresses the slow creation and reading times of the VectorStoreIndex with large datasets. Here are some suggested optimizations:

  1. Parallel Processing: Utilize parallel processing to expedite the index creation. This involves dividing your data into smaller chunks and processing them simultaneously across multiple CPU cores.

  2. Incremental Indexing: Build the index incrementally by adding data in smaller batches, which can be more efficient than processing everything at once.

  3. Optimize Reading: Improve read times by using techniques like memory mapping (mmap) to map the index file into memory, which can significantly reduce loading times.

  4. Efficient Data Structures: Employ data structures that support fast search and insertion operations to enhance both creation and reading speeds.

  5. Hardware Acceleration: If available, leverage GPUs for hardware acceleration to further speed up processing.

Here's a practical example of implementing parallel processing and incremental indexing:

from multiprocessing import Pool
from llama_index.core import VectorStoreIndex

def create_index(nodes):
    return VectorStoreIndex(nodes, include_embeddings=True)

# Split nodes into manageable chunks
# (`nodes` is assumed to be a flat list of nodes produced by a node parser; see below)
chunks = [nodes[i:i + 10000] for i in range(0, len(nodes), 10000)]

# Process chunks in parallel
with Pool(processes=4) as pool:
    indices = pool.map(create_index, chunks)

# Merge the processed indices
index = VectorStoreIndex.merge(indices)

# Persist the index
index.storage_context.persist(persist_dir="../storage/")

These strategies are aimed at optimizing performance by leveraging parallel processing, managing data more efficiently, and utilizing hardware capabilities. Testing different approaches to find the best fit for your specific needs is crucial, as optimizations can vary based on the dataset and system configuration [1].

Additionally, you can optimize the reading speed of PDF documents in SimpleDirectoryReader by using parallel processing. This can be achieved by setting the num_workers parameter to a positive integer value when calling the load_data() method. Example:

from multiprocessing import cpu_count
from llama_index.core import SimpleDirectoryReader

# Set the number of workers to the minimum of 2 or the number of available CPU cores
num_workers = min(2, cpu_count())

# Initialize the reader with desired configurations
reader = SimpleDirectoryReader(directory_path, recursive=True, exclude_hidden=True, num_files_limit=100)

# Load data using parallel processing
documents = reader.load_data(num_workers=num_workers)

This configuration allows the SimpleDirectoryReader to utilize multiple processes to load the data, significantly speeding up the reading process [2].

To continue talking to Dosu, mention @dosu.

logan-markewich commented 2 weeks ago

Really depends on the embedding model and vector store you are using.

But in any case, seeing an example of what from_documents is doing (and how you can do it yourself) will probably help. (IMO, I would use async instead of multiprocessing if you are using remote APIs for your embedding model and vector store.)

Synchronous:

from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.openai import OpenAIEmbedding

splitter = SentenceSplitter(chunk_size=1024)

nodes = splitter(documents)

embed_model = OpenAIEmbedding(model_name="text-embedding-3-small", embed_batch_size=512)

node_texts = [node.get_content(metadata_mode="embed") for node in nodes]
embeddings = embed_model.get_text_embedding_batch(node_texts)
for node, embedding in zip(nodes, embeddings):
  node.embedding = embedding

vector_store.add(nodes)  # vector_store = whichever vector store instance you are using

Async:

from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.openai import OpenAIEmbedding

splitter = SentenceSplitter(chunk_size=1024)

nodes = splitter(documents)

embed_model = OpenAIEmbedding(model_name="text-embedding-3-small", embed_batch_size=512, num_workers=4)

node_texts = [node.get_content(metadata_mode="embed") for node in nodes]
embeddings = await embed_model.aget_text_embedding_batch(node_texts)
for node, embedding in zip(nodes, embeddings):
  node.embedding = embedding

await vector_store.async_add(nodes)
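
A quick note on running the async variant outside a notebook: top-level await needs a running event loop, so in a plain script one option (a minimal sketch reusing the names defined above) is to wrap the awaited calls in a coroutine:

import asyncio

async def build():
    # Embed all node texts concurrently, then attach embeddings and insert
    embeddings = await embed_model.aget_text_embedding_batch(node_texts)
    for node, embedding in zip(nodes, embeddings):
        node.embedding = embedding
    await vector_store.async_add(nodes)

asyncio.run(build())
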
logan-markewich commented 2 weeks ago

If you are using the default vector store (i wouldn't tbh), you'd have to do index.insert_nodes(nodes) instead of inserting directly into the vector store like I did
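
For concreteness, a minimal sketch of that pattern (assuming nodes and embed_model are already built as in the snippets above, and using the default in-memory vector store):

from llama_index.core import VectorStoreIndex

# Start from an empty index backed by the default vector store,
# then insert the parsed (and optionally pre-embedded) nodes
index = VectorStoreIndex(nodes=[], embed_model=embed_model)
index.insert_nodes(nodes)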

compilebunny commented 2 weeks ago

Thanks @dosu

Can you explain how to get from the Document objects returned by SimpleDirectoryReader to the nodes referenced in your parallel processing and incremental indexing example, in a parallel way?

dosubot[bot] commented 2 weeks ago

To convert the Document object returned by SimpleDirectoryReader to nodes in a parallel way for use in parallel processing and incremental indexing, you can use the IngestionPipeline class with the run method, specifying the num_workers parameter to enable parallel processing. Here is an example:

from multiprocessing import cpu_count
from llama_index.core.ingestion.pipeline import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import Document
from llama_index.core.storage.docstore import SimpleDocumentStore

# Example documents
document1 = Document.example()
document1.id_ = "1"
document2 = Document(text="One\n\n\nTwo\n\n\nThree.", doc_id="2")

# Create the pipeline with transformations
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
    ],
    docstore=SimpleDocumentStore(),
)

# Determine the number of workers for parallel processing
num_workers = min(2, cpu_count())

# Run the pipeline with parallel processing
nodes = pipeline.run(documents=[document1, document2], num_workers=num_workers)

# Output the nodes
print(nodes)

In this example, the IngestionPipeline is configured with a SentenceSplitter transformation and a SimpleDocumentStore. The run method is called with the documents parameter containing the documents to be processed and the num_workers parameter set to enable parallel processing. The num_workers is determined by the minimum of 2 and the number of CPU cores available [1].

Additionally, the NodeParser interface and its implementations like TextSplitter and MetadataAwareTextSplitter provide methods such as get_nodes_from_documents and aget_nodes_from_documents for parsing documents into nodes, which can be useful for more customized node parsing and processing [2].
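
For example, a minimal sketch of the direct (non-pipeline) route, assuming documents has already been loaded with SimpleDirectoryReader:

from llama_index.core.node_parser import SentenceSplitter

# Parse loaded documents into nodes without going through an IngestionPipeline
splitter = SentenceSplitter(chunk_size=1024, chunk_overlap=20)
nodes = splitter.get_nodes_from_documents(documents, show_progress=True)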

To continue talking to Dosu, mention @dosu.

compilebunny commented 2 weeks ago

Thanks @dosu

Now, back to your original response.

Since I am using a local embed model, I changed the create_index function to:

def create_index(nodes):
    return VectorStoreIndex(nodes, include_embeddings=True, embed_model="local:/storage/embed_model_directory")

The following code creates two threads and then hangs after processing. It does not return a result back to the main program.

# Process chunks in parallel
with Pool(processes=4) as pool:
    indices = pool.map(create_index, chunks)

Can you explain how to get the resulting indices back to the main program?

dosubot[bot] commented 2 weeks ago

To get the resulting indices back to the main program when using multiprocessing.Pool, you need to ensure that the create_index function and the objects it uses are serializable by Python's pickle module. Additionally, you should avoid using threads within the create_index function, as multiprocessing.Pool itself handles parallelism using processes.

Here's an updated version of your code that should work:

  1. Ensure the create_index function is serializable:

    • Make sure that the embed_model parameter is correctly set up to be used within the function.
  2. Use multiprocessing.Pool correctly:

    • Ensure that the create_index function and its dependencies are defined in a way that they can be pickled.

Here's the updated code:

from multiprocessing import Pool, cpu_count
from llama_index.core import VectorStoreIndex
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import Document

# Define the create_index function
def create_index(nodes):
    # Pass a plain filesystem path here; the "local:" prefix is only understood by
    # the embed_model string shorthand, not by HuggingFaceEmbedding itself
    embed_model = HuggingFaceEmbedding(model_name="/storage/embed_model_directory")
    return VectorStoreIndex(nodes, include_embeddings=True, embed_model=embed_model)

# Example documents
documents = [Document.example()]  # Replace with your actual documents

# Split documents into nodes
splitter = SentenceSplitter(chunk_size=1024)
nodes = splitter(documents)

# Split nodes into manageable chunks
chunks = [nodes[i:i + 10000] for i in range(0, len(nodes), 10000)]

# Guard the entry point (needed when the multiprocessing start method is "spawn")
if __name__ == "__main__":
    # Process chunks in parallel
    with Pool(processes=min(4, cpu_count())) as pool:
        indices = pool.map(create_index, chunks)

    # Merge the processed indices
    index = VectorStoreIndex.merge(indices)

    # Persist the index
    index.storage_context.persist(persist_dir="../storage/")

Key Points:

  1. Serializable Function: Ensure that the create_index function and its dependencies (like HuggingFaceEmbedding) are serializable.
  2. Parallel Processing: Use multiprocessing.Pool to process chunks in parallel.
  3. Merging Indices: After processing, merge the resulting indices using VectorStoreIndex.merge.

This setup should allow you to process the chunks in parallel and get the resulting indices back to the main program without hanging. If you encounter any issues with serialization, ensure that all objects and functions used within create_index are compatible with Python's pickle module.
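
As a quick sanity check (a minimal sketch using only the standard library and the chunks variable from the code above), you can confirm that the data being sent to the workers is picklable:

import pickle

# If this raises, the nodes carry something the worker processes cannot receive
pickle.dumps(chunks[0])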

To continue talking to Dosu, mention @dosu.

Gzj369 commented 2 weeks ago

If you are using the default vector store (i wouldn't tbh), you'd have to do index.insert_nodes(nodes) instead of inserting directly into the vector store like I did

@logan-markewich I have a question. I use the FaissVectorStore class to build vector_store, then call vector_store.add([new_node]), and finally query for the newly added knowledge, but it cannot be found. The code is below. (embed_model.embed_query converts a str to List[float]; that part works well.)

from llama_index.core.schema import TextNode
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.vector_stores.faiss import FaissVectorStore

new_add_knowledge = 'new add a sentence'  # just an example
vector_store = FaissVectorStore.from_persist_dir(vector_store_path)
metadata = {"source": "add"}
excluded = ["source"]
node = TextNode(text=new_add_knowledge, 
                metadata=metadata, 
                excluded_embed_metadata_keys=excluded, 
                excluded_llm_metadata_keys=excluded,
                embedding=embed_model.embed_query(new_add_knowledge))
logger.debug("node.embedding: {}", node.embedding)
vector_store.add([node])
print("--------------111")
# vector_store.persist(persist_path=vector_store_path)
res = vector_store.query(VectorStoreQuery(query_embedding=embed_model.embed_query(new_add_knowledge), similarity_top_k=2))
print("--------------222")
assert res.nodes
print("--------------333")
logger.debug("result: {}", res.nodes[0].get_content())

just "--------------222" has been printed, but "--------------333" and "result: " have not been printed , please have a look

Gzj369 commented 2 weeks ago

And another question: the parameter of vector_store.persist is a file path, while the parameter of vector_index.storage_context.persist is a directory. So for the code above, when I run

vector_store.persist(persist_path=vector_store_path)

it raises an error. How can I solve it?
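
For what it's worth, a minimal sketch of the distinction described above (assuming vector_store_path is a directory; the file name below is illustrative, not required):

import os

# FaissVectorStore.persist expects a file path, so point it at a file inside the directory
vector_store.persist(persist_path=os.path.join(vector_store_path, "vector_store.json"))

# ...whereas the storage-context persist expects the directory itself
# (vector_index being the index object, as in the question above)
# vector_index.storage_context.persist(persist_dir=vector_store_path)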