SCECcode / pycsep

Tools to help earthquake forecast model developers build and evaluate their forecasts
https://docs.cseptesting.org
BSD 3-Clause "New" or "Revised" License

optimize catalog processing #179

Closed wsavran closed 2 years ago

wsavran commented 2 years ago

@mherrmann3 thread for an open discussion on optimizing the catalog-based evaluations. the problem is embarrassingly parallel and slow, so there is obvious motivation to speed up the calculations.

some possible packages here to help with that: numba, joblib, dask

i will edit this issue over time with more information.

mherrmann3 commented 2 years ago

Alright, let's dig a little deeper into this issue....

Regarding numba, I once gained a significant speedup by numba-fying bin1d_vec—because I needed it repeatedly to get indices of target events for >6000 time windows. Now I wanted to report the speed improvement here... and I was surprised: it doesn't make any difference anymore: pycsep's bin1d_vec is as fast as my numba-fied one! My best guess is that numpy v1.20.0 introduced some significant performance improvements by leveraging SIMD (i.e. array/vector processors), which speeds up ufuncs and makes numba useless here; apparently (and indeed evidently), bin1d_vec makes considerable use of such universal functions—independently of your update one year ago. Did you happen to notice a faster execution of catalog evaluations after ~Jan 2021, when updating to numpy ≥1.20.0?
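For reference, a numba-fied binning helper could look roughly like the sketch below; the function name, the uniform-bin-edge assumption, and the signature are illustrative and not pycsep's actual bin1d_vec:

import numpy as np
from numba import njit

@njit(cache=True)
def bin1d_vec_numba(values, bin_edges):
    """Toy numba-compiled binning: index of the left bin edge for each value,
    assuming uniformly spaced edges (illustrative, not pycsep's bin1d_vec)."""
    dh = bin_edges[1] - bin_edges[0]
    out = np.empty(values.shape[0], dtype=np.int64)
    for i in range(values.shape[0]):
        out[i] = int((values[i] - bin_edges[0]) // dh)
    return out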

I'll do some profiling during catalog evaluations to find bottlenecks that could potentially be numba-fied. But I doubt it, as most operations are already on vectors/arrays.

Alternatively or additionally, I can work on parallelizing catalog evaluations. As you mentioned, it's an embarrassingly parallel problem, so joblib will make that very easy. It would be a pity not to take advantage of the ubiquitous multi-core processors. If this is not fast enough, and if we have a cluster available, we can also think about Dask. Just comes to my mind: could we have a config file or similar (cmd args) to specify the number of threads/processes that pycsep can use—or should we assume that using all available cores is fine?
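For the embarrassingly parallel part, the joblib pattern could be as simple as the sketch below; process_catalog and forecast are placeholders for whatever per-catalog work and catalog iterable the evaluations actually use:

from joblib import Parallel, delayed

def process_catalog(catalog):
    # placeholder: compute whatever per-catalog statistic the evaluation needs
    return catalog.event_count

# `forecast` is assumed to be an iterable of catalogs;
# n_jobs=-1 uses all available cores and could come from a config option
results = Parallel(n_jobs=-1)(
    delayed(process_catalog)(catalog) for catalog in forecast
)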

wsavran commented 2 years ago

@mherrmann3 I did notice similar things with numba, but I wasn't able to pin it directly to the numpy ≥1.20.0 update. It makes sense given the timing of that update though. I'm interested to see what we can get out of joblib, because I had run into some limitations with Python. Granted, I tried to optimize these calculations back in 2018, before the shared memory support was released in Python 3.8. I think we should work to optimize the catalog evaluations first, and then optimize the lower-level calculations. See below for more.
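For context, the Python 3.8 shared-memory support mentioned above lets a worker process attach to a numpy array by name without copying or pickling it; a minimal sketch with dummy data:

import numpy as np
from multiprocessing import shared_memory

# parent process: copy catalog data (dummy values here) into a shared block
catalog_data = np.random.rand(10000, 4)
shm = shared_memory.SharedMemory(create=True, size=catalog_data.nbytes)
shared_view = np.ndarray(catalog_data.shape, dtype=catalog_data.dtype, buffer=shm.buf)
shared_view[:] = catalog_data[:]

# a worker process would attach by name and read without copying:
# worker_shm = shared_memory.SharedMemory(name=shm.name)
# worker_view = np.ndarray(catalog_data.shape, dtype=catalog_data.dtype, buffer=worker_shm.buf)

shm.close()
shm.unlink()  # release the block once every process is done with it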

Idea for parallelization

  1. Use forecast class iterator to read / filter forecast on reader process
  2. Serialize memmapped catalog data to multiprocessing queue
  3. Worker process consumes the memmap from the queue and processes catalog
    • Some evaluations require that gridded rates are computed
    • Multiple statistics may need to be computed for the same catalog (e.g., user wants N-, M-, and S-tests).
  4. Return statistics to manager (main) process

Note: The user would have to define an evaluation set so that we can limit the I/O operations for catalog forecasts. See the comments below for more details; a minimal sketch of this pipeline follows.
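A rough sketch of that reader/worker/manager layout, assuming the forecast object is iterable and knows how many catalogs it contains; reader, worker, evaluate_catalog, and the catalog/forecast accessors are all placeholders, not an existing pycsep API:

import multiprocessing as mp

def evaluate_catalog(data):
    # placeholder: compute the statistic(s) the evaluation set needs
    return data.shape[0]

def reader(forecast, work_queue, n_workers):
    """Reader process: iterate/filter the forecast and push catalog data."""
    for catalog in forecast:
        work_queue.put(catalog.catalog)  # assumed: numpy array backing the catalog
    for _ in range(n_workers):
        work_queue.put(None)             # sentinel: no more work

def worker(work_queue, result_queue):
    """Worker process: consume catalogs and compute statistics."""
    while True:
        data = work_queue.get()
        if data is None:
            break
        result_queue.put(evaluate_catalog(data))

if __name__ == '__main__':
    n_workers = 4
    work_q, result_q = mp.Queue(maxsize=2 * n_workers), mp.Queue()
    workers = [mp.Process(target=worker, args=(work_q, result_q)) for _ in range(n_workers)]
    for w in workers:
        w.start()
    reader(forecast, work_q, n_workers)                      # `forecast` is a placeholder object
    stats = [result_q.get() for _ in range(forecast.n_cat)]  # drain results before joining workers
    for w in workers:
        w.join()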

Explanation

I'll try to summarize some of the issues that I had, in case they might help. The main issue involved the process-based parallelism that Python implements. Some of this was related to the way that UCERF3 stores forecasts in a single large binary file, but I think the issues are still relevant. Effectively, we want to limit the amount of shared resources and the number of serialization calls made by joblib or multiprocessing. I was able to overcome this by manually managing memmapped data: I would read in the UCERF3 forecast and have each process use a memmap to access the individual catalogs without sharing the data across processes. I used the following classes for that:

import numpy as np
from multiprocessing import Array, Queue
from queue import Empty, Full


class ArrayView:
    """Circular view over a shared buffer that stores fixed-shape numpy arrays."""

    def __init__(self, array, max_bytes, dtype, el_shape, i_item=0):
        self.dtype = dtype
        self.el_shape = el_shape
        self.nbytes_el = self.dtype.itemsize * np.prod(self.el_shape)
        self.n_items = int(np.floor(max_bytes / self.nbytes_el))
        self.total_shape = (self.n_items,) + self.el_shape
        self.i_item = i_item
        self.view = np.frombuffer(array, dtype, np.prod(self.total_shape)).reshape(self.total_shape)

    def __eq__(self, other):
        """Overrides the default implementation"""
        if isinstance(self, other.__class__):
            return self.el_shape == other.el_shape and self.dtype == other.dtype
        return False

    def push(self, element):
        self.view[self.i_item, ...] = element
        i_inserted = self.i_item
        self.i_item = (self.i_item + 1) % self.n_items
        return self.dtype, self.el_shape, i_inserted # a tuple is returned to maximise performance

    def pop(self, i_item):
        return self.view[i_item, ...]

    def fits(self, item):
        if isinstance(item, np.ndarray):
            return item.dtype == self.dtype and item.shape == self.el_shape
        return (item[0] == self.dtype and
                item[1] == self.el_shape and
                item[2] < self.n_items)

class ArrayQueue:
    """ A drop-in replacement for the multiprocessing queue, usable
     only for numpy arrays, which removes the need for pickling and
     should provide higher speeds and lower memory usage
    """
    def __init__(self, max_mbytes=10):
        self.maxbytes = int(max_mbytes*1000000)
        self.array = Array('c', self.maxbytes)
        self.view = None
        self.queue = Queue()
        self.read_queue = Queue()
        self.last_item = 0

    def check_full(self):
        while True:
            try:
                self.last_item = self.read_queue.get(timeout=0.00001)
            except Empty:
                break
        if self.view.i_item == self.last_item:
            raise Full("Queue of length {} full when trying to insert {},"
                       " last item read was {}".format(self.view.n_items,
                                                       self.view.i_item, self.last_item))

    def put(self, element):
        if self.view is None or not self.view.fits(element):
            self.view = ArrayView(self.array.get_obj(), self.maxbytes,
                                  element.dtype, element.shape)
            self.last_item = 0
        else:
            self.check_full()
        qitem = self.view.push(element)

        self.queue.put(qitem)

    def get(self, **kwargs):
        aritem = self.queue.get(**kwargs)
        if self.view is None or not self.view.fits(aritem):
            self.view = ArrayView(self.array.get_obj(), self.maxbytes,
                                  *aritem)
        self.read_queue.put(aritem[2])
        return self.view.pop(aritem[2])

    def clear(self):
        """ Empties the queue without the need to read all the existing
        elements
        :return: nothing
        """
        self.view = None
        while True:
            try:
                it = self.queue.get_nowait()
            except Empty:
                break
        while True:
            try:
                it = self.read_queue.get_nowait()
            except Empty:
                break

        self.last_item = 0

    def empty(self):
        return self.queue.empty()
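
A hypothetical usage of ArrayQueue, shown here in a single process just to illustrate the API; in practice put and get would run in different processes:

import numpy as np

queue = ArrayQueue(max_mbytes=100)
catalog_array = np.random.rand(5000, 4)   # stand-in for one catalog's data

queue.put(catalog_array)                  # copied into the shared buffer, no pickling
received = queue.get()                    # view into the shared buffer
assert np.allclose(received, catalog_array)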

Right now, the for loops in the catalog evaluation functions incorporate a generator function that loads catalogs on demand. Filters are automatically applied during this process, which can be quite slow in the case of the time-dependent magnitude-completeness model. I don't think this current approach will easily work with the multiprocessing module, because the generator function is not thread-safe. The typical way to parallelize this would be to have a single process that iterates through the forecast and writes the numpy.array holding the catalog data to a queue, while worker processes consume this data. Upon completion they would send results to another process for aggregation. Additionally, some evaluations like the S- and PL-tests require that the forecast first be processed to compute the expected rate densities in each cell.

Another cause of poor performance is calling multiple evaluations that each introduce their own I/O overhead. This can be eliminated if the forecasts are stored in memory, but some UCERF3 forecasts are simply too large to hold in memory on a personal computer. Therefore, any solution for parallelization should still support lazy loading and reduce the I/O requirements.

We can create a module that allows evaluation sets to be defined by the user. The user would define which evaluations to perform on the forecast, and instead of computing the statistics sequentially, each statistic would be computed while the catalog is in memory. The worker processes would report the individual statistics back to the manager process. We would still use a reader process to read in the catalogs. Another thing to consider is that numpy performs many implicit loops over the catalog data when calling many of the built-in functions. Maybe we just write our own C/C++ module for the processing (I'm only sort of kidding)? However, Cython could be an option.

Ideally, we would read in the catalog once, iterate over the catalog once, and return each statistic for the catalog. Each process could be responsible either for a single statistic or for a single catalog returning multiple statistics. I think we would want to supply this in addition to the current evaluations rather than replace them, perhaps along the lines of a parallel module that would let the user call the parallel version of a function.
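To make the evaluation-set idea concrete, here is a rough sketch of what such an interface could look like; the class and method names are hypothetical, not an existing pycsep API, and the tasks are assumed to be instances of processing-task classes like the ones shown in the next comment:

class EvaluationSet:
    """Hypothetical container that computes several statistics per catalog in a
    single pass, so each catalog only has to be read and filtered once."""

    def __init__(self, tasks):
        self.tasks = tasks                     # e.g., [NumberTest(), ...]

    def process_forecast(self, forecast):
        # single pass over the forecast; each task accumulates its own data
        for catalog in forecast:
            for task in self.tasks:
                task.process(catalog)

    def evaluate(self, observed_catalog):
        # each task turns its accumulated data into evaluation results
        return {type(task).__name__: task.post_process(observed_catalog)
                for task in self.tasks}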

wsavran commented 2 years ago

Here are some examples of the classes I was mentioning in the last post.

import os

import numpy

# pycsep internals assumed importable for this sketch: EvaluationResult,
# FileSystem, seq_iter, get_quantiles, plot_number_test


class AbstractProcessingTask:
    def __init__(self, data=None, name=None, min_mw=2.5, n_cat=None, mws=None):
        self.data = data or []
        # to-be deprecated
        self.mws = mws or [2.5, 3.0, 3.5, 4.0, 4.5]
        self.min_mw = min_mw
        self.n_cat = n_cat
        self.name = name
        self.ax = []
        self.fnames = []
        self.needs_two_passes = False
        self.buffer = []
        self.region = None
        self.buffer_fname = None
        self.fhandle = None
        self.archive = True
        self.version = 1

    @staticmethod
    def _build_filename(dir, mw, plot_id):
        basename = f"{plot_id}_mw_{str(mw).replace('.','p')}".lower()
        return os.path.join(dir, basename)

    def process(self, data):
        raise NotImplementedError('must implement process()!')

    def process_again(self, catalog, args=()):
        """ This function defaults to pass unless the method needs to read through the data twice. """
        pass

    def post_process(self, obs, args=None):
        """
        Compute evaluation of data stored in self.data.

        Args:
            obs (csep.Catalog): used to evaluate the forecast
            args (tuple): args for this function

        Returns:
            result (csep.core.evaluations.EvaluationResult):

        """
        result = EvaluationResult()
        return result

    def plot(self, results, plot_dir, show=False):
        """
        Plotting function, typically just a wrapper to a function in utils.plotting()

        Args:
            results: evaluation results to plot
            plot_dir (str): directory where the plot files are saved
            show (bool): show plot; if plotting multiple, just run on last.

        Returns:
            axes (matplotlib.axes)

        """
        raise NotImplementedError('must implement plot()!')

    def store_results(self, results, dir):
        """
        Saves evaluation results serialized into json format. This format is used to recreate the results class which
        can then be plotted if desired. The following directory structure will be created:

        | dir
        |-- n-test
        |---- n-test_mw_2.5.json
        |---- n_test_mw_3.0.json
        |-- m-test
        |---- m_test_mw_2.5.json
        |---- m_test_mw_3.0.json
        ...

        The results iterable should only contain results for a single evaluation. Typically they would contain different
        minimum magnitudes.

        Args:
            results (Iterable of EvaluationResult): iterable object containing evaluation results. this could be a list or tuple of lists as well
            dir (str): directory to store the testing results. name will be constructed programmatically.

        Returns:
            None

        """
        success = False
        if not self.archive:
            return

        # handle if results is just a single result
        if isinstance(results, EvaluationResult):
            repo = FileSystem(url=self._build_filename(dir, results.min_mw, results.name) + '.json')
            if repo.save(results.to_dict()):
                success = True
            return success
        # or if its an iterable
        for idx in seq_iter(results):
            # for debugging
            if isinstance(results[idx], tuple) or isinstance(results[idx], list):
                result = results[idx]
            else:
                result = [results[idx]]
            for r in result:
                repo = FileSystem(url=self._build_filename(dir, r.min_mw, r.name) + '.json')
                if repo.save(r.to_dict()):
                    success = True
        return success

    def store_data(self, dir):
        """ Store the intermediate data used to calculate the results for the evaluations. """
        raise NotImplementedError

class NumberTest(AbstractProcessingTask):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.mws = [2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 5.5, 6.0]

    def process(self, catalog, filter=False):
        if not self.name:
            self.name = catalog.name
        counts = []
        for mw in self.mws:
            cat_filt = catalog.filter(f'magnitude >= {mw}')
            counts.append(cat_filt.event_count)
        self.data.append(counts)

    def post_process(self, obs, args=None):
        # we don't need args for this function
        _ = args
        results = {}
        data = numpy.array(self.data)
        for i, mw in enumerate(self.mws):
            obs_filt = obs.filter(f'magnitude >= {mw}', in_place=False)
            observation_count = obs_filt.event_count
            # get delta_1 and delta_2 values
            delta_1, delta_2 = get_quantiles(data[:,i], observation_count)
            # prepare result
            result = EvaluationResult(test_distribution=data[:,i],
                          name='N-Test',
                          observed_statistic=observation_count,
                          quantile=(delta_1, delta_2),
                          status='Normal',
                          obs_catalog_repr=obs.date_accessed,
                          sim_name=self.name,
                          min_mw=mw,
                          obs_name=obs.name)
            results[mw] = result
        return results

    def plot(self, results, plot_dir, plot_args=None, show=False):

        for mw, result in results.items():
            # compute bin counts, this one is special because of integer values
            td = result.test_distribution
            min_bin, max_bin = numpy.min(td), numpy.max(td)
            # hard-code some logic for bin size
            bins = numpy.arange(min_bin, max_bin)
            if len(bins) == 1:
                bins = 3
            n_test_fname = AbstractProcessingTask._build_filename(plot_dir, mw, 'n_test')
            _ = plot_number_test(result, show=show, plot_args={'percentile': 95,
                                                                'title': f'Number Test, M{mw}+',
                                                                'bins': bins,
                                                                'filename': n_test_fname})
            self.fnames.append(n_test_fname)
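
A hypothetical usage of NumberTest, where forecast yields catalogs and comcat is the observed catalog (both are placeholders):

# hypothetical usage; `forecast` and `comcat` are placeholders
n_test = NumberTest()
for catalog in forecast:
    n_test.process(catalog)              # accumulate counts per magnitude threshold
results = n_test.post_process(comcat)    # dict of EvaluationResult keyed by minimum magnitude
n_test.plot(results, plot_dir='./plots', show=False)
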
wsavran commented 2 years ago

@mherrmann3 i'm going to close this issue for now, since we aren't actively working on the optimizations. we can easily re-open if that becomes a priority.