run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License
35.26k stars 4.96k forks source link

[Bug]: Unexpected keyword argument for run_transformations() method #14809

Open FlorentGrenier opened 1 month ago

FlorentGrenier commented 1 month ago

Bug Description

In the run() method (line 542 in llama_index.core.ingestion.pipeline.py) the parameter show_progress is passed to the run_transformation() method, but this method doesn't take this parameter.

Instantiate the run_transformations() method in the run() method: :

nodes = run_transformations(
  nodes_to_run,
  self.transformations,
  show_progress=show_progress,
  cache=self.cache if not self.disable_cache else None,
  cache_collection=cache_collection,
  in_place=in_place,
  **kwargs,
)

run_transformations() method signature :

def run_transformations(
    nodes: List[BaseNode],
    transformations: Sequence[TransformComponent],
    in_place: bool = True,
    cache: Optional[IngestionCache] = None,
    cache_collection: Optional[str] = None,
    **kwargs: Any,
) -> List[BaseNode]:

Version

0.10.55

Steps to Reproduce

Run a pipeline ingestion

My code :

pipeline = IngestionPipeline(
    transformations=[
        sentence_split,
        text_cleaner,
        self.embed_model
    ],
    vector_store=self.vector_store,
    docstore=self.docstore
)
pipeline.run(documents=documents)

Relevant Logs/Tracbacks

Traceback (most recent call last):
  File "C:\data\git\.......\test.py", line 110, in <module>
    data_ingestion.main()
  File "C:\data\git\.......\test.py", line 105, in main
    self.create_index_local()
  File "C:\data\git\.......\test.py", line 83, in create_index_local
    nodes = self.run_pipeline(documents)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\data\git\.......\test.py", line 101, in run_pipeline
    nodes = pipeline.run(documents=documents)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 230, in wrapper      
    result = func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
            ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\ingestion\pipeline.py", line 122, in run_transformations  
    nodes = transform(nodes, **kwargs)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 221, in wrapper      
    bound_args = inspect.signature(func).bind(*args, **kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\inspect.py", line 3242, in bind
    return self._bind(args, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\inspect.py", line 3231, in _bind
    raise TypeError(
TypeError: got an unexpected keyword argument 'show_progress'
dosubot[bot] commented 1 month ago

To resolve the TypeError caused by the show_progress parameter being passed to the run_transformations() method, you need to remove the show_progress parameter from the call to run_transformations() in the run method of the IngestionPipeline class.

Here is the relevant part of the run method with the necessary modification:

def run(
        self,
        show_progress: bool = False,
        documents: Optional[List[Document]] = None,
        nodes: Optional[List[BaseNode]] = None,
        cache_collection: Optional[str] = None,
        in_place: bool = True,
        store_doc_text: bool = True,
        num_workers: Optional[int] = None,
        **kwargs: Any,
    ) -> Sequence[BaseNode]:
        """
        Args:
            show_progress (bool, optional): Shows execution progress bar(s). Defaults to False.
            documents (Optional[List[Document]], optional): Set of documents to be transformed. Defaults to None.
            nodes (Optional[List[BaseNode]], optional): Set of nodes to be transformed. Defaults to None.
            cache_collection (Optional[str], optional): Cache for transformations. Defaults to None.
            in_place (bool, optional): Whether transformations creates a new list for transformed nodes or modifies the
                array passed to `run_transformations`. Defaults to True.
            num_workers (Optional[int], optional): The number of parallel processes to use.
                If set to None, then sequential compute is used. Defaults to None.

        Returns:
            Sequence[BaseNode]: The set of transformed Nodes/Documents
        """
        input_nodes = self._prepare_inputs(documents, nodes)
        # check if we need to dedup
        if self.docstore is not None and self.vector_store is not None:
            if self.docstore_strategy in (
                DocstoreStrategy.UPSERTS,
                DocstoreStrategy.UPSERTS_AND_DELETE,
            ):
                nodes_to_run = self._handle_upserts(
                    input_nodes, store_doc_text=store_doc_text
                )
            elif self.docstore_strategy == DocstoreStrategy.DUPLICATES_ONLY:
                nodes_to_run = self._handle_duplicates(
                    input_nodes, store_doc_text=store_doc_text
                )
            else:
                raise ValueError(f"Invalid docstore strategy: {self.docstore_strategy}")
        elif self.docstore is not None and self.vector_store is None:
            if self.docstore_strategy == DocstoreStrategy.UPSERTS:
                print(
                    "Docstore strategy set to upserts, but no vector store. "
                    "Switching to duplicates_only strategy."
                )
                self.docstore_strategy = DocstoreStrategy.DUPLICATES_ONLY
            elif self.docstore_strategy == DocstoreStrategy.UPSERTS_AND_DELETE:
                print(
                    "Docstore strategy set to upserts and delete, but no vector store. "
                    "Switching to duplicates_only strategy."
                )
                self.docstore_strategy = DocstoreStrategy.DUPLICATES_ONLY
            nodes_to_run = self._handle_duplicates(
                input_nodes, store_doc_text=store_doc_text
            )

        else:
            nodes_to_run = input_nodes

        if num_workers and num_workers > 1:
            if num_workers > multiprocessing.cpu_count():
                warnings.warn(
                    "Specified num_workers exceed number of CPUs in the system. "
                    "Setting `num_workers` down to the maximum CPU count."
                )
            with multiprocessing.get_context("spawn").Pool(num_workers) as p:
                node_batches = self._node_batcher(
                    num_batches=num_workers, nodes=nodes_to_run
                )
                nodes_parallel = p.starmap(
                    run_transformations,
                    zip(
                        node_batches,
                        repeat(self.transformations),
                        repeat(in_place),
                        repeat(self.cache if not self.disable_cache else None),
                        repeat(cache_collection),
                    ),
                )
                nodes = reduce(lambda x, y: x + y, nodes_parallel, [])
        else:
            nodes = run_transformations(
                nodes_to_run,
                self.transformations,
                cache=self.cache if not self.disable_cache else None,
                cache_collection=cache_collection,
                in_place=in_place,
                **kwargs,
            )

        if self.vector_store is not None:
            self.vector_store.add([n for n in nodes if n.embedding is not None])

        return nodes

By removing the show_progress parameter from the call to run_transformations, you ensure that only the accepted parameters are passed, thus resolving the TypeError [1][2].

To continue talking to Dosu, mention @dosu.

skvrd commented 1 month ago

The show_process ends up in **kwargs: Any.

It seems like one of the transformations is not accepting the show_progress.

Which sentence_split, text_cleaner are you using? Could you provide class names, or some code?

FlorentGrenier commented 1 month ago

Yes, my code :

def run_pipeline(self, documents: List[Document]) -> Sequence[BaseNode]:
  sentence_split = SentenceSplitter(chunk_size=Configs.chunk_size, chunk_overlap=Configs.chunk_overlap)
  text_cleaner = TextCleaner()

  pipeline = IngestionPipeline(
      transformations=[
          sentence_split,
          text_cleaner,
          self.embed_model
      ]
  )
  nodes = pipeline.run(documents=documents)
  return nodes

class TextCleaner(TransformComponent):
    def __init__(self):
        self.nlp = spacy.load(Configs.spacy_language_fr)
        self.stop_words = set(stopwords.words('french') + stopwords.words('english'))
        self.punctuations = set(['.', ',', '/', '!', '?', ';', ':', '(', ')', '[', ']', '-', '_', '%'])

    def __call__(self, nodes):
        for node in nodes:
            node.text = self.clean_text(node.text)
        return nodes

    def clean_text(self, text: str) -> str:
        doc = self.nlp(text.lower())
        keywords = [token.lemma_ for token in doc if token.text not in self.stop_words and token.text not in self.punctuations and not token.is_digit and len(token.text) > 1]
        clean_data = ' '.join(keywords)
        return clean_data

I don't think the error comes from my code, but from the llama_index library, because in the run() method when run_transformation() is instantiated, it tries to pass show_progress.

logan-markewich commented 2 weeks ago

Pretty sure you should have this in your code

 def __call__(self, nodes, **kwargs: Any):
        for node in nodes:
            node.text = self.clean_text(node.text)
        return nodes