run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Bug]: Ingestion pipeline failed with num_workers>1 and SentenceWindowNodeParser #14280

Closed: melya closed this issue 1 day ago

melya commented 2 weeks ago

Bug Description

When attempting to run the ingestion pipeline with more than one worker (num_workers > 1), the process fails when using the SentenceWindowNodeParser.

Version

0.10.47

Steps to Reproduce

The following code reproduces the bug:

import os

from llama_index.core import SimpleDirectoryReader
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceWindowNodeParser

num_cores = os.cpu_count()
num_workers = min(4, num_cores)

director_reader = SimpleDirectoryReader(
    input_dir="./data"
)

documents = director_reader.load_data()

pipeline = IngestionPipeline(
    transformations=[
        SentenceWindowNodeParser.from_defaults(
            window_size=6,
            window_metadata_key="window",
            original_text_metadata_key="original_text",
        ),
    ],
)

pipeline.run(documents=documents, show_progress=True, num_workers=num_workers)

Relevant Logs/Tracebacks

Traceback (most recent call last):
  File "/Users/username/PycharmProjects/llamaindex-ingestion-pipeline-bug/main.py", line 26, in <module>
    nodes = pipeline.run(documents=documents, show_progress=True, num_workers=num_workers)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/PycharmProjects/llamaindex-ingestion-pipeline-bug/.venv/lib/python3.11/site-packages/llama_index/core/instrumentation/dispatcher.py", line 230, in wrapper
    result = func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/PycharmProjects/llamaindex-ingestion-pipeline-bug/.venv/lib/python3.11/site-packages/llama_index/core/ingestion/pipeline.py", line 726, in run
    nodes_parallel = p.starmap(
                     ^^^^^^^^^^
  File "/Users/username/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/pool.py", line 375, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/Users/username/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/pool.py", line 540, in _handle_tasks
    put(task)
  File "/Users/username/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'split_by_sentence_tokenizer.<locals>.split'

dosubot[bot] commented 2 weeks ago

Hey there, @melya! I'm here to help you troubleshoot the issue you're experiencing with the ingestion pipeline. I can assist with bug fixes, answer your questions, and guide you on becoming a contributor while you wait for a human maintainer. Let's dive into resolving this issue!

To fix the AttributeError: Can't pickle local object 'split_by_sentence_tokenizer.<locals>.split' when running the ingestion pipeline with num_workers > 1 using the SentenceWindowNodeParser, you need to ensure that the function or object being passed to the multiprocessing pool is pickleable. This error typically occurs because local functions or lambdas cannot be pickled.
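
The pickling limitation can be reproduced without llama_index at all. The following is a minimal sketch; make_splitter, top_level_split, and can_pickle are hypothetical names used only for illustration. It shows that a function defined inside another function cannot be pickled, which is what multiprocessing needs to do when it sends work to a child process.

```python
import pickle


def make_splitter():
    """Factory returning a nested function, similar in shape to split_by_sentence_tokenizer."""

    def split(text):
        return text.split(". ")

    return split


def top_level_split(text):
    """Module-level function: pickle stores only a reference to its qualified name."""
    return text.split(". ")


def can_pickle(obj):
    """Return True if pickle.dumps succeeds on obj, False if pickling is rejected."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


if __name__ == "__main__":
    print("nested function picklable:", can_pickle(make_splitter()))
    print("top-level function picklable:", can_pickle(top_level_split))
```

Functions are pickled by reference (module plus qualified name), so the unpickling process must be able to look the function up by that name. A nested function or lambda has no importable name, which is why the pool raises the error shown in the traceback.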

Here's how you can refactor the code to resolve this issue:

  1. Move the local function split outside of the split_by_sentence_tokenizer function.
  2. Ensure that any objects or functions passed to the ProcessPoolExecutor are defined at the top level of the module.

Here is an example of how you can refactor the code:

import asyncio
import os
import warnings
from concurrent.futures import ProcessPoolExecutor
from functools import partial, reduce
from typing import Any, List, Optional, Sequence

from llama_index.core import SimpleDirectoryReader
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.ingestion.cache import IngestionCache
from llama_index.core.ingestion.pipeline import arun_transformations
from llama_index.core.node_parser import SentenceWindowNodeParser
from llama_index.core.schema import BaseNode, Document, TransformComponent

# Define the split function at the top level
def split(text):
    # Your split logic here
    pass

def split_by_sentence_tokenizer(text):
    # Use the top-level split function
    return split(text)

# Ensure arun_transformations_wrapper is also at the top level
def arun_transformations_wrapper(
    nodes: List[BaseNode],
    transformations: Sequence[TransformComponent],
    in_place: bool = True,
    cache: Optional[IngestionCache] = None,
    cache_collection: Optional[str] = None,
    **kwargs: Any,
) -> List[BaseNode]:
    """Wrapper for async run_transformation. To be used in loop.run_in_executor
    within a ProcessPoolExecutor.
    """
    loop = asyncio.new_event_loop()
    nodes = loop.run_until_complete(
        arun_transformations(
            nodes=nodes,
            transformations=transformations,
            in_place=in_place,
            cache=cache,
            cache_collection=cache_collection,
            **kwargs,
        )
    )
    loop.close()
    return nodes

# In the async arun method (the body uses await, so it cannot be a plain def),
# ensure the ProcessPoolExecutor uses the top-level function
async def arun(
        self,
        show_progress: bool = False,
        documents: Optional[List[Document]] = None,
        nodes: Optional[List[BaseNode]] = None,
        cache_collection: Optional[str] = None,
        in_place: bool = True,
        store_doc_text: bool = True,
        num_workers: Optional[int] = None,
        **kwargs: Any,
    ) -> Sequence[BaseNode]:
        input_nodes = self._prepare_inputs(documents, nodes)
        # Docstore deduplication is omitted in this excerpt; run every prepared node.
        nodes_to_run = input_nodes
        if num_workers and num_workers > 1:
            if num_workers > os.cpu_count():
                warnings.warn(
                    "Specified num_workers exceed number of CPUs in the system. "
                    "Setting `num_workers` down to the maximum CPU count."
                )
                # Actually apply the cap that the warning announces.
                num_workers = os.cpu_count()

            loop = asyncio.get_event_loop()
            with ProcessPoolExecutor(max_workers=num_workers) as p:
                node_batches = self._node_batcher(
                    num_batches=num_workers, nodes=nodes_to_run
                )
                tasks = [
                    loop.run_in_executor(
                        p,
                        partial(
                            arun_transformations_wrapper,
                            transformations=self.transformations,
                            in_place=in_place,
                            cache=self.cache if not self.disable_cache else None,
                            cache_collection=cache_collection,
                        ),
                        batch,
                    )
                    for batch in node_batches
                ]
                result: List[List[BaseNode]] = await asyncio.gather(*tasks)
                nodes = reduce(lambda x, y: x + y, result, [])
        else:
            nodes = await arun_transformations(
                nodes_to_run,
                self.transformations,
                show_progress=show_progress,
                cache=self.cache if not self.disable_cache else None,
                cache_collection=cache_collection,
                in_place=in_place,
                **kwargs,
            )

        if self.vector_store is not None:
            await self.vector_store.async_add(
                [n for n in nodes if n.embedding is not None]
            )

        return nodes

# Example usage
num_cores = os.cpu_count()
num_workers = min(4, num_cores)

director_reader = SimpleDirectoryReader(
    input_dir="./data"
)

documents = director_reader.load_data()

pipeline = IngestionPipeline(
    transformations=[
        SentenceWindowNodeParser.from_defaults(
            window_size=6,
            window_metadata_key="window",
            original_text_metadata_key="original_text",
        ),
    ],
)

pipeline.run(documents=documents, show_progress=True, num_workers=num_workers)

By moving the split function and arun_transformations_wrapper to the top level, they become pickleable, and the ProcessPoolExecutor can serialize them correctly [1][2][3][4].


melya commented 2 weeks ago

Here is a workaround that works for me. I extracted the split function from the callable factory and pass it in explicitly. This should probably be fixed properly in the library itself, but I don't know Python that well yet.

I would really appreciate it if somebody who knows how to fix this correctly could help.

import os
from typing import List

from llama_index.core import SimpleDirectoryReader
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceWindowNodeParser

num_cores = os.cpu_count()
num_workers = min(4, num_cores)

director_reader = SimpleDirectoryReader(input_dir="./data")
documents = director_reader.load_data()

def split(text: str) -> List[str]:
    import nltk
    tokenizer = nltk.tokenize.PunktSentenceTokenizer()

    spans = list(tokenizer.span_tokenize(text))
    sentences = []
    for i, span in enumerate(spans):
        start = span[0]
        if i < len(spans) - 1:
            end = spans[i + 1][0]
        else:
            end = len(text)
        sentences.append(text[start:end])

    return sentences

pipeline = IngestionPipeline(
    transformations=[
        SentenceWindowNodeParser.from_defaults(
            window_size=6,
            window_metadata_key="window",
            original_text_metadata_key="original_text",
            sentence_splitter=split,
        ),
    ],
)

if __name__ == '__main__':
    pipeline.run(documents=documents, show_progress=True, num_workers=num_workers)
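
If rebuilding the tokenizer on every call to split is a concern, one option is to cache it once per process. This is a minimal sketch under assumptions: get_tokenizer is a hypothetical helper, and the compiled regex is only a stand-in for nltk.tokenize.PunktSentenceTokenizer() so that the snippet runs without NLTK installed.

```python
import re
from functools import lru_cache
from typing import List


@lru_cache(maxsize=1)
def get_tokenizer() -> re.Pattern:
    """Build the tokenizer on first use and reuse it afterwards in this process.

    Assumption: a regex stands in for nltk.tokenize.PunktSentenceTokenizer().
    """
    return re.compile(r"\S[^.!?]*[.!?]*\s*")


def split(text: str) -> List[str]:
    """Top-level (and therefore picklable) splitter that uses the cached tokenizer."""
    return [match.group(0) for match in get_tokenizer().finditer(text)]
```

Because split is a module-level function, it is pickled by name only, and each worker process builds its own tokenizer the first time split runs there.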

melya commented 2 weeks ago

The same issue occurs when using IngestionCache in the pipeline.

houtaroy commented 1 day ago

I found the cause.

In the file 'llama-index-core\llama_index\core\node_parser\text\utils.py', the function split_by_sentence_tokenizer defines an inner function split and returns it, but a nested function like this cannot be pickled:

def split_by_sentence_tokenizer() -> Callable[[str], List[str]]:
    import nltk

    tokenizer = nltk.tokenize.PunktSentenceTokenizer()

    # get the spans and then return the sentences
    # using the start index of each span
    # instead of using end, use the start of the next span if available
    def split(text: str) -> List[str]:
        spans = list(tokenizer.span_tokenize(text))
        sentences = []
        for i, span in enumerate(spans):
            start = span[0]
            if i < len(spans) - 1:
                end = spans[i + 1][0]
            else:
                end = len(text)
            sentences.append(text[start:end])

        return sentences

    return split

I'm not experienced with multiprocessing. I tried moving tokenizer = nltk.tokenize.PunktSentenceTokenizer() into split, but split_by_sentence_tokenizer is called once while split is called for every text, so the tokenizer would be rebuilt on each call and performance would suffer.

How can we extract split to the top level?
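
One possible way to do this, sketched here under assumptions rather than taken from the library, is to make split a module-level function that receives the tokenizer as an argument, and have the factory bind the tokenizer with functools.partial. The tokenizer is then still constructed only once. The names split_with_tokenizer and make_sentence_splitter are hypothetical, and RegexSpanTokenizer is a stand-in for nltk.tokenize.PunktSentenceTokenizer so that the snippet runs without NLTK.

```python
import re
from functools import partial
from typing import Callable, Iterator, List, Tuple


class RegexSpanTokenizer:
    """Hypothetical stand-in for nltk.tokenize.PunktSentenceTokenizer."""

    _pattern = re.compile(r"\S[^.!?]*[.!?]*")

    def span_tokenize(self, text: str) -> Iterator[Tuple[int, int]]:
        # Yield (start, end) character offsets for each sentence-like chunk.
        for match in self._pattern.finditer(text):
            yield match.span()


def split_with_tokenizer(text: str, tokenizer) -> List[str]:
    """Module-level version of split: same span logic, tokenizer passed in."""
    spans = list(tokenizer.span_tokenize(text))
    sentences = []
    for i, (start, _) in enumerate(spans):
        # Use the start of the next span as the end, so whitespace is kept.
        end = spans[i + 1][0] if i < len(spans) - 1 else len(text)
        sentences.append(text[start:end])
    return sentences


def make_sentence_splitter() -> Callable[[str], List[str]]:
    """Build the tokenizer once and bind it to the top-level function."""
    tokenizer = RegexSpanTokenizer()
    return partial(split_with_tokenizer, tokenizer=tokenizer)
```

A partial object can be pickled as long as the wrapped function is importable by name and the bound arguments are themselves picklable. Whether the real NLTK tokenizer pickles cleanly in a given version is an assumption worth checking before relying on this.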