run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Question]: How to Achieve Parallel processing in SentenceSplitter #13473

Open umesh9794 opened 2 months ago

umesh9794 commented 2 months ago

Question

I have the following simple implementation of SentenceSplitter:

# chunking logic here
splitter = SentenceSplitter(chunk_size=conf.chunk_size, chunk_overlap=conf.chunk_overlap)
for node in splitter.split_text(scrap["scrap_text"]):
    # additional processing like trash removal etc. goes here

The above code runs on Apache Spark, yet I am not seeing any parallelism at the SentenceSplitter level. Is this the expected behaviour, or am I doing something wrong?

If I want to parallelize SentenceSplitter, is IngestionPipeline the only option, or can I do it without using IngestionPipeline?

dosubot[bot] commented 2 months ago

Hello @umesh9794! 👋 I'm Dosu, your friendly neighborhood bot here to assist with bugs, answer questions, and guide you on contributing while you're waiting for a human maintainer. Just know, I'm here to support you! Now, let me take a closer look at your issue regarding parallel processing in SentenceSplitter and get back to you shortly with a detailed response.

logan-markewich commented 2 months ago

I would use multiprocessing if you don't want to use the ingestion pipeline 🤷🏻‍♂️ Python has a few ways to do multiprocessing:

from multiprocessing import Pool

def split(x):
    splitter = SentenceSplitter(...)
    # SentenceSplitter is callable on a list of documents/nodes
    return splitter([x])

with Pool(5) as p:
    results = p.map(split, [doc1, doc2, doc3, ...])

umesh9794 commented 2 months ago

@logan-markewich thanks for the quick hint; however, I think my issue is slightly different. The code block you shared parallelizes over the number of docs, which I am already getting from the Apache Spark workers.

But my question is: how can I split each document and apply some transformations to it in parallel? Sorry if I was not clear before.

logan-markewich commented 2 months ago

So you want to split the document into pieces and then process the nodes in parallel/batches? The solution will be similar to the above, or use the ingestion pipeline. I don't know how that translates to Apache Spark workers; I've never used it before.
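
For reference, a minimal sketch of the ingestion-pipeline route mentioned above (not from this thread: the import paths and the num_workers argument of IngestionPipeline.run follow the current llama-index-core API, and the chunk sizes are illustrative):

from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter

pipeline = IngestionPipeline(
    transformations=[SentenceSplitter(chunk_size=512, chunk_overlap=64)],
)

# num_workers fans the transformations out over multiple worker processes
nodes = pipeline.run(documents=[Document(text="some long document ...")], num_workers=4)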

dosubot[bot] commented 2 months ago

To achieve parallel processing for splitting each document into chunks and performing transformations on those chunks within Apache Spark, you can follow a structured approach leveraging Spark's distributed data processing capabilities:

  1. Distribute Your Data: Create a Resilient Distributed Dataset (RDD) or a DataFrame with your documents to distribute them across the Spark cluster.

     # Assuming `sc` is your SparkContext and `data` is your list of documents
     rdd = sc.parallelize(data)

  2. Define Transformation Function: Implement a function that applies the SentenceSplitter to split the document and then performs any additional processing required on the chunks.

     def process_document(document):
         splitter = SentenceSplitter(chunk_size=conf.chunk_size, chunk_overlap=conf.chunk_overlap)
         chunks = splitter.split_text(document)
         # Perform additional processing on chunks here
         return processed_chunks

  3. Apply Transformations in Parallel: Use the map function to apply your process_document function to each document in the RDD, enabling parallel processing of the document splitting and transformations.

     processed_rdd = rdd.map(process_document)

  4. Collect or Store the Results: After processing, you can collect the results back to the driver program or store them in an external system.

     results = processed_rdd.collect()

This method utilizes Spark's parallel processing capabilities to distribute the workload across the cluster, ensuring that each document is split and processed in parallel. The actual parallelism level will depend on your Spark cluster's configuration, such as the number of worker nodes and cores per node. Remember, Spark's transformations are lazy and only execute when an action (like collect) is called, allowing Spark to optimize the execution plan.
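
Putting the four steps together, a minimal consolidated sketch (the document texts, chunk sizes, and the strip() cleanup are illustrative placeholders, not part of the thread's code):

from pyspark.sql import SparkSession
from llama_index.core.node_parser import SentenceSplitter

spark = SparkSession.builder.appName("sentence-splitting").getOrCreate()
sc = spark.sparkContext

data = ["first document text ...", "second document text ..."]

def process_document(document):
    # Build the splitter inside the task so it is created on the executor
    splitter = SentenceSplitter(chunk_size=512, chunk_overlap=64)
    chunks = splitter.split_text(document)
    return [chunk.strip() for chunk in chunks]  # placeholder "additional processing"

rdd = sc.parallelize(data)                 # 1. distribute the documents
processed_rdd = rdd.map(process_document)  # 2-3. split and transform in parallel
results = processed_rdd.collect()          # 4. action that triggers execution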

**Sources**

- [llama-index-core/tests/ingestion/test_pipeline.py](https://github.com/run-llama/llama_index/blob/main/llama-index-core/tests/ingestion/test_pipeline.py)
- [docs/docs/module_guides/loading/ingestion_pipeline/transformations.md](https://github.com/run-llama/llama_index/blob/main/docs/docs/module_guides/loading/ingestion_pipeline/transformations.md)
- [llama-index-core/llama_index/core/ingestion/pipeline.py](https://github.com/run-llama/llama_index/blob/main/llama-index-core/llama_index/core/ingestion/pipeline.py)
- [llama-index-core/llama_index/core/ingestion/transformations.py](https://github.com/run-llama/llama_index/blob/main/llama-index-core/llama_index/core/ingestion/transformations.py)
- [llama-index-core/llama_index/core/node_parser/text/sentence.py](https://github.com/run-llama/llama_index/blob/main/llama-index-core/llama_index/core/node_parser/text/sentence.py)
- [llama-index-legacy/llama_index/legacy/node_parser/text/sentence.py](https://github.com/run-llama/llama_index/blob/main/llama-index-legacy/llama_index/legacy/node_parser/text/sentence.py)
- [docs/docs/module_guides/loading/ingestion_pipeline/index.md](https://github.com/run-llama/llama_index/blob/main/docs/docs/module_guides/loading/ingestion_pipeline/index.md)
- [docs/docs/examples/ingestion/parallel_execution_ingestion_pipeline.ipynb](https://github.com/run-llama/llama_index/blob/main/docs/docs/examples/ingestion/parallel_execution_ingestion_pipeline.ipynb)
- [llama-index-legacy/llama_index/legacy/ingestion/pipeline.py](https://github.com/run-llama/llama_index/blob/main/llama-index-legacy/llama_index/legacy/ingestion/pipeline.py)
- [docs/docs/optimizing/production_rag.md](https://github.com/run-llama/llama_index/blob/main/docs/docs/optimizing/production_rag.md)
- [llama-index-core/tests/text_splitter/test_sentence_splitter.py](https://github.com/run-llama/llama_index/blob/main/llama-index-core/tests/text_splitter/test_sentence_splitter.py)
- [llama-index-legacy/tests/text_splitter/test_sentence_splitter.py](https://github.com/run-llama/llama_index/blob/main/llama-index-legacy/tests/text_splitter/test_sentence_splitter.py)
**About Dosu** This response is meant to be useful and save you time. It is **not meant to be a precise solution**, but rather a starting point for your own research. Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

To continue the conversation, mention @dosu.

umesh9794 commented 2 months ago

> So you want to split the document into pieces and then process the nodes in parallel/batches? The solution will be similar to the above, or use the ingestion pipeline. I don't know how that translates to Apache Spark workers; I've never used it before.

@logan-markewich Yes you got it right.

umesh9794 commented 2 months ago

> To achieve parallel processing for splitting each document into chunks and performing transformations on those chunks within Apache Spark, you can follow a structured approach leveraging Spark's distributed data processing capabilities: […]

I have something like this already:

df = spark.read\
            .format("kafka")\
            .option("kafka.bootstrap.servers", conf.kafka_brokers)\
            .option("kafka.security.protocol", "PLAINTEXT")\
            .option("failOnDataLoss","false")\
            .option("subscribe", conf.kafka_topic_name)\
            .option("startingOffsets", json.dumps(offsets))\
            .option("endingOffsets", json.dumps(end_offsets))\
            .load()
df.foreach(apply_cleanup_and_store)  # runs apply_cleanup_and_store on the executors; foreach returns None

Now apply_cleanup_and_store looks like this:

def apply_cleanup_and_store(row):
    # Some preprocessing
    ..
    # chunking logic here
    splitter = SentenceSplitter(chunk_size=conf.chunk_size, chunk_overlap=conf.chunk_overlap)
    for node in splitter.split_text(scrap["scrap_text"]):
        # additional processing like trash removal etc. goes here
        ..
        # Store into vector DB
So apply_cleanup_and_store runs on the Spark executors. What I want to parallelize is this for loop: for node in splitter.split_text(scrap["scrap_text"]):
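
For reference, one way to parallelize that inner loop inside a single executor task, without the ingestion pipeline, is a local thread pool. A sketch, assuming the per-node work (cleanup plus the vector-DB write) is I/O-bound; the function name, chunk sizes, and the commented-out store_node call are illustrative:

from concurrent.futures import ThreadPoolExecutor
from llama_index.core.node_parser import SentenceSplitter

def process_text_in_parallel(text, chunk_size=512, chunk_overlap=64, max_workers=8):
    # Split once, then fan the per-node work out over a thread pool
    splitter = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    nodes = splitter.split_text(text)

    def handle_node(node):
        cleaned = node.strip()   # illustrative trash removal
        # store_node(cleaned)    # hypothetical vector-DB write would go here
        return cleaned

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(handle_node, nodes))

A helper like this could be called from inside apply_cleanup_and_store with the text extracted during preprocessing; for CPU-bound cleanup, a process pool (as in the earlier multiprocessing example) would be the analogous choice.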

dosubot[bot] commented 2 months ago

To parallelize the processing of each chunk within the apply_cleanup_and_store function across your Spark executors, you'll need to adjust your approach to leverage Spark's distributed computing capabilities more effectively. Here's a strategy that aligns with Spark's design for parallel data processing:

  1. Transform Documents into Chunks: Instead of processing the chunks sequentially within a single executor, you should aim to distribute the chunks of each document across the Spark cluster. This can be achieved by transforming your input DataFrame into a distributed dataset (RDD or another DataFrame) where each record represents a chunk of text to be processed.

  2. Parallel Processing of Chunks: Utilize Spark's map or flatMap transformations to apply your processing logic (such as cleanup and storage) to each chunk in parallel. This involves encapsulating the logic you have within your loop into a function that can be applied to each chunk distributed across the cluster.

  3. Action to Trigger Execution: Remember that Spark's operations are lazily evaluated. This means that your transformations will only be executed when an action (e.g., collect, count, saveAsTextFile) is called. Make sure to include such an action to trigger the distributed processing of your chunks.

Here's a simplified example to illustrate this approach:

from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("DocumentChunkProcessing").getOrCreate()

# Assuming `df` is your DataFrame containing documents.
# Convert documents to chunks and distribute them; split_into_chunks should
# yield Row objects (or pass an explicit schema to toDF()).
chunks_df = df.rdd.flatMap(lambda row: split_into_chunks(row)).toDF()

# Define a function for processing each chunk
def process_chunk(chunk):
    # Your cleanup and processing logic here
    cleaned_chunk = clean_chunk(chunk)
    # Logic to store the chunk
    store_chunk(cleaned_chunk)
    return cleaned_chunk

# Apply the processing function to each chunk in parallel
processed_chunks = chunks_df.rdd.map(process_chunk)

# Trigger the processing with an action
processed_chunks.count()  # This is just an example action

In this example, split_into_chunks, clean_chunk, and store_chunk are placeholders for your actual functions. The key concept is to first distribute the work (chunks) across the cluster and then apply your processing logic in parallel to each piece of work.

This method ensures that you're fully utilizing Spark's parallel processing capabilities, allowing for more efficient processing of each document's chunks. Adjust the example as necessary to fit your specific use case, especially considering how you handle dependencies and ensure that your storage operations are designed to support concurrent writes if needed.
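
Applied to the Kafka DataFrame shown earlier in the thread, the same idea might look like this (a sketch: Spark's Kafka source exposes a binary value column, cast to a string here; the splitter settings and the store_node helper are illustrative):

from pyspark.sql.functions import col
from llama_index.core.node_parser import SentenceSplitter

def split_row(text):
    # Build the splitter on the executor; chunk sizes are illustrative
    splitter = SentenceSplitter(chunk_size=512, chunk_overlap=64)
    return splitter.split_text(text)

chunks_rdd = (
    df.select(col("value").cast("string").alias("scrap_text"))
      .rdd
      .flatMap(lambda row: split_row(row["scrap_text"]))
)

# Each chunk is now its own record, so cleanup and storage run per chunk in parallel
chunks_rdd.foreach(lambda chunk: store_node(chunk.strip()))  # store_node is hypothetical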


To continue the conversation, mention @dosu.