NeuroTechX / moabb

Mother of All BCI Benchmarks
https://neurotechx.github.io/moabb/
BSD 3-Clause "New" or "Revised" License

Improve parallelisation of evaluations #481

Open PierreGtch opened 9 months ago

PierreGtch commented 9 months ago

After discussions at the braindecode code sprint, and following up on #460, I think we should break down the evaluations into something like this:

import abc
from copy import deepcopy

from joblib import Parallel, delayed


class BaseEvaluation:
    def __init__(
            self,
            ...
            n_nodes=1,  # number of data chunks to load in memory in parallel.
            n_jobs=1,   # number of jobs per data chunk. One job fits one pipeline on one fold.
    ):
        self.n_nodes = n_nodes
        self.n_jobs = n_jobs

    @abc.abstractmethod
    def get_splits(self) -> list[tuple[dict, list[tuple[dict, list[int], list[int]]]]]:
        """
        Return a list of pairs with:
          * a dict of arguments to pass to self.paradigm.get_data to load a minimal data chunk
          * a list of splits for this data chunk, i.e. triplets with:
            - dict describing the split,
            - list of train indices,
            - list of test indices.
        """
        pass

    def process(self, pipelines):
        splits = self.get_splits()
        splits_todo = []
        for datachunk_args, chunk_splits in splits:
            missing_results = self.results.not_yet_computed(datachunk_args, chunk_splits, pipelines)
            if missing_results:
                splits_todo.append((datachunk_args, chunk_splits, missing_results))
        Parallel(n_jobs=self.n_nodes)(
            delayed(self.process_datachunk)(pipelines, *args) for args in splits_todo
        )
        return self.results.to_dataframe(pipelines=pipelines, ...)

    def process_datachunk(self, pipelines, datachunk_args, chunk_splits, missing_results):
        X, y, metadata = self.paradigm.get_data(**datachunk_args)
        Parallel(n_jobs=self.n_jobs)(
            delayed(self.process_split)(p, X, y, metadata, datachunk_args, *split)
            for split in chunk_splits
            for p in pipelines
        )

    def process_split(self, clf, X, y, metadata, datachunk_args, split_args, train_idx, test_idx):
        clf = deepcopy(clf)  # fit an independent copy of the pipeline for this fold
        clf.fit(X[train_idx], y[train_idx])
        score = clf.score(X[test_idx], y[test_idx])
        self.results.add(datachunk_args, split_args, clf, score)

This would remove all the for-loops we currently have in the different evaluations and allow for broader parallelisation.
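To make the nested return type of get_splits concrete, here is a hypothetical example of what it could return for a cross-session evaluation of a single subject. The argument names, session labels, and indices are purely illustrative, not an existing MOABB structure:

# Hypothetical return value of get_splits() for a cross-session evaluation of
# subject 1 with two sessions; `dataset` stands for some BaseDataset instance.
example_splits = [
    (
        {"dataset": dataset, "subjects": [1]},  # kwargs for self.paradigm.get_data
        [
            # (split description, train indices, test indices)
            ({"subject": 1, "session": "0train"}, [0, 1, 2, 3], [4, 5, 6, 7]),
            ({"subject": 1, "session": "1test"}, [4, 5, 6, 7], [0, 1, 2, 3]),
        ],
    ),
]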

bruAristimunha commented 9 months ago

Ping @tomMoral to join the conversation.

tomMoral commented 9 months ago

The proposed pattern couples the code that performs the evaluation (running the pipelines + parallelisation) with the process that decides the split. I would recommend decoupling them further, in line with what scikit-learn does, so that the API is similar and the various concepts are easy to grasp.

Basically, get_splits serves the same purpose as the BaseCrossValidator object in scikit-learn. The API works with three methods:
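For reference, a minimal sketch of a leave-one-subject-out splitter built on scikit-learn's BaseCrossValidator; the class name and the use of groups to carry subject ids are assumptions, not part of MOABB:

import numpy as np
from sklearn.model_selection import BaseCrossValidator

class LeaveOneSubjectOutCV(BaseCrossValidator):
    # Hypothetical splitter: `groups` is assumed to hold one subject id per epoch.
    def _iter_test_indices(self, X=None, y=None, groups=None):
        groups = np.asarray(groups)
        for subject in np.unique(groups):
            yield np.flatnonzero(groups == subject)

    def get_n_splits(self, X=None, y=None, groups=None):
        return len(np.unique(groups))

# Usage sketch: split() is inherited from BaseCrossValidator.
# cv = LeaveOneSubjectOutCV()
# for train_idx, test_idx in cv.split(X, y, groups=subject_per_epoch):
#     ...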

Coming back to the Evaluation object, you would have a single one, I guess, along these lines:

import joblib
import pandas as pd
from copy import deepcopy
from joblib import Parallel, delayed

memory = joblib.Memory(location="__cache__")

class Evaluation:
    def __init__(
            self,
            ...
            n_nodes=1,  # number of data chunks to load in memory in parallel.
            n_jobs=1,   # number of jobs per data chunk. One job fits one pipeline on one fold.
            cv="intersubject",
    ):
        self.n_nodes = n_nodes
        self.n_jobs = n_jobs
        if isinstance(cv, str):  # make it easy if you want default parameters for cv
            cv = CV_CLASSES[cv]()  # CV_CLASSES: mapping from name to cross-validator class
        self.cv = cv

    def process(self, pipelines, datasets):
        results = Parallel(n_jobs=self.n_jobs)(
            delayed(self.process_split)(p, d, train_idx, test_idx)
            for p in pipelines
            for d in datasets
            for (train_idx, test_idx) in self.cv.split(d)
        )
        return pd.DataFrame(results)

    @memory.cache
    def process_split(self, clf, dataset, train_idx, test_idx):
        clf = deepcopy(clf)  # fit an independent copy of the pipeline for this split
        X_train, X_test, y_train, y_test, metadata = self.paradigm.get_data(
            dataset, train_idx, test_idx
        )
        clf.fit(X_train, y_train)
        score = clf.score(X_test, y_test)
        return {'metadata': metadata, 'clf': clf, 'score': score}

Note that I changed the manual caching to use joblib.Memory, which is designed for caching calls to a function, and I flattened the parallelism (joblib is bad with nested parallelism).
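A minimal sketch of that caching pattern, using a module-level function rather than a method (joblib.Memory hashes all arguments, including self, so caching a free function is usually more robust); the function name and arguments are illustrative:

from copy import deepcopy

import joblib

memory = joblib.Memory(location="__cache__", verbose=0)

@memory.cache
def fit_and_score(clf, X_train, y_train, X_test, y_test):
    # A second call with identical arguments is loaded from the on-disk cache
    # instead of refitting the pipeline.
    clf = deepcopy(clf)
    clf.fit(X_train, y_train)
    return clf.score(X_test, y_test)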

PierreGtch commented 9 months ago

Thanks @tomMoral for your feedback!! But I am not sure this would completely work, because we have some quite specific constraints:

This is why I proposed this nested parallelism. Maybe an in-between would be to implement BaseCrossValidators that receive only the data of one subject as input instead of a whole dataset?
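To illustrate that in-between, a hypothetical splitter that receives only one subject's epochs and metadata and splits them by session; the class name and the metadata layout are assumptions:

import numpy as np

class PerSubjectSessionCV:
    # Hypothetical cross-validator operating on a single subject's data:
    # each session is held out once as the test set.
    def split(self, X, y, metadata):
        sessions = metadata["session"].to_numpy()
        for session in np.unique(sessions):
            test_idx = np.flatnonzero(sessions == session)
            train_idx = np.flatnonzero(sessions != session)
            yield train_idx, test_idx

    def get_n_splits(self, X=None, y=None, metadata=None):
        return metadata["session"].nunique()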