MDAnalysis / pmda

Parallel algorithms for MDAnalysis
https://www.mdanalysis.org/pmda/
Other
31 stars 22 forks source link

Parallel implementation not leveraging the lightweight serializations/de-serializations of MDA #26

Open mnmelo opened 6 years ago

mnmelo commented 6 years ago

Right now parallel analyses do their own sort of pickling/unpickling. While this gives the analysis more control and possibly skips unneeded pickle/unpickle cruft, it has a number of drawbacks:

I vote we rely on the Universe's and AtomGroup's pickling/unpickling behavior. If we could do that then making a parallel analysis would become an automated wrapper and the user need not subclass ParallelAnalysisBase themselves.

I understand that as it is right now it can't work as I want, because we must explicitly tell the analysis that it should pickle along the Universe (otherwise AtomGroup unpickling will fail). We never got around to implementing auto-loading of a Universe when unpickling an AtomGroup (https://github.com/MDAnalysis/mdanalysis/issues/878). At the time it seemed selfishly premature of me, since that would only address my own parallelization needs. Now that this becomes more widespread it might be good to revisit the idea, what do you think?

kain88-de commented 6 years ago

Subclasses must be explicitly written to make an analysis parallel. This makes automated parallelization of existing analyses hard;

I haven't gotten around to port AnalysisFromClass to the this module. It should make porting existing things easier. But we for sure do need to rewrite all analysis classes to make use of this. There isn't a good way around this. I'm also not for keeping the current AnalysisClass as it is in MDAnalysis since the API is a bit restrictive and makes it to easy to introduce subtle parallelism bugs.

Each parallel analysis must implement its own pickling/unpickling strategy, leading to code duplication among analyses and compared to the existing pickling/unpickling strategy;

Why where is this done? All the 'unpickling' is done in the base class. Using the comments of @richardjgowers in #13 it shouldn't be to hard to actually use atomgroup pickling in the base class. I haven't tried it yet though. The only unpickling that needs to be done in a derived class is in _conclude, but this is also the case in MDAnalysis currently to prepare class arguments that are easier to work with for the user then arguments we might use internally.

I vote we rely on the Universe's and AtomGroup's pickling/unpickling behavior. If we could do that then making a parallel analysis would become an automated wrapper and the user need not subclass ParallelAnalysisBase themselves.

How would something like that look like? What would be a user API?

I understand that as it is right now it can't work as I want,

How do you want it to work? If you give us some example code that demonstrates what you want to do a discussion will be easier.

mnmelo commented 6 years ago

I think it is a bit premature to decide that AnalysisBase subclasses will need a user-written wrapper to be compatible at all with automated parallelism. Of course, some algorithms aren't readily parallelizable, so I also don't mean to say that all AnalysisBase subclasses must be parallelization-compatible. Also, what I propose below would need some adaptation of the AnalysisBase API, which I guess reflects (some) API limitations you mention and also forces users to update their analyses.

But as you say, examples are clearer. For instance, I'd like this to work:

from MDAnalysis.analysis.rms import RMSD
from pmda.parallel import parallelize

ParallelRMSD = parallelize(RMSD)
rmsd = ParallelRMSD(u.atoms).run().rmsd
# Other APIs are possible. The idea is to streamline parallelization.

where parallelize would be something like:

def parallelize(analysis):
    class p_analysis(ParallelAnalysisBase, analysis):
        pass
    return p_analysis

and ParallelAnalysisBase would provide necessary wrapping to invoke dask (adapted from the current ParallelAnalysisBase):

class ParallelAnalysisBase(object):
    def run():
        #calculate self.start, self.stop, self.step, self.bsize, self.n_frames ...
        self._prepare()
        blocks = []
        for b in range(n_blocks):
            # Note the simpler invocation. Dask serializes 'self', so no need
            # to pass all that extra data.
            task = delayed(self._dask_helper, pure=False)(b)
            blocks.append(task)
        blocks = delayed(blocks)
        # ...
        # result collection
        # ...
        self._conclude()

    def _dask_helper(self, block_id):
        # calculate the frames to iterate over from block_id;
        # iterate;
        # ...
        return result

Note that in this implementation the AtomGroups/Universes/Readers associated with the analysis all are preserved when calling _dask_helper.

Now, I see two main things needed for this to work:

  1. AtomGroup/Universe/Reader must be easily serializable and de-serializable. Plugging into dask will just work, then.
  2. The AnalysisBase subclass must have some information about the kind of result structure it generates, so that result reporting by _dask_helper, and subsequent collection in run recreate it properly. This is where the Analysis API can be made friendlier. Alternatively the user could implement a _combine method for an Analysis, with code to combine analysis objects. _dask_helper would then just return self, and run would _combine everything.

What do you think about this? Did I overlook other cases where this also fails?

kain88-de commented 6 years ago

But as you say, examples are clearer. For instance, I'd like this to work:

from MDAnalysis.analysis.rms import RMSD
from pmda.parallel import parallelize

ParallelRMSD = parallelize(RMSD)
rmsd = ParallelRMSD(u.atoms).run().rmsd
# Other APIs are possible. The idea is to streamline parallelization.

This is not possible currently and I really do not like it as an API.

why is this not possible: Our BaseAnalysis in mda is manages all iterations over state. The single_frame method doesn't have any arguments and all variables are passed to it with the self object. This means the frames have to be computed in order (this is a implementation detail). We would have to change all analysis classes in mda to a different API.

Why is this not a good API: Mainly discover-ability. Just using the RMSD class from MDAnalysis it's not clear from the function signatures one can run it in parallel. This also makes it hard to explain to a new user. From other packages it is silent convention to have a n_jobs keyword arg.

In short this API is harder to get working correct and it's harder to use.

I went specifically for a new package so we could get rid of a lot of crud that accumulated in MDAnalysis and we keep it around for backward compatibility. Our BaseAnalysis class in mda is already pretty good which is why a dask version looks so similar. But it has small API warts that make it a pain to add dask in mda itself. The idea of having a new package is also to allow us to break an API and be open that we use this to test out different things. (Once something is released in MDA itself I really would not like to change it to avoid breaking user code). Ideally we can merge this back into MDAnalysis once we know everything is working correct and the API is stable.

We can do some result combination in run which I currently leave to _conclude. That would make some implementations easier. It's still left though for a user to untangle the self._result into meaningful attributes.

mnmelo commented 6 years ago

This means the frames have to be computed in order (this is a implementation detail)

I disagree. The frame computation order is decided by the run method. This is easily overridable with a parallelization-aware version like the one I posted above.

Why is this not a good API: Mainly discover-ability.

I don't think discoverability weighs that much more than simplicity. Anyway, this is a good argument for implementing a result combination method (_combine, above). Analyses that have that method can be parallelized. Those that don't, can't.

Our BaseAnalysis class in mda is already pretty good which is why a dask version looks so similar. But it has small API warts that make it a pain to add dask in mda itself.

Well, the API is young and if you think there are warts we can and should fix them. Still, with what I propose we keep the current AnalysisBase API, extend it for devs that want to make their Analyses parallel, and that's it.

It's still left though for a user to untangle the self._result into meaningful attributes.

I feel this weighs much more than any of the aspects we discussed so far, and we should definitely try to pass on this burden to the devs' shoulders (again, a _combine method suited to each analysis).

This said, your effort is awesome, and something that I never got around to doing after MDreader. I'll try to write some code to showcase my point, but since I can't dedicate too much time to this, I guess you should have the final say anyway.

kain88-de commented 6 years ago

I disagree. The frame computation order is decided by the run method. This is easily overridable with a parallelization-aware version like the one I posted above.

This is not easy. There is a huge similarity of ParallelAnalysis to BaseAnalysis that might suggest it is but there are a few subtle differences. The biggest issue is the API of _single_frame. In BaseAnalysis we have

def _single_frame(self):
    # objects (atomgroups) to work on are referenced from self
    # also outputs have to be stored in `self`

This is problematic for writing parallel code. What we have done here is introduce state. With state we can have race conditions in updating attributes of self. Of course python starts new processes for multiprocessing so every process actually has it's own copy of self, so just reading attributes even changing attributes likes a atomgroup might work, I say might I never tested this and there is not guarantee this won't break in a future version of python. But even if the reading works without problems we still store all results in self. This is the nail in the coffin. There is now way this can be returned as a result of a parallel computation. All parallel frameworks depend on functions having return values for the calculation. So using BaseAnalysis as it is and adding a decorator will NOT WORK.

On the other hand the API of ParallelAnalysis

def _single_frame(self, ags):
    # self only contains read-only attrs
    # changing variables (type AtomGroup only) are passed directly
    return value  # Explicit return value

Here all problems go away. We explicitly parse in arguments for changing objects (ags). self stores read-only variables only. And we have an explicit return value.

These changes are small but very fundamental. If I'd done them in MDAnalysis all classes would have to be rewritten in the same PR. We would break all user code that uses BaseAnalysis. We would immediately be stuck with this API again for a long time. I hope those explanations make it clear why this isn't easy.

I don't think discoverability weighs that much more than simplicity. Anyway, this is a good argument for implementing a result combination method (_combine, above). Analyses that have that method can be parallelized. Those that don't, can't

I meant discover-able for a user. It would be kind of akward to explain your API. Much better is saying: Hey run now has a n_jobs argument to run the analysis in parallel. So a user just has to look at the docstring to find out if parallelism is available for that class. Having to check if a hidden method is defined is much more cumbersome, also since it then has to be decorated as well.

Well, the API is young and if you think there are warts we can and should fix them. Still, with what I propose we keep the current AnalysisBase API, extend it for devs that want to make their Analyses parallel, and that's it.

Like I've written about. This isn't an easy fix. Also while my current solution looks good and already very general I do not want to be tied to it right now. Everything that we publish in MDAnalysis has to be stable for some time because people are going to depend on it. I would rather develop analysis classes here in this package allowing to break things and then implement a mature API in MDAnalysis once we figured out what we want.

orbeckst commented 6 years ago

pmda should be considered as a playground (though a useful one). For example, @mkhoshle is working on an MPI-based version (MPI seems to perform better than dask in our benchmarks but dask is more flexible and is more easily used from within Jupyter notebooks so we thought we should have multiple options to try them out in practice.) @mnmelo , you could add another module using your favorite approach.

With a view towards release 1.0, I don't want to add anything to MDAnalysis.analysis until we have actually figured out in practice how we want to do parallel analysis.

The issue of state is important and tied to the fact that we decided to use classes. mdtraj or pytraj are based on functions and those lend themselves to parallelization (map(func, data)) in a more straightforward manner. However, classes (through inheritance) can prevent code duplication and provide more flexibility. Perhaps it is worthwhile to change the signature of _single_frame() to the function-based one eventually.