graphnet-team / graphnet

A Deep learning library for neutrino telescopes
https://graphnet-team.github.io/graphnet/
Apache License 2.0

Generalized `DataConverter` #659

Closed. RasmusOrsoe closed this 5 months ago.

RasmusOrsoe commented 5 months ago

This PR refactors DataConverter so that it generalizes to experiments other than IceCube, and allows for very easy implementation of new data backends, i.e. it closes #634.

Please note that many of the CodeClimate warnings are associated with code that this PR does not introduce. They mainly concern the complexity of IceCube-related extractors, which we cannot realistically address.

This PR looks huge but it's not.

The main changes are summarized below.

Usage

Integrating a data source from a new experiment requires implementing a new experiment-specific subclass of GraphNeTFileReader, which is fairly straightforward, as it entails writing just a single abstract method, e.g.:

from typing import Union, List, OrderedDict

from graphnet.data.readers import GraphNeTFileReader
from graphnet.data.dataclasses import I3FileSet
from graphnet.data.extractors.my_experiment import MyExtractor

class MyCustomReader(GraphNeTFileReader):
    _accepted_file_extensions = ['.my_special_file_type']
    _accepted_extractors = [MyExtractor]

    def __call__(self, file_path: Union[str, I3FileSet]) -> List[OrderedDict]:
        """Open and apply extractors to a single file.

        The `output` must be a list of dictionaries, where the number of
        events in the file `n_events` satisfies `len(output) == n_events`,
        i.e. each element in the list is a dictionary, and each field in the
        dictionary is the output of a single extractor.
        """

where MyExtractor is a data-source-specific implementation of the generic base class Extractor:

from typing import Any

from graphnet.data.extractors import Extractor

class MyExtractor(Extractor):
    def __init__(self):
        super().__init__(extractor_name='MyExtractor')

    def __call__(self, data: Any) -> dict:
        """Extract information from data."""
        raise NotImplementedError
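
For illustration, here is a minimal sketch of what a concrete extractor could look like. The class name, and the assumption that each event reaches the extractor as a dict-like object with fields named 'charge' and 'dom_time', are hypothetical and not part of this PR:

from typing import Any, Dict

from graphnet.data.extractors import Extractor

class MyPulseExtractor(Extractor):
    """Hypothetical extractor pulling a few pulse-level fields from an event."""

    def __init__(self):
        super().__init__(extractor_name='MyPulseExtractor')

    def __call__(self, data: Any) -> Dict[str, Any]:
        # `data` is whatever the reader chooses to pass per event;
        # here it is assumed to be a dict-like object.
        return {
            'charge': data['charge'],
            'dom_time': data['dom_time'],
        }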

This means that MyCustomReader uniquely defines the input to the corresponding MyExtractor and how the extractors are called (vectorized, serially, etc.). The only limitation imposed by us is the output format of MyCustomReader (see docstring).
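
To make that concrete, here is a minimal sketch of a serial __call__ implementation. It assumes a hypothetical per-event parser read_my_file, and that the configured extractors are reachable on the reader as self._extractors with their names exposed as .name (these details are assumptions, not something this PR prescribes):

from collections import OrderedDict
from typing import List, Union

from graphnet.data.readers import GraphNeTFileReader
from graphnet.data.dataclasses import I3FileSet
from graphnet.data.extractors.my_experiment import MyExtractor

class MyCustomReader(GraphNeTFileReader):
    _accepted_file_extensions = ['.my_special_file_type']
    _accepted_extractors = [MyExtractor]

    def __call__(self, file_path: Union[str, I3FileSet]) -> List[OrderedDict]:
        """Open the file and apply every configured extractor to each event."""
        output = []
        # `read_my_file` is a hypothetical, experiment-specific parser that
        # yields one event at a time from the file.
        for event in read_my_file(file_path):
            # One dictionary per event; one field per extractor
            # (assumed to be reachable via `self._extractors`).
            data = OrderedDict()
            for extractor in self._extractors:
                data[extractor.name] = extractor(event)
            output.append(data)
        return output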

Likewise, extending the new DataConverter to output files in a new data backend is easy. It requires only the implementation of a single abstract method:

from typing import Dict, List, Union

import pandas as pd

from graphnet.data.writers import GraphNeTWriter

class MyNewWriter(GraphNeTWriter):
    _merge_dataframes = True  # or False
    _file_extension = '.my_backend'

    def _save_file(
        self,
        data: Union[Dict[str, pd.DataFrame], Dict[str, List[pd.DataFrame]]],
        output_file_path: str,
        n_events: int,
    ) -> None:
        """Save the interim data format from a single input file.

        Args:
            data: the interim data from a single input file.
            output_file_path: output file path.
            n_events: Number of events contained in `data`.
        """
        raise NotImplementedError

The _save_file method receives a dictionary in one of two possible forms, controlled by the class variable _merge_dataframes. If _merge_dataframes = True, _save_file receives a single pd.DataFrame per extractor that needs to be saved to disk. If _merge_dataframes = False, _save_file receives one pd.DataFrame per event per extractor. By using the pandas framework for this part, exporting the data to new formats is incredibly easy. E.g. a writer that saves each extractor output as a separate .csv file would be:

from typing import Dict, List, Union
import os

import pandas as pd

from graphnet.data.writers import GraphNeTWriter

class MyCSVWriter(GraphNeTWriter):
    _file_extension = '.csv'
    _merge_dataframes = True

    def _save_file(
        self,
        data: Union[Dict[str, pd.DataFrame], Dict[str, List[pd.DataFrame]]],
        output_file_path: str,
        n_events: int,
    ) -> None:
        """Save the interim data format from a single input file.

        Args:
            data: the interim data from a single input file.
            output_file_path: output file path.
            n_events: Number of events contained in `data`.
        """
        # Write one .csv file per extractor, alongside `output_file_path`.
        output_basepath = os.path.splitext(output_file_path)[0]
        for extractor in data.keys():
            out_path = output_basepath + f'_{extractor}' + self.file_extension
            data[extractor].to_csv(out_path)
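
For completeness, a hedged sketch of the other mode: with _merge_dataframes = False, data maps each extractor name to a list of per-event pd.DataFrame objects. A hypothetical writer (the class name and the pickle format are illustrative only) that stores that per-event structure as-is could look like:

from typing import Dict, List, Union
import pickle

import pandas as pd

from graphnet.data.writers import GraphNeTWriter

class MyPickleWriter(GraphNeTWriter):
    _file_extension = '.pkl'
    _merge_dataframes = False  # receive one DataFrame per event per extractor

    def _save_file(
        self,
        data: Union[Dict[str, pd.DataFrame], Dict[str, List[pd.DataFrame]]],
        output_file_path: str,
        n_events: int,
    ) -> None:
        """Pickle the per-event DataFrames from a single input file."""
        # With `_merge_dataframes = False`, each value in `data` is a list
        # holding one pd.DataFrame per event.
        with open(output_file_path, 'wb') as f:
            pickle.dump(data, f)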
RasmusOrsoe commented 5 months ago

@Aske-Rosted thank you for your detailed comments! I ended up refactoring set_gcd to make it clearer what the function actually does. I think your questions were a great hint that this function was too compact and opaque - so I have expanded the docstring and added multiple comments.

> It's a little hard to grasp all the details of where stuff has been moved, but I imagine it makes sense once you start using it.

Yes - it does look like a bit much, but it really isn't. You can see here what the new DataConverter - GraphNeTWriter - GraphNeTFileReader combination looks like in practice.
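
As a rough sketch of that wiring, using the hypothetical components from the examples above (the argument names below are assumptions about the refactored API, not a quote from this PR):

from graphnet.data.dataconverter import DataConverter

# Wire a reader, a writer and one or more extractors together.
converter = DataConverter(
    file_reader=MyCustomReader(),   # GraphNeTFileReader subclass
    save_method=MyCSVWriter(),      # GraphNeTWriter subclass
    extractors=[MyExtractor()],     # Extractor subclasses
    outdir='/path/to/output',       # hypothetical output directory
    num_workers=1,
)
converter('/path/to/input_files')   # convert every accepted file in the directory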

I'm happy to elaborate on any bits of this code, @Aske-Rosted.