MDAnalysis / pmda

Parallel algorithms for MDAnalysis
https://www.mdanalysis.org/pmda/
Other
31 stars 22 forks source link

fails with ChainReader trajectory #44

Open orbeckst opened 6 years ago

orbeckst commented 6 years ago

Expected behaviour

pmda can deal with any universe/trajectory that can be built with MDAnalysis

Actual behaviour

Fails with `TypeError: can't pickle generator objects` (see the stack trace below) when `universe.trajectory` is a `ChainReader`.

Code to reproduce the behaviour

import MDAnalysis as mda
from MDAnalysis.tests.datafiles import TPR, XTC

import pmda.rms

u = mda.Universe(TPR, 3*[XTC])
print(u.trajectory)
# <ChainReader ['adk_oplsaa.xtc', 'adk_oplsaa.xtc', 'adk_oplsaa.xtc'] with 30 frames of 47681 atoms>

protein = u.select_atoms("protein and not name H*")
R = pmda.rms.RMSD(protein, protein).run(n_jobs=4)

Current versions of MDAnalysis, pmda, and dask:

(not reported; obtain them with `python -c "import MDAnalysis as mda; print(mda.__version__)"`, `python -c "import pmda; print(pmda.__version__)"`, and `python -c "import dask; print(dask.__version__)"`)

Stack trace

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-4-aced2a5b0b3e> in <module>()
      1 protein = u.select_atoms("protein and not name H*")
----> 2 R = pmda.rms.RMSD(protein, protein).run(n_jobs=4)

~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/pmda/parallel.py in run(self, start, stop, step, scheduler, n_jobs, n_blocks)
    257                 blocks.append(task)
    258             blocks = delayed(blocks)
--> 259             res = blocks.compute(**scheduler_kwargs)
    260             self._results = np.asarray([el[0] for el in res])
    261             self._conclude()

~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    152         dask.base.compute
    153         """
--> 154         (result,) = compute(self, traverse=False, **kwargs)
    155         return result
    156 

~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    405     keys = [x.__dask_keys__() for x in collections]
    406     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 407     results = get(dsk, keys, **kwargs)
    408     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    409 

~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, **kwargs)
    175                            get_id=_process_get_id, dumps=dumps, loads=loads,
    176                            pack_exception=pack_exception,
--> 177                            raise_exception=reraise, **kwargs)
    178     finally:
    179         if cleanup:

~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    506             # Seed initial tasks into the thread pool
    507             while state['ready'] and len(state['running']) < num_workers:
--> 508                 fire_task()
    509 
    510             # Main loop, wait on tasks to finish, insert new ones

~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/dask/local.py in fire_task()
    500                 # Submit
    501                 apply_async(execute_task,
--> 502                             args=(key, dumps((dsk[key], data)),
    503                                   dumps, loads, get_id, pack_exception),
    504                             callback=queue.put)

~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/dask/multiprocessing.py in _dumps(x)
     28 
     29 def _dumps(x):
---> 30     return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     31 
     32 

~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
    893     try:
    894         cp = CloudPickler(file, protocol=protocol)
--> 895         cp.dump(obj)
    896         return file.getvalue()
    897     finally:

~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
    266         self.inject_addons()
    267         try:
--> 268             return Pickler.dump(self, obj)
    269         except RuntimeError as e:
    270             if 'recursion' in e.args[0]:

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in dump(self, obj)
    407         if self.proto >= 4:
    408             self.framer.start_framing()
--> 409         self.save(obj)
    410         self.write(STOP)
    411         self.framer.end_framing()

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_tuple(self, obj)
    734         if n <= 3 and self.proto >= 2:
    735             for element in obj:
--> 736                 save(element)
    737             # Subtle.  Same as in the big comment below.
    738             if id(obj) in memo:

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_tuple(self, obj)
    749         write(MARK)
    750         for element in obj:
--> 751             save(element)
    752 
    753         if id(obj) in memo:

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~/Library/miniconda2/envs/mda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_instancemethod(self, obj)
    665         else:
    666             if PY3:
--> 667                 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
    668             else:
    669                 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    608         else:
    609             save(func)
--> 610             save(args)
    611             write(REDUCE)
    612 

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_tuple(self, obj)
    734         if n <= 3 and self.proto >= 2:
    735             for element in obj:
--> 736                 save(element)
    737             # Subtle.  Same as in the big comment below.
    738             if id(obj) in memo:

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    519 
    520         # Save the reduce() output and finally memoize the object
--> 521         self.save_reduce(obj=obj, *rv)
    522 
    523     def persistent_id(self, obj):

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    632 
    633         if state is not None:
--> 634             save(state)
    635             write(BUILD)
    636 

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_dict(self, obj)
    819 
    820         self.memoize(obj)
--> 821         self._batch_setitems(obj.items())
    822 
    823     dispatch[dict] = save_dict

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
    845                 for k, v in tmp:
    846                     save(k)
--> 847                     save(v)
    848                 write(SETITEMS)
    849             elif n:

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    519 
    520         # Save the reduce() output and finally memoize the object
--> 521         self.save_reduce(obj=obj, *rv)
    522 
    523     def persistent_id(self, obj):

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    632 
    633         if state is not None:
--> 634             save(state)
    635             write(BUILD)
    636 

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save_dict(self, obj)
    819 
    820         self.memoize(obj)
--> 821         self._batch_setitems(obj.items())
    822 
    823     dispatch[dict] = save_dict

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
    845                 for k, v in tmp:
    846                     save(k)
--> 847                     save(v)
    848                 write(SETITEMS)
    849             elif n:

~/Library/miniconda2/envs/mda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    494             reduce = getattr(obj, "__reduce_ex__", None)
    495             if reduce is not None:
--> 496                 rv = reduce(self.proto)
    497             else:
    498                 reduce = getattr(obj, "__reduce__", None)

TypeError: can't pickle generator objects
orbeckst commented 6 years ago

The problem is that `ParallelAnalysisBase._dask_helper()` needs to be pickled by dask. Because it is a bound method of `ParallelAnalysisBase`, the instance it is bound to must also be pickled; that instance belongs to a derived class such as `RMSD`. If this object contains anything that cannot be pickled, the whole pickle fails. In particular, there is `ParallelAnalysisBase._trajectory`, which holds the Reader from the universe (not the universe itself, which is currently not picklable). Almost all readers are picklable, but the `ChainReader` is not, because it contains a generator:

'_ChainReader__chained_trajectories_iter': <generator object ChainReader._chained_iterator at 0x7f1c19716468>

Ideally, pickling `_dask_helper()` would not require pickling the enclosing object. However, a short-term fix would be to make the `ChainReader` picklable.

orbeckst commented 4 years ago

ChainReader should work once PR https://github.com/MDAnalysis/mdanalysis/pull/2723 is merged.