aphp / edsnlp

Modular, fast NLP framework, compatible with Pytorch and spaCy, offering tailored support for French clinical notes.
https://aphp.github.io/edsnlp/
BSD 3-Clause "New" or "Revised" License

Refactor the parallelization utils #212

Closed. percevalw closed this issue 11 months ago

percevalw commented 1 year ago

Feature type

Following a brainstorming session with @Thomzoy, we'd like to refactor the parallelization utilities to decouple the type of collection (iterators, lists, pandas dataframes, Spark dataframes, Hive tables, etc.) from the type of parallelization (no parallelization, multiple CPUs, GPU, distributed computing via Spark).

Description

Collection types

Most of the processing with edsnlp is done on lists and on pandas and Spark dataframes (to the best of our knowledge), so we feel it is necessary to handle these cases natively.

The following changes will be made:

How do we plan to support other formats?

It's up to the user to convert the input into an accepted format: for example, converting a polars dataframe to pandas, or to an iterator of dictionaries.
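As a purely illustrative sketch (the dataframe and column names below are assumptions), a polars input could be converted like this before being handed to the pipeline:

import polars as pl

# Hypothetical input: one clinical note per row (column names are assumptions)
notes = pl.DataFrame({
    "note_id": [1, 2],
    "note_text": ["Patient admis pour douleur thoracique.", "Pas d'antécédent notable."],
})

# Option 1: convert to pandas and use the natively supported pandas path
notes_pandas = notes.to_pandas()

# Option 2: convert to an iterator of dictionaries, one dictionary per note
notes_iterator = iter(notes.to_dicts())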

Why add two parameters, to_doc / from_doc, rather than simply asking the user to convert everything into an iterator?

  1. Depending on the distribution method, the data may not always be accessible locally. In the case of a Spark dataframe, for instance, a function is sent to each executor to apply the nlp object to each row. The conversion functions therefore need to be shipped to the executors along with the nlp object, hence these two parameters.
  2. It also lets us optimize the common but tricky collections (pandas, Spark) while giving users some leeway in the choice of columns and outputs; a sketch of what such converters could look like is given below.
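To make the two parameters concrete, here is a minimal sketch of what such converters could look like. It assumes that to_doc turns an input row into a Doc and that from_doc turns a processed Doc back into an output row; the blank pipeline, the direction of the names, and the "note_text" column are assumptions, not the final API:

import spacy
from spacy.tokens import Doc

nlp = spacy.blank("fr")  # stand-in for an edsnlp pipeline

def to_doc(row: dict) -> Doc:
    # Assumed direction: build a Doc from the text column of an input row
    return nlp.make_doc(row["note_text"])

def from_doc(doc: Doc) -> dict:
    # Assumed direction: keep only what we need from the processed Doc,
    # here the labels of the detected entities
    return {"labels": [ent.label_ for ent in doc.ents]}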

Acceleration / parallelization mode

We plan to support acceleration across several processes, on one or more GPUs, or in a distributed fashion via Spark.

Pseudo implementation

This is open to discussion:

def pipe(self, data, from_doc, to_doc, method):
    """
    Apply the pipeline to `data`, dispatching on the collection type,
    and return the results as the same kind of collection.
    """
    if is_simple_iterable(data):
        accelerator = get_iterable_accelerator(method)  # various accelerators
        return accelerator(data, from_doc, to_doc)
    elif is_pandas(data):
        # Convert to an iterable, process it, then rebuild a dataframe
        iterable = pandas_to_iterable(data)
        accelerator = get_iterable_accelerator(method)  # various accelerators
        iterable = accelerator(iterable, from_doc, to_doc)
        return iterable_to_pandas(iterable)
    elif is_polars(data):
        # we haven't decided yet which formats are common enough to be supported natively
        ...
    elif is_spark(data):
        accelerator = get_spark_accelerator(method)  # will check that the accelerator is compatible
        return accelerator(data, from_doc, to_doc)
    else:
        raise NotImplementedError()
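To illustrate the accelerator side, here is a minimal sketch of what get_iterable_accelerator("multiprocessing") could return, reusing the illustrative nlp stand-in and to_doc / from_doc converters above; the pool-based strategy, the helper names, and the argument order are assumptions, not the final design:

from functools import partial
from multiprocessing import Pool

def _apply_one(row, from_doc, to_doc):
    # Worker step: input row -> Doc -> processed Doc -> output row
    return from_doc(nlp(to_doc(row)))

def multiprocessing_accelerator(rows, from_doc, to_doc, n_jobs=2):
    # Fan the rows out to a small pool of worker processes and collect the
    # converted outputs (call this from inside an `if __name__ == "__main__":` guard)
    with Pool(n_jobs) as pool:
        return pool.map(partial(_apply_one, from_doc=from_doc, to_doc=to_doc), rows)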
percevalw commented 11 months ago

Change of mind!

The above proposal did not take into account:

The new solution (shipped with v0.10.0, #202) fixes this by introducing the LazyDocsCollection, which records operations lazily and handles document conversion (i.e. tokenization and more) at the same level as pipeline components.
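To give an idea of the lazy-recording approach, here is a minimal sketch (not edsnlp's actual implementation): operations, including the conversion steps, are appended to a plan when they are declared and only executed when the collection is actually consumed, which lets any backend replay the same plan locally, per worker process, or per Spark executor.

class LazyDocs:
    # Illustrative only: record operations and apply them item by item
    # when the collection is iterated
    def __init__(self, reader):
        self.reader = reader  # callable producing the input items
        self.ops = []         # recorded operations, nothing runs yet

    def map(self, fn):
        self.ops.append(fn)   # declaring an operation is cheap and lazy
        return self

    def __iter__(self):
        for item in self.reader():
            for fn in self.ops:
                item = fn(item)
            yield item

# Usage sketch: conversion and pipeline steps live in the same recorded plan
docs = LazyDocs(lambda: iter(["  Texte 1 ", " Texte 2  "])).map(str.strip).map(str.upper)
print(list(docs))  # ['TEXTE 1', 'TEXTE 2']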