MDAnalysis / mdanalysis

MDAnalysis is a Python library to analyze molecular dynamics simulations.
https://mdanalysis.org
Other
1.33k stars 653 forks source link

Parallel Writing Support for H5MDWriter #3436

Open yuxuanzhuang opened 3 years ago

yuxuanzhuang commented 3 years ago

Is your feature request related to a problem?

Related to #3231, I have been struggling to find the best way to save the processed (i.e. unwrapped, centered, aligned by on-the-fly transformations) trajectories. The bottleneck of it should be the long processing time for each timestep (with or without transformation I think?) as opposed to the overall I/O limit even when I am processing and saving all the trajectories in parallel.

Describe the solution you'd like

Utilizing the writing in parallel functionality of h5py(https://docs.h5py.org/en/stable/mpi.html) to save each trajectory. I have started to test it a little bit on this matter today with the following snippet

# write.py
from mpi4py import MPI
import h5py

num_processes = MPI.COMM_WORLD.size
rank = MPI.COMM_WORLD.rank  

n_slices = num_processes

import MDAnalysis as mda
from pmda.util import make_balanced_slices

# 92338 frames
# 6951 atoms
u = mda.Universe('start.pdb', 'md.xtc')

slices = make_balanced_slices(u.trajectory.n_frames, n_slices)

f = h5py.File('test.hdf5', 'w', driver='mpio', comm=MPI.COMM_WORLD)

dset = f.create_dataset('positions', (u.trajectory.n_frames, u.atoms.n_atoms, 3), dtype='f')

for i, traj_slice in enumerate(slices):
    if i % num_processes == rank:
        for ts in u.trajectory[traj_slice]:
            data = u.atoms.positions
            dset[ts.frame] = data

f.close()

With a 64 thread CPU, mpiexec --oversubscribe -n 64 python write.py 1 mins 5s

mpiexec --oversubscribe -n 32 python write.py 1 mins 30s

mpiexec --oversubscribe -n 16 python write.py 2 mins 30s

The disk I/O is capped at ~5 MB/s for every single thread no matter how many threads (1-32) are used.

I guess the tricky part is how to implement it into H5MDWriter which currently doesn't support mpio driver. (@edisj )

Describe alternatives you've considered

Maybe use some sort of machinery to buffer process/load the next few frames into memory using extra cores---which should be generic for all formats.

IAlibay commented 3 years ago

I'm going to ping @orbeckst and @edisj here whom I think were thinking about this too?

edit: didn't see you had tagged them already :P

orbeckst commented 3 years ago

@edisj was working on parallel writing. It wasn't entirely clear to us how this should work in the context of MDAnalysis but perhaps @edisj could quickly summarize where his efforts were and what the current roadblock was?

Somewhat related: parallel reading #2865 (I think that's stuck on how to serialize the MPI communicator)

orbeckst commented 3 years ago

@yuxuanzhuang what file system are you doing the writing on?

I don't think that parallel writing will do anything spectacular unless you're on a parallel file system such as Lustre or BeeGFS (see. for instance, https://www.alcf.anl.gov/files/Parallel_HDF5_1.pdf).

image

yuxuanzhuang commented 3 years ago

@yuxuanzhuang what file system are you doing the writing on?

CephFS. I am not confident enough to say if it supports HDF5 or not, but I don't think it is the limitation here given that the major bottleneck is the loading/processing of each timestep.

orbeckst commented 3 years ago

For the filesystem the question is if the MPI implementation has its I/O code (ROM-IO?) linked to the parallel fs ... I don't remember the details — maybe @edisj knows more.

edisj commented 3 years ago

I'm not sure about the specifics as to how the MPI implementation you're using interfaces any specific filesystem (regarding settings needed to be configured when building, any specific tuning needed to build on Lustre vs BeeGFS, etc), so I couldn't say much on whether your implementation is configured correctly to interface with your filesystem. I always just assumed the MPI module on whichever HPC I was using was configured optimally.

But, like @orbeckst said, MPI I/O really starts to shine when coupled with parallel file system properties like striping, where a single file can have many physical access points, so to speak. In the multiple processes writing to a single file case, even if you have 64 processes writing to a single file in parallel, there may still be some file contention happening.

@yuxuanzhuang one thing I think you should try is writing to your dataset collectively. I never had a chance to mess around with collective vs independent MPI I/O, but it's something I'm still interested in testing for H5MD.

I'll quickly explain what I mean-

In your example script above, each process is iterating through their u.trajectory[traj_slice] and independently attempting to write to the correct location on disk. During this procedure, every process is effectively competing for disk access and depending on the number of concurrent processes making requests to write, I/O time can take a big hit.

However, there is a way to hand off the entirety of the I/O task to MPI and let it figure how (presumably an optimal way haha) it wants to distribute the writing among available processes.

(As an aside, an important detail about MPI I/O in general is: any time the meta data of the file is changed, this must be done collectively, so any time you change the size of a dataset, mess with attributes, create a group, etc, that introduces an implicit comm.Barrier() that would halt all processes until that line of code has executed for all processes.)

So for your example, it turns out you can do the parallel writing collectively rather than independently by using the dataset.collective context manager. Something like:

dset = f.create_dataset('positions', (u.trajectory.n_frames, u.atoms.n_atoms, 3), dtype='f')

for ts in u.trajectory:
    with dset.collective:
        data = u.atoms.positions
        dset[ts.frame] = data

With this, all processes will collectively write the frame, and which piece of data each process writes is all handled by MPI. I don't know if the above frame-by-frame iteration writing will even be an improvement over independent IO writing, BUT, if you were to say, load the entire trajectory into memory and then offload the entire writing procedure to be handled collectively, something like:

# You probably want to parallelize this too with start and stop checkpoints from make_balanced_slices()
u.transfer_to_memory()

dset = f.create_dataset('positions', (u.trajectory.n_frames, u.atoms.n_atoms, 3), dtype='f')

with dset.collective:
    data = u.atoms.positions
    dset[:] = data

In this example, you slurp the entire trajectory into memory (if you have enough memory...) and then hand it off to MPI to figure out the optimal way to use all processes to write to disk. I never tested this with H5MD but am very curious as to what the results would be

edit: Also, with this approach, I imagine there are fancy ways you can use some processes to continuously read the trajectory into a buffer in memory while other processes collectively write, which would be pretty neat

edisj commented 3 years ago

@edisj was working on parallel writing. It wasn't entirely clear to us how this should work in the context of MDAnalysis but perhaps @edisj could quickly summarize where his efforts were and what the current roadblock was?

Like @orbeckst said, the details on how to make parallel writing work without a bunch of user-input wasn't clear.

At the moment, H5MDWriter works by extending the length of each dataset being written to and writing to the [-1] index, which works pretty well in serial. The reason for this was that, in general, a writer shouldn't need to know a priori how many frames it's going to write, so the hdf5 dataset being written to had to be extendable.

For writing in parallel, let's use a simple example like

with mda.Writer('filename.h5md', n_atoms=n_atoms) as W:
    for ts u.trajectory[start:stop]: # each process has different start and stop
        W.write(u)

Already, the dataset "extend then write to the new spot" method wouldn't work here because different processes are starting in very different locations in the ts.coordinates array. I don't see a way around this, so the dataset will likely have to be initialized with it's full size ahead of time where the user will have to add an n_frames argument or something.

Then, supposing you do initialize the dataset with its full size, how does the writer know where to write its data? At first, I thought this would work by just writing to dataset[ts.frame], but that only works in cases where you're copying the whole array to the new file. In cases where, for example, you are writing every other frame from a trajectory, then ts.frame would overshoot the length of the dataset.

But you could do whatever you want. Like, say you have 4 processes, and you arbitrarily choose to parallelize the writing by

You could initialize the dataset with the correct dimensions of (11, n_atoms, 3), but when it comes time for H5MDWriter._write_next_timestep() to decide at which index to place the frame in that array, I don't think there is a clear way to guarantee each process writes to the correct spot without additional information from the user, because all of that trajectory slicing is happening outside of H5MDWriter in a script. Does anyone have an ideas on this?

@orbeckst and I talked about using an additional argument for parallel writing that provided some sort of "map" that mapped the input frames to the indices of the output dataset, but the primary goal was to get H5MDWriter up and running for serial at least and then worry about these details.

edisj commented 3 years ago

@yuxuanzhuang are you already able to do the processing in parallel? If I get a prototype parallel H5MDWriter up and running would it be useful?

yuxuanzhuang commented 3 years ago

@yuxuanzhuang are you already able to do the processing in parallel? If I get a prototype parallel H5MDWriter up and running would it be useful?

Thanks for all your comments! Need a bit more time to digest and test but if you have a prototype writer at hand, it would be really useful.

orbeckst commented 3 years ago

Another point that @edisj mentioned to me was to have the parallel H5MDWriter to have a separate MPI code path where the writer's rank 0 communicates with the other ranks to figure out the exact slices. Then rank 0 can effectively build the map for frames from the global information. This could make parallel writing work seamlessly.