MDAnalysis / pmda

Parallel algorithms for MDAnalysis
https://www.mdanalysis.org/pmda/
Other
31 stars 22 forks source link

replace res with two kinds of output, accumulator and timeseries #55

Closed VOD555 closed 6 years ago

VOD555 commented 6 years ago

Changes made in this Pull Request:

The example for timeseries is contact.py, and the one for accumulator is rdf.

PR Checklist

kain88-de commented 6 years ago

I do not understand the motivation for this change. In what respect is the size of results reduced? It looks to me like it would make it more complicated to write an analysis because one has to understand and decide if they want a timeseries or an accumulator.

Note I'm not against this change and I appreciate your contribution. I would simply like to understand it before I merge.

VOD555 commented 6 years ago

@kain88-de Currently, res stores the results from each step. It's ok for functions such as contacts and rms, whose final results are timeseries. However, the final result of pmda.rdf is not tiemseries, if we want to get a 75-bins rdf for a system with 1000 frames using 4 cores, in each step, pmda.rdf will generate a 175 matrix, and self._results would be a 425075 matrix. But what we want is just a 175 matrix. For a 'numpy.array' whose elements are also numpy.array, for example, 'a = np.array(np.array([b, c, d]), np.array([e]))', the addtion a + a will be np.array(np.array([2b, 2c, 2d]), np.array([2e])) but not a.append(a). Due to this property, in each parallelled block, accumulator will do matrix addtion, and it is always a 175 matrix. As a result, self._results would be a 475 matrix which is much smaller.

orbeckst commented 6 years ago

As @VOD555 says, there are two different ways in which one might want to collect data: as a timeseries or as a histogram (accumulator). In principle one could create the timeseries and then histogram in _conclude but this typically requires too much memory. RDF or a density calculation are examples of the latter. We haven't found a good way to use the existing PMDA framework for histograming, therefore this experimental PR.

If you have an alternative suggestion we're all ears – one reason for this PR is to discuss and prototype.

kain88-de commented 6 years ago

OK, I do understand the intent better now. You are saying we have the wrong reduce step in our map-reduce scheme. In the timeseries case we want to stack arrays together as the reduce step, or simply collecting all results in the right order. For the accumulation, we want to add arrays.

The current reduce step is res.append(computation). We would simply have to change this reduce function based on our cases.

timeseries: res = np.stack(res, computation) accumulator: res = np.sum(res, computation)

To implement this the ParallelAnalysisBase class would get a new attribute. reduce = np.stack. This attribute can be overwritten in a child class like RDF to reduce = np.sum. Such an attribute would allow full control over the reduce-step in the analysis and requires no change in existing classes. With this new attribute line https://github.com/MDAnalysis/pmda/blob/a5aa57607bc413496322d97bbc22e8eb64884cba/pmda/parallel.py#L325 would change to

res = self.reduce(res, self._single_frame(ts, agroups))

Note I'm not sure about the stack command. One should check this! I'm sure I missed some details in implementing this but the general idea could work.

edit: updated code example to make it clearer

orbeckst commented 6 years ago

Kind of. I would say that conceptually we do the reduce of map-reduce in _conclude() when the individual results from the workers are combined. But we are also doing a reduction over the individual frames for each worker, and that is the main problem here. I am drawing the distinction because it is possible that the reduction in _conclude could be different from the one over frames in a single worker. For example, if we were to calculate densities d[i] in the workers then the final reduction would need to add them with weights, d = w[0]*d[0] + w[1]*d[1] + w[2]*d[2] + ... where w[i] = n_frames[i]/n_frames_total.

I like your idea of defining a reduce function. I would actually be explicit and add a new API method

class ParallelAnalysis():
   ...
   @staticmethod
   def _reduce(*args):
         """implement the reduction of a single frame"""
         # timeseries:
         return np.stack(args)

         # accumulator
         return np.sum(args)

and then use self._reduce() in the _dask_helper() function

res = self._reduce(res, self._single_frame(ts, agroups))

(I would make it a static method to make clear that it should only work on the input and not use anything in the class.)

If the _reduce() method works for _conclude() then you can also use it there directly:

def _conclude(self):
    self.result = self._reduce(*self._result)
kain88-de commented 6 years ago

I would make it a static method to make clear that it should only work on the input and not use anything in the class.

Good idea!

I like your idea of defining a reduce function. I would actually be explicit and add a new API method

Yes being explicit is good. We should then better document the scheme we currently use for parallelization.

richardjgowers commented 6 years ago

Maybe we just need to have a mixin which define what sort of reduce happens...

class SomethingOverTime(ParallelBase, AppendReduce):

class AverageThing(ParallelBase, MeanReduce):

orbeckst commented 6 years ago

Just to clarify: my idea was that when you write a class, you write your own

because both can be rather specific.

We should have a default that maintains the old "timeseries" behavior. Maybe something like the following

class ParallelAnalysis():
   ...
   @staticmethod
   def _reduce(*args):
         """ 'append' action for a time series

        Note: all args *must* be of type list so that they create one bigger list.

        The alternative would be to do something like

            return args[0].extend(args[1:])

         """
         return sum(args)

(I think np.hstack is not flexible enough.)

VOD555 commented 6 years ago

What `np.stack()' does is to Join a sequence of arrays along a new axis. One example for it is

>>> a = np.array([1, 2, 3])
>>> b = np.array([2, 3, 4])
>>> np.stack((a, b))
array([[1, 2, 3],
       [2, 3, 4]])

The input should be one array, and np.stack() cannot deal with dictionary (such as the single_frame output of pmda.rdf). So I think np.stack() is not flexible enough.

list.append is still a good choice for us, and the default self._reduce() for 'timeseries' may be like

class ParallelAnalysis():
    ...
    def _reduce(self, res, result_single_frame):
        res.append(result_single_frame)
        return res

I've tested this, and it works well with the functions we currently have.

kain88-de commented 6 years ago

Maybe we just need to have a mixin which define what sort of reduce happens...

Using staticmethods or inheritance through mixin classes achieves the same goal. Either way is OK. Personally I prefer to use a staticmethod because I can test them easily and having special one-off solutions are easier to implement.

For the staticmethod solution the standard should be append and we should over an accumulate utility function that can be used by others. In RDF this would be

@staticmethod
def _reduce(a, b):
    return parallel.accumulate(a, b)

The accumulate function can be what @VOD555 has currently written.

Using a staticmethod goes more in the direction of using composition instead of inheritance. There are lots of arguments on the internet available for composition and against it. I personally like it better.

codecov[bot] commented 6 years ago

Codecov Report

Merging #55 into master will decrease coverage by 1.79%. The diff coverage is 100%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master      #55     +/-   ##
=========================================
- Coverage   99.25%   97.46%   -1.8%     
=========================================
  Files           7        7             
  Lines         270      276      +6     
  Branches       28       27      -1     
=========================================
+ Hits          268      269      +1     
- Misses          1        4      +3     
- Partials        1        3      +2
Impacted Files Coverage Δ
pmda/rdf.py 90.9% <100%> (-9.1%) :arrow_down:
pmda/parallel.py 100% <100%> (ø) :arrow_up:

Continue to review full report at Codecov.

Legend - Click here to learn more Δ = absolute <relative> (impact), ø = not affected, ? = missing data Powered by Codecov. Last update a5aa576...06e6f7b. Read the comment docs.

kain88-de commented 6 years ago

@VOD555 thanks for the fixes! Do you mind adding documentation here for the new _reduce static method and explain it's purpose, default and optional requirement for a class inheriting from ParallelAnalysisBase?

VOD555 commented 6 years ago

@VOD555 thanks for the fixes! Do you mind adding documentation here for the new _reduce static method and explain it's purpose, default and optional requirement for a class inheriting from ParallelAnalysisBase?

Sure!

kain88-de commented 6 years ago

@VOD555 I made my suggested changes myself. You can have a last look over them before merging.

VOD555 commented 6 years ago

@kain88-de That's very good, thanks.