MDAnalysis / mdanalysis

MDAnalysis is a Python library to analyze molecular dynamics simulations.
https://mdanalysis.org
Other
1.29k stars 646 forks source link

Parallelisation failed- get_distance_matrix() #2060

Closed GianFree closed 4 years ago

GianFree commented 6 years ago

Expected behavior Parallel computation (with 2 or more cores) of a rmsd matrix with MDAnalysis.analysis.encore.confdistmatrix.get_distance_matrix().

Actual behavior Running the following command with my PSF and DCD, I get this error.

import MDAnalysis as mda
import MDAnalysis.analysis.encore as encore

u1 = mda.Universe('my.psf', 'my.dcd', in_memory=True, in_memory_step=10)

rmsdMatrix = encore.confdistmatrix.get_distance_matrix(
        u1, selection='name CA', superimpose=True, n_jobs=-1, weights='mass', metadata=True, verbose=True)

Output:

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers.
---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/jarvis/anaconda2/envs/py3/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 425, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/home/jarvis/anaconda2/envs/py3/lib/python3.6/site-packages/joblib/_parallel_backends.py", line 561, in __call__
    return self.func(*args, **kwargs)
  File "/home/jarvis/anaconda2/envs/py3/lib/python3.6/site-packages/joblib/parallel.py", line 261, in __call__
    for func, args, kwargs in self.items]
  File "/home/jarvis/anaconda2/envs/py3/lib/python3.6/site-packages/joblib/parallel.py", line 261, in <listcomp>
    for func, args, kwargs in self.items]
  File "/home/jarvis/anaconda2/envs/py3/lib/python3.6/site-packages/MDAnalysis/analysis/encore/confdistmatrix.py", line 244, in set_rmsd_matrix_elements
    coords[j].shape[0], weights, sumweights)
ValueError: assignment destination is read-only
"""

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
<ipython-input-31-28b74f5d43a7> in <module>()
      1 rmsdMatrix = encore.confdistmatrix.get_distance_matrix(
----> 2         u1, selection='name CA', superimpose=True, n_jobs=-1, weights='mass', metadata=True, verbose=True)

~/anaconda2/envs/py3/lib/python3.6/site-packages/MDAnalysis/analysis/encore/confdistmatrix.py in get_distance_matrix(ensemble, selection, load_matrix, save_matrix, superimpose, superimposition_subset, weights, n_jobs, verbose, *conf_dist_args, **conf_dist_kwargs)
    361                                                         weights=weights,
    362                                                         n_jobs=n_jobs,
--> 363                                                         verbose=verbose)
    364 
    365         logging.info("    Done!")

~/anaconda2/envs/py3/lib/python3.6/site-packages/MDAnalysis/analysis/encore/confdistmatrix.py in conformational_distance_matrix(ensemble, conf_dist_function, selection, superimposition_selection, n_jobs, pairwise_align, weights, metadata, verbose)
    175         weights,
    176         fitting_coordinates,
--> 177         subset_weights) for element in indices)
    178 
    179 

~/anaconda2/envs/py3/lib/python3.6/site-packages/joblib/parallel.py in __call__(self, iterable)
    992 
    993             with self._backend.retrieval_context():
--> 994                 self.retrieve()
    995             # Make sure that we get a last message telling us we are done
    996             elapsed_time = time.time() - self._start_time

~/anaconda2/envs/py3/lib/python3.6/site-packages/joblib/parallel.py in retrieve(self)
    895             try:
    896                 if getattr(self._backend, 'supports_timeout', False):
--> 897                     self._output.extend(job.get(timeout=self.timeout))
    898                 else:
    899                     self._output.extend(job.get())

~/anaconda2/envs/py3/lib/python3.6/site-packages/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
    513         AsyncResults.get from multiprocessing."""
    514         try:
--> 515             return future.result(timeout=timeout)
    516         except LokyTimeoutError:
    517             raise TimeoutError()

~/anaconda2/envs/py3/lib/python3.6/site-packages/joblib/externals/loky/_base.py in result(self, timeout)
    429                 raise CancelledError()
    430             elif self._state == FINISHED:
--> 431                 return self.__get_result()
    432             else:
    433                 raise TimeoutError()

~/anaconda2/envs/py3/lib/python3.6/site-packages/joblib/externals/loky/_base.py in __get_result(self)
    380     def __get_result(self):
    381         if self._exception:
--> 382             raise self._exception
    383         else:
    384             return self._result

ValueError: assignment destination is read-only

Code to reproduce the behavior

I cannot reproduce the same error if I use the test file or a different number of cores. And nothing changed in my enviroment between the two computation.

I also checked my data, but they are correctly read by VMD and does not appear to be corrupted. Using only one core, everything works fine.

import MDAnalysis as mda
import MDAnalysis.analysis.encore as encore
from MDAnalysisTests.datafiles import PSF, DCD

u = mda.Universe(PSF, DCD, dt=1, in_memory=True, in_memory_step=2 )

rmsdMatrix = encore.confdistmatrix.get_distance_matrix(
u, selection='name CA', superimpose=True, n_jobs=2, weights='mass', metadata=True, verbose=True)

Output:

[Parallel(n_jobs=2)]: Using backend LokyBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done 1225 out of 1225 | elapsed:    1.1s finished

Currently version of MDAnalysis

orbeckst commented 6 years ago

cc @mtiberti @wouterboomsma @tbengtsen

mtiberti commented 6 years ago

Thanks @GianFree for the report. I've been investigating this, it seems to be related to using joblib to parallelise the calculation.

The crash happens when joblib automatically converts one of our numpy.array argument, which is where we store the calculated values, to numpy.memmap (quoting from their docs: "Parallel provides a special handling for large arrays to automatically dump them on the filesystem and pass a reference to the worker to open them as memory map on that file using the numpy.memmap subclass of numpy.ndarray. This makes it possible to share a segment of data between all the worker processes.") and then can't write to it. This behaviour is triggered when the size of the input array exceeds the default max_nbytes which is why we don't see it in all cases.

The fix could easily be adding the max_nbytes=None (which stops this behaviour) or making the memmaps writeable (mmap_mode='w+'). This seems to work, however at the end of the calculation we get the initialised array instead of the results (!) which means the array is not properly shared. The suggested way to handle this scenario is to create a numpy.memmap explicitly instead of a numpy.array and give it as an argument for the parallel job - this seems to work flawlessly, however I'm a bit unsure how to do this practically, since numpy.memmap writes an actual file on the filesystem. As a reference, joblib transparently does this:

However I would prefer avoiding to create a file on file system to begin with, the best would of course keep everything in memory. Any idea on how we could proceed with this?

kain88-de commented 6 years ago

Using a shared writable memmap will very likely cause random crashes or randomly wrong results. If two processes happen to write to the same memmap at the same time and location data corruption is a very likely possibility. I would be careful with a shared writable array. Having shared writes is an issue anyway, it works best if you can guarantee that no to processes will write to the same memory location, easiest done if every iteration in the loop-body updates a different entry.