run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Bug]: Parallel Processing in Ingestion Pipeline #10104

Closed americanthinker closed 10 months ago

americanthinker commented 10 months ago

Bug Description

Setting num_workers to anything other than None causes the IngestionPipeline.run method to simply hang.

Version

0.9.31

Steps to Reproduce

docs = [...]
splitter = SentenceSplitter(chunk_overlap=0, chunk_size=128)
model_name = 'sentence-transformers/all-miniLM-L6-v2'
embed_model = HuggingFaceEmbedding(model_name=model_name, pooling='mean', embed_batch_size=64)
pipeline = IngestionPipeline(transformations=[splitter, embed_model])
nodes = pipeline.run(documents=docs, num_workers=os.cpu_count(), show_progress=True)

Relevant Logs/Tracebacks

There isn't a traceback; the code simply hangs. When I hit keyboard interrupt, I get this message:

File /anaconda/envs/openai/lib/python3.10/site-packages/llama_index/ingestion/pipeline.py:454, in IngestionPipeline.run(self, show_progress, documents, nodes, cache_collection, in_place, store_doc_text, num_workers, **kwargs)
    450     with multiprocessing.Pool(num_workers) as p:
    451         node_batches = self._node_batcher(
    452             num_batches=num_workers, nodes=nodes_to_run
    453         )
--> 454         nodes_parallel = p.starmap(
    455             run_transformations,
    456             zip(
    457                 node_batches,
    458                 repeat(self.transformations),
    459                 repeat(in_place),
    460                 repeat(self.cache if not self.disable_cache else None),
    461                 repeat(cache_collection),
    462             ),
    463         )
    464         nodes = reduce(lambda x, y: x + y, nodes_parallel)
    465 else:

File /anaconda/envs/openai/lib/python3.10/multiprocessing/pool.py:375, in Pool.starmap(self, func, iterable, chunksize)
    369 def starmap(self, func, iterable, chunksize=None):
    370     '''
    371     Like `map()` method but the elements of the `iterable` are expected to
    372     be iterables as well and will be unpacked as arguments. Hence
    373     `func` and (a, b) becomes func(a, b).
    374     '''
--> 375     return self._map_async(func, iterable, starmapstar, chunksize).get()

File /anaconda/envs/openai/lib/python3.10/multiprocessing/pool.py:768, in ApplyResult.get(self, timeout)
    767 def get(self, timeout=None):
--> 768     self.wait(timeout)
    769     if not self.ready():
    770         raise TimeoutError

File /anaconda/envs/openai/lib/python3.10/multiprocessing/pool.py:765, in ApplyResult.wait(self, timeout)
    764 def wait(self, timeout=None):
--> 765     self._event.wait(timeout)

File /anaconda/envs/openai/lib/python3.10/threading.py:607, in Event.wait(self, timeout)
    605 signaled = self._flag
    606 if not signaled:
--> 607     signaled = self._cond.wait(timeout)
    608 return signaled

File /anaconda/envs/openai/lib/python3.10/threading.py:320, in Condition.wait(self, timeout)
    318 try:    # restore state no matter what (e.g., KeyboardInterrupt)
    319     if timeout is None:
--> 320         waiter.acquire()
    321         gotit = True
    322     else:
dosubot[bot] commented 10 months ago

🤖

It's great that you've identified this issue with the num_workers parameter causing the IngestionPipeline.run method to hang. Your detailed description and the relevant logs/tracebacks will be very helpful for the maintainers to understand the problem.

If you're able to propose a fix or even volunteer to create a pull request to address this bug, it would be a valuable contribution to the LlamaIndex project. Your input and contributions are highly appreciated!


This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

logan-markewich commented 10 months ago

@americanthinker What OS is this on?

nerdai commented 10 months ago

Interesting. Will take a look shortly!

americanthinker commented 10 months ago

@logan-markewich & @nerdai I'm running on an Azure DSVM image (Ubuntu 20.04). I write multiprocessing code all day long on this machine.

nerdai commented 10 months ago

@americanthinker did you try reducing the number of workers?

I ask because, at least in my testing, ingestion pipelines using HuggingFace embedding models are quite computationally intensive, so I don't set a high number of workers when using these models.

I just tried your snippet of code with 4 workers on my 12 core MacBook Pro and it worked.

nerdai commented 10 months ago

With that being said, progress was not being shown despite having show_progress=True. I'll look into what's going on here too.

americanthinker commented 10 months ago

@nerdai I just tried setting the num_workers param to the lowest possible value (2) and got the same result: the code just hangs, and I get the same message when I hit keyboard interrupt. I haven't dug into the code base, so I'm wondering how the code is parallelizing the embedding step. In my experience you can't serialize a PyTorch model, so multiprocessing is a no-go for that step, and you'd want to take advantage of the batch processing built into the SentenceTransformer models anyway.

nerdai commented 10 months ago

Hey @americanthinker,

I spun up an Azure DSVM Ubuntu 20.04 (8 vCPU, 32 GB) VM and was able to replicate the hanging bug you were experiencing. After a bit of investigating, it turns out we're hitting a deadlock with plain fork(). You can read more about it here (skip to the section "The real solution: stop plain fork()ing" for the fix).

You should just need to add the following two lines of code in your script or jupyter notebook before invoking the run method:

from multiprocessing import set_start_method
set_start_method("spawn", force=True)
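
One caveat worth noting with the "spawn" start method: child processes re-import the main module, so when you run this from a script the pipeline call should sit under an if __name__ == "__main__": guard (the example script below does this). In a notebook this shouldn't be an issue as long as the transformations come from importable modules.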

There's also a change we can make in the library to set this start method as the default. I'll submit a PR for that fix soon.

(Also, to answer your question about how we have multiprocessing set up for ingestion_pipeline.run: you can actually see it in the traceback you shared, but here's the link to the code for convenience. Ultimately, we've got a module-level function run_transformations that takes in the list of transformations among other things. It's my understanding that the function and its arguments are pickled/serialized before being shipped off to the worker processes, which then reconstruct the transformation objects.)
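
If it helps to see the shape of it, here's a rough, stripped-down sketch of that fan-out pattern. This is not the actual library code: run_transformations below is a stand-in for the real helper, and the transformations are assumed to be plain picklable callables.

import multiprocessing
from functools import reduce
from itertools import repeat

def run_transformations(nodes, transformations):
    # stand-in for the module-level helper: apply each transformation in order
    for transform in transformations:
        nodes = transform(nodes)
    return nodes

def parallel_run(nodes, transformations, num_workers):
    # split the nodes into num_workers roughly equal batches
    batch_size = (len(nodes) + num_workers - 1) // num_workers
    node_batches = [
        nodes[i : i + batch_size] for i in range(0, len(nodes), batch_size)
    ]
    # each (batch, transformations) pair is pickled and shipped to a worker
    with multiprocessing.Pool(num_workers) as p:
        results = p.starmap(
            run_transformations,
            zip(node_batches, repeat(transformations)),
        )
    # flatten the per-worker results back into a single list
    return reduce(lambda x, y: x + y, results)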

nerdai commented 10 months ago

Also, here is the script that I used:

from llama_index import SimpleDirectoryReader
from llama_index.embeddings import HuggingFaceEmbedding
from llama_index.text_splitter import SentenceSplitter
from llama_index.ingestion import IngestionPipeline
from multiprocessing import set_start_method

# import logging
# import sys

# logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
# logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))

if __name__ == "__main__":
    print("loading documents\n")
    documents = SimpleDirectoryReader(
        input_dir="./data/source_files"
    ).load_data(num_workers=2)

    print("creating ingestion pipeline\n")
    splitter = SentenceSplitter(chunk_overlap=0, chunk_size=128)
    model_name = 'sentence-transformers/all-miniLM-L6-v2'
    embed_model = HuggingFaceEmbedding(model_name=model_name, pooling='mean', embed_batch_size=64)
    pipeline = IngestionPipeline(transformations=[splitter, embed_model])

    print("running pipeline")
    set_start_method("spawn", force=True)  # it hangs without this line
    nodes = pipeline.run(documents=documents[:4], num_workers=4, show_progress=True)

    print("all done!")
    print(len(nodes))

As for the data, I downloaded the PaulGrahamEssayDataset from our LlamaHub using the llamaindex-cli tool prior to running the script:

llamaindex-cli download-llamadataset PaulGrahamEssayDataset --download-dir ./data

americanthinker commented 10 months ago

@nerdai thanks for looking into that issue. The problem goes away after setting the start_method to spawn, but there is no appreciable speed-up compared to sequential processing; in fact, performance actually degrades. It's OK though, I don't need to use the LlamaIndex pipeline. I can get the preprocessing done much more quickly if I push the initial steps through a multiprocessing loop and then separately create the embeddings using the built-in batching already provided by the SentenceTransformer encode method.
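
For reference, here's a rough sketch of that approach. Treat it as illustrative only: the docs variable, the worker count, and the split_text helper are placeholders of mine, not anything from llama_index.

from multiprocessing import Pool

from llama_index.text_splitter import SentenceSplitter
from sentence_transformers import SentenceTransformer

def split_text(text: str) -> list[str]:
    # each worker builds its own splitter; it's cheap to construct
    splitter = SentenceSplitter(chunk_overlap=0, chunk_size=128)
    return splitter.split_text(text)

if __name__ == "__main__":
    texts = [doc.text for doc in docs]  # `docs` loaded elsewhere

    # parallelize only the CPU-bound splitting step
    with Pool(processes=4) as pool:
        chunk_lists = pool.map(split_text, texts)
    chunks = [chunk for chunk_list in chunk_lists for chunk in chunk_list]

    # then embed everything in one call and let encode() handle the batching
    model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
    embeddings = model.encode(chunks, batch_size=64, show_progress_bar=True)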

nerdai commented 10 months ago

Thanks for confirming that the hanging problem at least went away. Too bad it didn't lead to a speed-up compared to sequential processing; that can happen, of course, when the time spent serializing and acquiring locks outweighs the work being distributed. Perhaps there is also room for improvement in our parallel processing setup :)
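
If you (or anyone else reading along) want to sanity-check where that crossover happens on a given machine, a rough timing comparison like the one below is usually enough. It assumes pipeline and documents are defined as in the script above; numbers will obviously vary by hardware.

import time

def timed_run(pipeline, documents, num_workers=None):
    # wall-clock comparison of sequential vs. parallel pipeline.run
    start = time.perf_counter()
    nodes = pipeline.run(documents=documents, num_workers=num_workers)
    elapsed = time.perf_counter() - start
    print(f"num_workers={num_workers}: {elapsed:.2f}s for {len(nodes)} nodes")
    return nodes

# timed_run(pipeline, documents)                 # sequential baseline
# timed_run(pipeline, documents, num_workers=4)  # parallel run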

By all means, take the strategy that works for you and your workload. Cheers!

AlbertoMQ commented 2 months ago

Setting the num_workers to anything above 1 freezes everything up for me.

logan-markewich commented 2 months ago

@AlbertoMQ num_workers doesn't really work for local models (which I saw you were using).

Tbh, we should probably just remove this option.