run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Bug]: IngestionPipeline Parallel Processing for HuggingFaceEmbedding #13956

Open theta-lin opened 1 month ago

theta-lin commented 1 month ago

Bug Description

When using HuggingFaceEmbedding as a stage of IngestionPipeline and setting num_workers to greater than 1, the error AttributeError: 'HuggingFaceEmbedding' object has no attribute '_model' is raised. When not using parallel processing for IngestionPipeline, no error is given.

See https://github.com/run-llama/llama_index/issues/13956#issuecomment-2177812142 for a plausible monkey-patch quick fix.

Version

llama-index==0.10.38; llama-index-embeddings-huggingface==0.2.0

Steps to Reproduce

Put some documents in ./test, then run the following:

from llama_index.core import SimpleDirectoryReader
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

def main():
    documents = SimpleDirectoryReader("./test").load_data()
    pipeline = IngestionPipeline(
        transformations=[
            SentenceSplitter(),
            HuggingFaceEmbedding(
                model_name="BAAI/bge-small-en-v1.5", trust_remote_code=True
            ),
        ],
    )
    pipeline.run(documents=documents, num_workers=2)

if __name__ == "__main__":
    main()

Also see https://github.com/run-llama/llama_index/issues/13956#issuecomment-2177783432 for a more "concentrated" reproduction that may be closer to the root cause of this issue.

Relevant Logs/Tracebacks

/code/.venv/lib/python3.11/site-packages/transformers/utils/hub.py:124: FutureWarning: Using `TRANSFORMERS_CACHE` is deprecated and will be removed in v5 of Transformers. Use `HF_HOME` instead.
  warnings.warn(
/code/.venv/lib/python3.11/site-packages/huggingface_hub/file_download.py:1132: FutureWarning: `resume_download` is deprecated and will be removed in version 1.0.0. Downloads always resume when possible. If you want to force a new download, use `force_download=True`.
  warnings.warn(
/code/.venv/lib/python3.11/site-packages/transformers/utils/hub.py:124: FutureWarning: Using `TRANSFORMERS_CACHE` is deprecated and will be removed in v5 of Transformers. Use `HF_HOME` instead.
  warnings.warn(
/code/.venv/lib/python3.11/site-packages/transformers/utils/hub.py:124: FutureWarning: Using `TRANSFORMERS_CACHE` is deprecated and will be removed in v5 of Transformers. Use `HF_HOME` instead.
  warnings.warn(
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.11/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/pool.py", line 51, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.11/site-packages/llama_index/core/ingestion/pipeline.py", line 124, in run_transformations
    nodes = transform(nodes, **kwargs)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.11/site-packages/llama_index/core/base/embeddings/base.py", line 450, in __call__
    embeddings = self.get_text_embedding_batch(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.11/site-packages/llama_index/core/instrumentation/dispatcher.py", line 274, in wrapper
    result = func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.11/site-packages/llama_index/core/base/embeddings/base.py", line 341, in get_text_embedding_batch
    embeddings = self._get_text_embeddings(cur_batch)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.11/site-packages/llama_index/embeddings/huggingface/base.py", line 149, in _get_text_embeddings
    return self._embed(texts, prompt_name="text")
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.11/site-packages/llama_index/embeddings/huggingface/base.py", line 124, in _embed
    return self._model.encode(
           ^^^^^^^^^^^
AttributeError: 'HuggingFaceEmbedding' object has no attribute '_model'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/code/ingestion_bug_minimal.py", line 21, in <module>
    main()
  File "/code/ingestion_bug_minimal.py", line 17, in main
    pipeline.run(documents=documents, num_workers=2)
  File "/code/.venv/lib/python3.11/site-packages/llama_index/core/ingestion/pipeline.py", line 722, in run
    nodes_parallel = p.starmap(
                     ^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/pool.py", line 375, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/pool.py", line 774, in get
    raise self._value
AttributeError: 'HuggingFaceEmbedding' object has no attribute '_model'

dosubot[bot] commented 1 month ago

Hey there, @theta-lin! 👋 I'm here to help you out with any bugs, questions, or contributions you might have while waiting for a human maintainer.

To resolve the AttributeError: 'HuggingFaceEmbedding' object has no attribute '_model' when using the HuggingFaceEmbedding stage with num_workers set to a value greater than 1 in the IngestionPipeline, you need to ensure that the HuggingFaceEmbedding object is properly initialized in each worker process. This can be done by modifying the run_transformations function to reinitialize the HuggingFaceEmbedding object within each worker.

Here's a modified version of the run_transformations function to include reinitialization of the HuggingFaceEmbedding object:

def run_transformations(
    nodes: List[BaseNode],
    transformations: Sequence[TransformComponent],
    in_place: bool = True,
    cache: Optional[IngestionCache] = None,
    cache_collection: Optional[str] = None,
    **kwargs: Any,
) -> List[BaseNode]:
    """Run a series of transformations on a set of nodes.

    Args:
        nodes: The nodes to transform.
        transformations: The transformations to apply to the nodes.

    Returns:
        The transformed nodes.
    """
    if not in_place:
        nodes = list(nodes)

    for transform in transformations:
        # Reinitialize HuggingFaceEmbedding if necessary
        if isinstance(transform, HuggingFaceEmbedding):
            transform = HuggingFaceEmbedding(
                model_name=transform.model_name,
                max_length=transform.max_length,
                query_instruction=transform.query_instruction,
                text_instruction=transform.text_instruction,
                normalize=transform.normalize,
                cache_folder=transform.cache_folder,
                device=transform._device,
                callback_manager=transform.callback_manager,
                **transform.model_kwargs
            )  # Reinitialize the object with the same parameters

        if cache is not None:
            hash = get_transformation_hash(nodes, transform)
            cached_nodes = cache.get(hash, collection=cache_collection)
            if cached_nodes is not None:
                nodes = cached_nodes
            else:
                nodes = transform(nodes, **kwargs)
                cache.put(hash, nodes, collection=cache_collection)
        else:
            nodes = transform(nodes, **kwargs)

    return nodes

By reinitializing the HuggingFaceEmbedding object within each worker with the same parameters, you ensure that each process has its own instance of the object, which should resolve the AttributeError [1][2][3].

To continue talking to Dosu, mention @dosu.

theta-lin commented 1 month ago

@dosu While I think the problem might be related to the initialization of HuggingFaceEmbedding in each process, after patching run_transformations() with your code, the same issue persists. The error message is now AttributeError: 'HuggingFaceEmbedding' object has no attribute 'model_kwargs'.

dosubot[bot] commented 1 month ago

To resolve the AttributeError: 'HuggingFaceEmbedding' object has no attribute '_model' error when using HuggingFaceEmbedding with num_workers set to greater than 1 in an IngestionPipeline, you need to ensure that the _model attribute is properly initialized in each worker process. This error typically occurs when the object is being used in a multiprocessing context where the initialization of the _model attribute might not be correctly shared across processes.

One way to address this issue is to ensure that the _model attribute is correctly initialized in each worker process. You can achieve this by overriding the __getstate__ and __setstate__ methods to handle the serialization and deserialization of the object properly.

Here is an example of how you can modify the HuggingFaceEmbedding class to include these methods:

class HuggingFaceEmbedding(BaseEmbedding):
    # ... [other parts of the class]

    def __getstate__(self):
        state = self.__dict__.copy()
        # Remove the _model attribute from the state
        state['_model'] = None
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Reinitialize the _model attribute
        from transformers import AutoModel
        self._model = AutoModel.from_pretrained(self.model_name).to(self._device)

    def __init__(
        self,
        model_name: Optional[str] = None,
        tokenizer_name: Optional[str] = None,
        pooling: Optional[str] = None,
        max_length: Optional[int] = None,
        query_instruction: Optional[str] = None,
        text_instruction: Optional[str] = None,
        normalize: bool = True,
        model: Optional[Any] = None,
        tokenizer: Optional[Any] = None,
        embed_batch_size: int = DEFAULT_EMBED_BATCH_SIZE,
        cache_folder: Optional[str] = None,
        trust_remote_code: bool = False,
        device: Optional[str] = None,
        callback_manager: Optional[CallbackManager] = None,
    ):
        # ... [existing initialization code]

        self._device = device or infer_torch_device()
        cache_folder = cache_folder or get_cache_dir()

        if model is None:
            model_name = model_name if model_name is not None else DEFAULT_HUGGINGFACE_EMBEDDING_MODEL
            model = AutoModel.from_pretrained(model_name, cache_dir=cache_folder, trust_remote_code=trust_remote_code)
        elif model_name is None:
            model_name = model.name_or_path
        self._model = model.to(self._device)

        if tokenizer is None:
            tokenizer_name = model_name or tokenizer_name or DEFAULT_HUGGINGFACE_EMBEDDING_MODEL
            tokenizer = AutoTokenizer.from_pretrained(tokenizer_name, cache_dir=cache_folder)
        elif tokenizer_name is None:
            tokenizer_name = tokenizer.name_or_path
        self._tokenizer = tokenizer

        if max_length is None:
            try:
                max_length = int(self._model.config.max_position_embeddings)
            except AttributeError as exc:
                raise ValueError("Unable to find max_length from model config. Please specify max_length.") from exc

        if not pooling:
            pooling = get_pooling_mode(model_name)
        try:
            pooling = Pooling(pooling)
        except ValueError as exc:
            raise NotImplementedError(f"Pooling {pooling} unsupported, please pick one in {[p.value for p in Pooling]}.") from exc

        super().__init__(
            embed_batch_size=embed_batch_size,
            callback_manager=callback_manager,
            model_name=model_name,
            tokenizer_name=tokenizer_name,
            max_length=max_length,
            pooling=pooling,
            normalize=normalize,
            query_instruction=query_instruction,
            text_instruction=text_instruction,
        )

By adding the __getstate__ and __setstate__ methods, you ensure that the _model attribute is properly reinitialized in each worker process, which should resolve the AttributeError [1].

xKwan commented 1 month ago

@dosu It indeed resolved the AttributeError: 'HuggingFaceEmbedding' object has no attribute '_model', but instead now I have this error AttributeError: 'HuggingFaceEmbedding' object has no attribute '_device'
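
The follow-up error suggests that the rebuild step in __setstate__ relied on a value that was not restored in the worker. As a minimal sketch of the general pattern (not the llama_index implementation), the class below keeps every value the rebuild needs inside the pickled state and drops only the heavy object. `ReloadingEmbedding` and `load_model` are hypothetical names, and a `pickle` round trip stands in for what a worker process receives.

```python
import pickle


def load_model(model_name: str, device: str) -> dict:
    """Hypothetical loader; a real one would load weights from disk or a hub."""
    return {"name": model_name, "device": device}


class ReloadingEmbedding:
    """Hypothetical class for illustration; NOT the real HuggingFaceEmbedding."""

    def __init__(self, model_name: str, device: str = "cpu") -> None:
        self.model_name = model_name
        self.device = device
        self._model = load_model(model_name, device)

    def __getstate__(self) -> dict:
        # Keep the cheap, picklable settings; drop only the heavy object.
        state = self.__dict__.copy()
        state.pop("_model", None)
        return state

    def __setstate__(self, state: dict) -> None:
        # Restore the settings first, then rebuild the model from them.
        self.__dict__.update(state)
        self._model = load_model(self.model_name, self.device)


original = ReloadingEmbedding("BAAI/bge-small-en-v1.5", device="cpu")
restored = pickle.loads(pickle.dumps(original))

print(restored.device)          # cpu
print(restored._model["name"])  # BAAI/bge-small-en-v1.5
```

Whether the real class stores `_device` somewhere this simple is an assumption here; the point is only that __setstate__ can use nothing beyond what __getstate__ returned.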

theta-lin commented 1 month ago

@xKwan Actually, I would like to amend my previous comment:

@dosu While I think the problem might be related to the initialization of HuggingFaceEmbedding in each process, after patching run_transformations() with your code, the same issue persists.

The first solution also solved AttributeError: 'HuggingFaceEmbedding' object has no attribute '_model' for me, but I also got another error, which differs from yours though. It is AttributeError: 'HuggingFaceEmbedding' object has no attribute 'model_kwargs'. Sorry for not clearly reading the output last night.

dosubot[bot] commented 1 month ago

To resolve the AttributeError: 'HuggingFaceEmbedding' object has no attribute 'model_kwargs', you need to ensure that the HuggingFaceEmbedding object is correctly initialized in each worker without relying on non-existent attributes. Here is the corrected code snippet:

from llama_index.core import SimpleDirectoryReader
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

def create_huggingface_embedding():
    return HuggingFaceEmbedding(
        model_name="BAAI/bge-small-en-v1.5", trust_remote_code=True
    )

def main():
    documents = SimpleDirectoryReader("./test").load_data()
    pipeline = IngestionPipeline(
        transformations=[
            SentenceSplitter(),
            create_huggingface_embedding(),
        ],
    )
    pipeline.run(documents=documents, num_workers=2)

if __name__ == "__main__":
    main()

In this code, the create_huggingface_embedding function is used to reinitialize the HuggingFaceEmbedding object within each worker, ensuring that the object is correctly instantiated without any missing attributes. This approach ensures that each worker gets a fresh instance of HuggingFaceEmbedding with all necessary attributes properly initialized [1][2][3].
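
A note of caution on the snippet above: create_huggingface_embedding() is called while the transformations list is being built, so it runs once in the parent process and the list holds an already-constructed object. The toy sketch below illustrates this with the standard library only. `create_embedding` and the `created` list are hypothetical stand-ins, and a `pickle` round trip simulates handing the list to workers.

```python
import pickle

created = []  # records every time the factory runs


def create_embedding() -> dict:
    """Hypothetical stand-in for create_huggingface_embedding()."""
    created.append("created")
    return {"model_name": "BAAI/bge-small-en-v1.5"}


# The factory is *called* here, so the list holds an already-built object.
transformations = ["splitter", create_embedding()]

# Simulate handing the list to two workers: each receives a pickled copy.
worker_copies = [pickle.loads(pickle.dumps(transformations)) for _ in range(2)]

print(len(created))  # 1: the factory did not run again for the workers
```

Under that reading, wrapping the constructor in a function does not change what the workers receive, which is consistent with the report below that the error persists.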

xKwan commented 1 month ago

@xKwan Actually, I would like to amend my previous comment:

@dosu While I think the problem might be related to the initialization of HuggingFaceEmbedding in each process, after patching run_transformations() with your code, the same issue persists.

The first solution also solved AttributeError: 'HuggingFaceEmbedding' object has no attribute '_model' for me, but I also got another error, which differs from yours though. It is AttributeError: 'HuggingFaceEmbedding' object has no attribute 'model_kwargs'. Sorry for not clearly reading the output last night.

Not sure why we got different errors. I tried the latest update by dosubot and it is still not working. Not sure if the developers actually tested this function before releasing this multiprocessing feature.

ryanhoangt commented 3 weeks ago

I'm also facing the same issue, any update on this?

Vitor-Almeida commented 3 weeks ago

I'm having the same issue

theta-lin commented 3 weeks ago

Hi, guys, I am trying to track down this issue myself. Now I have narrowed it down to this:

from llama_index.embeddings.huggingface import HuggingFaceEmbedding
import multiprocessing
from itertools import repeat

def test(node, embed):
    print(f"node: {node}")
    print("in multiprocessing")
    print(f"attributes: {dir(embed)}")
    print(f"model: {embed._model}")

def main():
    embed = HuggingFaceEmbedding()
    print("no multiprocessing")
    print(f"attributes: {dir(embed)}")
    print(f"model: {embed._model}")

    with multiprocessing.get_context("spawn").Pool(1) as p:
        nodes = [0]
        p.starmap(test, zip(nodes, repeat(embed)))

if __name__ == "__main__":
    main()

As you can see, this is likely an issue with sharing the _model attribute between multiple processes. See:
https://github.com/run-llama/llama_index/blob/3ff05760b49b8d8caa5ca632d02415b423426c32/llama-index-integrations/embeddings/llama-index-embeddings-huggingface/llama_index/embeddings/huggingface/base.py#L49
https://github.com/run-llama/llama_index/blob/3ff05760b49b8d8caa5ca632d02415b423426c32/llama-index-integrations/embeddings/llama-index-embeddings-huggingface/llama_index/embeddings/huggingface/base.py#L87-L98

I also tried adding a testing variable _test = PrivateAttr() to HuggingFaceEmbedding and _test = True to its __init__() function. However, this variable is accessible from another process, so I guess that the fact that _model is a SentenceTransformer may have played a role in this.
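
The observation above (a plain `_test` attribute reaches the worker while `_model` does not) is what one would see if serialization drops only some attributes. The sketch below simulates that with the standard library alone. `FakeEmbedding`, `_NOT_PICKLED`, and `send_to_worker` are hypothetical names, and the choice to drop only `_model` is an assumption made for illustration; it does not reproduce the actual pydantic or llama_index logic.

```python
import pickle


class FakeEmbedding:
    """Hypothetical stand-in; NOT the real HuggingFaceEmbedding."""

    # Assumption for illustration: only this attribute is left out of the pickled state.
    _NOT_PICKLED = {"_model"}

    def __init__(self, model_name: str) -> None:
        self.model_name = model_name
        self._test = True
        self._model = object()  # placeholder for a heavy model object

    def __getstate__(self) -> dict:
        return {k: v for k, v in self.__dict__.items() if k not in self._NOT_PICKLED}


def send_to_worker(obj):
    """Arguments passed to a multiprocessing worker are pickled, then unpickled."""
    return pickle.loads(pickle.dumps(obj))


parent_copy = FakeEmbedding("BAAI/bge-small-en-v1.5")
worker_copy = send_to_worker(parent_copy)

print(hasattr(parent_copy, "_model"))  # True
print(hasattr(worker_copy, "_test"))   # True
print(hasattr(worker_copy, "_model"))  # False
```

Accessing `worker_copy._model` here raises an AttributeError of the same shape as the one in the traceback, though that alone does not confirm this is the root cause.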

I don't think I will keep working on this issue anytime soon, though. Maybe it would be better if someone more knowledgeable could come and take a look at this. However, I will try to improve dosu's approach and find a quick fix.

theta-lin commented 3 weeks ago

This monkey patch should provide a temporary fix, please confirm whether this works for you:

from llama_index.core.ingestion.pipeline import get_transformation_hash
from llama_index.core.ingestion.cache import IngestionCache
from llama_index.core.schema import BaseNode, TransformComponent
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from typing import Any, List, Optional, Sequence
import llama_index.core.ingestion.pipeline

def run_transformations_patched(
    nodes: List[BaseNode],
    transformations: Sequence[TransformComponent],
    in_place: bool = True,
    cache: Optional[IngestionCache] = None,
    cache_collection: Optional[str] = None,
    **kwargs: Any,
) -> List[BaseNode]:
    """Run a series of transformations on a set of nodes.

    Args:
        nodes: The nodes to transform.
        transformations: The transformations to apply to the nodes.

    Returns:
        The transformed nodes.
    """
    if not in_place:
        nodes = list(nodes)

    for transform in transformations:
        # Reinitialize HuggingFaceEmbedding if necessary
        if isinstance(transform, HuggingFaceEmbedding):
            transform = HuggingFaceEmbedding(
                model_name=transform.model_name,
                # NOTE: only needed in my case, you only need to pass in the parameters needed for your project
                trust_remote_code=True,
            )

        if cache is not None:
            hash = get_transformation_hash(nodes, transform)
            cached_nodes = cache.get(hash, collection=cache_collection)
            if cached_nodes is not None:
                nodes = cached_nodes
            else:
                nodes = transform(nodes, **kwargs)
                cache.put(hash, nodes, collection=cache_collection)
        else:
            nodes = transform(nodes, **kwargs)

    return nodes

llama_index.core.ingestion.pipeline.run_transformations = run_transformations_patched

This is a simple modification of https://github.com/run-llama/llama_index/issues/13956#issuecomment-2150471775 that removes the unnecessary parameters from the reinitialization of HuggingFaceEmbedding. Nevertheless, it would be much better if someone could find the root cause of this.

Edit 1: After more testing, I got this warning:

/usr/lib/python3.10/multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker: There appear to be 1 leaked semaphore objects to clean up at shutdown

I think it also hints at something regarding the interaction of SentenceTransformer and multiprocessing?

Edit 2: After fixing this issue this way, a similar issue arose from my use of metadata extractors and LlamaCPP, as the following error occurred:

AttributeError: 'HuggingFaceEmbedding' object has no attribute '_model'

However, I do think it is a bit infeasible to manually initialize multiple instances of an LLM in a way similar to my solution above.
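
One generic way to avoid hand-writing a reinitialization for every component is to ship a "recipe" instead of the built object and let each process build its own copy on first use. The sketch below shows that pattern with the standard library only. `LazyComponent`, `build_llm`, and the `./model.gguf` path are hypothetical, and whether such a wrapper can be plugged into IngestionPipeline is not shown here.

```python
import pickle
from functools import partial
from typing import Any, Callable, Optional


class LazyComponent:
    """Hypothetical wrapper: stores how to build a component, builds it on first use."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        # The factory must be picklable, e.g. a partial of a top-level function.
        self._factory = factory
        self._instance: Optional[Any] = None

    def __getstate__(self) -> dict:
        # Ship only the recipe; each process builds its own instance.
        return {"_factory": self._factory, "_instance": None}

    def get(self) -> Any:
        if self._instance is None:
            self._instance = self._factory()
        return self._instance


def build_llm(model_path: str) -> dict:
    """Hypothetical stand-in for constructing an LLM such as LlamaCPP."""
    return {"model_path": model_path}


lazy_llm = LazyComponent(partial(build_llm, "./model.gguf"))
lazy_llm.get()  # built once in the parent

# A pickle round trip simulates what a worker process would receive.
worker_copy = pickle.loads(pickle.dumps(lazy_llm))
print(worker_copy._instance)            # None
print(worker_copy.get()["model_path"])  # ./model.gguf
```

The trade-off is that every worker pays the full construction cost once, which may be significant for large models.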