bccp / nbodykit

Analysis kit for large-scale structure datasets, the massively parallel way
http://nbodykit.rtfd.io
GNU General Public License v3.0

mesh size changes when running with mpi #627

Open xiaohanzai opened 4 years ago

xiaohanzai commented 4 years ago

I'm trying to calculate the power spectrum for an N-body sim. There are too many particles, so I have to load in a subset of the particles at a time in order not to run out of memory. What I'm doing is roughly the following:

f_sum = np.zeros((256, 256, 256))
for i in range(26):  # 26 is the number of subsets of particles
    data = ...  # load in the i-th subset of the particles
    cat = ArrayCatalog({'Position': data['Position']})
    mesh = cat.to_mesh(resampler='tsc', Nmesh=256, compensated=True, interlaced=True)
    f = mesh.to_real_field() * len(data['Position'])
    f_sum += f
f_sum /= total  # total number of particles
mesh_sum = ArrayMesh(f_sum)
r = FFTPower(mesh_sum, mode='1d')

Since the above is too slow (each subset has ~1e9 particles), I'm trying to parallelize the code with mpi4py, and the code is like this:

comm = CurrentMPIComm.get()
rank = comm.rank
size = comm.size
for i in range(rank, 26, size):
    ...  # same as above
if rank == 0:
    comm.Reduce(...)

The problem is that when I run the code with more than one processor, the shape of the f array also changes, so it's not (256, 256, 256) anymore. mpirun -n 1 is still fine, but mpirun -n 2 gives (128, 256, 256) and (128, 256, 256), and -n 3 gives (256, 86, 256), (256, 86, 256), (256, 84, 256). I'm not sure what's happening here. What I hope the code would do is that each processor does the computation for one subset of particles, and in the end I can sum up the results from all processors, but it doesn't seem to work like this? Maybe I have some misunderstandings about mpi4py as well...

rainwoodman commented 4 years ago

Indeed. The data model of nbodykit is different from what you expect. Data in nbodykit is always distributed across all ranks in the MPI communicator, and each MPI rank runs more or less identical computations on its own partition of the data.

Therefore, the data you pass into ArrayCatalog must already be distributed across the MPI ranks.

Since your data comes in 26 subsets, I would start with 26 MPI ranks, each creating a local ArrayCatalog from its piece of data. Then you can use to_mesh() to obtain a distributed mesh, which can then be handled by FFTPower, something like this:

comm = MPI.COMM_WORLD
assert comm.size == 26  # otherwise we miss data
data = loaddata(file_id = comm.rank)
cat = ArrayCatalog(data ..., comm=comm)
mesh = cat.to_mesh(....)
r = FFTPower(mesh, mode='1d', comm=comm)
r.save(....)

26 is not a very good number of ranks for FFT, which prefers powers of 2; but it should still work.
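Incidentally, this data model is also why the shape of your f array changed with the number of processors: to_real_field() returns only the slab of the global 256^3 mesh that lives on the local rank, and the local slabs tile the full mesh. A minimal sketch that only inspects the decomposition (the UniformCatalog here is just a throwaway stand-in catalog for illustration):

# sketch: how the global Nmesh^3 grid is split into per-rank slabs
from nbodykit.lab import UniformCatalog
from nbodykit import CurrentMPIComm

comm = CurrentMPIComm.get()

# a throwaway random catalog, only used to paint a mesh
cat = UniformCatalog(nbar=1e-5, BoxSize=1380., seed=42, comm=comm)
mesh = cat.to_mesh(resampler='tsc', Nmesh=256)

f = mesh.to_real_field()           # local slab of the 256^3 mesh
shapes = comm.allgather(f.shape)   # collect every rank's local shape
if comm.rank == 0:
    # e.g. with 2 ranks: [(128, 256, 256), (128, 256, 256)]
    print(shapes)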

There must be a way to deal with the mismatch between the number of subsets and the number of ranks. We have a similar mechanism for file formats known to nbodykit.io, called FileStack, but it doesn't work with ArrayCatalog.

If you think of a way to do this on ArrayCatalog, let me know.

xiaohanzai commented 4 years ago

Thanks! But I guess it still doesn't work for me... I think another example very similar to my problem would be the code below:

comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size

bias = rank + 1.0

# set up the linear power spectrum
redshift = 0.55
cosmo = cosmology.Planck15
Plin = cosmology.LinearPower(cosmo, redshift, transfer='EisensteinHu')

# initialize the catalog for this bias
cat = LogNormalCatalog(Plin=Plin, nbar=3e-3, BoxSize=1380., Nmesh=256, bias=bias, comm=comm)

# convert to a MeshSource, using TSC interpolation on 256^3 mesh
mesh = cat.to_mesh(resampler='tsc', Nmesh=256, compensated=True)

r = FFTPower(mesh, mode='1d')
Pk = r.power
plt.loglog(Pk['k'], Pk['power'].real - Pk.attrs['shotnoise'], label='bias=%.1f' % bias)
plt.legend()
plt.show()

And then when running with, e.g., mpirun -n 4, all the plots look the same, although I would expect each power spectrum to correspond to bias = rank + 1.

Returning to my issue, I think I have to add all the 26 meshes together before doing the FFT, because the 26 files together make up the full snapshot. So I guess what I should do is use f = mesh.to_real_field() and add up all the f arrays (unless the meshes can be added directly?). But it seems the f arrays on different ranks are not independent of each other, and they also change shape when I change the number of processors?

xiaohanzai commented 4 years ago

Oh I guess I'm slowly beginning to understand... When we do

data = loaddata(file_id = comm.rank)
cat = ArrayCatalog(data ..., comm=comm)
mesh = cat.to_mesh(....)
r = FFTPower(mesh)

are the cat of different ranks already summed up automatically, so I don't have to add up the meshes by myself to do the FFT? So all I have to do is load in the files with each processor? And when saving the mesh or the power spectrum, I can do it with if rank == 0?

rainwoodman commented 4 years ago

On Fri, Jul 31, 2020 at 8:40 AM Xiaohan Wu notifications@github.com wrote:

Oh I guess I'm slowly beginning to understand... When we do

data = loaddata(file_id = comm.rank)
cat = ArrayCatalog(data ..., comm=comm)
mesh = cat.to_mesh(....)
r = FFTPower(mesh)

are the cat of different ranks already summed up automatically, so I don't have to add up the meshes by myself to do the FFT?

Yes, that's right. The term we usually use is that the global catalog is distributed / partitioned across the different ranks. Sometimes we also say that the catalog is a 'collective' object defined on that MPI communicator. The word 'sum' can mean different things (e.g. a reduction with MPI.SUM).
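One way to see the difference between the local and the collective view is a small self-contained sketch (the random positions are just for illustration):

# sketch: each rank supplies its own rows, but together they form one global catalog
import numpy as np
from mpi4py import MPI
from nbodykit.lab import ArrayCatalog

comm = MPI.COMM_WORLD

# 1000 particles per rank, random positions inside a box of side 1380
pos = np.random.uniform(0, 1380., size=(1000, 3))
cat = ArrayCatalog({'Position': pos}, comm=comm)

print(comm.rank, cat.size)   # local size: 1000 on every rank
print(comm.rank, cat.csize)  # collective size: 1000 * comm.size, the same on all ranks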

So all I have to do is load in the files with each processor?

Yes. Sounds about right.

And when saving the mesh or the power spectrum, I can do it with if rank == 0?

mesh.save() and r.save() are collective methods that run on all MPI ranks; there is no need to (and you shouldn't) restrict them to rank == 0.
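For example, something like this (a sketch, continuing with the mesh and r from the snippet above; the file names are arbitrary, and I'm assuming the JSON result format of FFTPower and the bigfile-backed mesh format):

# sketch: collective save on all ranks, and reading the results back later
from nbodykit.lab import FFTPower, BigFileMesh

r.save('power.json')                                      # FFTPower result
r2 = FFTPower.load('power.json')                          # reload the measured P(k) later

mesh.save('mesh.bigfile', dataset='Field', mode='real')   # painted field -> bigfile
mesh2 = BigFileMesh('mesh.bigfile', dataset='Field')      # reload as a MeshSource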


xiaohanzai commented 4 years ago

Actually I realized that I'm only allowed to use a maximum of 16 CPUs, or even fewer depending on other users... So if I only process a few files at a time and save several meshes, is there a way to combine them and do the FFT after processing everything?

Also, I probably missed this in the docs, but after saving a MeshSource object, how can it be loaded back in?

rainwoodman commented 4 years ago

What about assigning more than one file to a rank?
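Roughly like this (a sketch; loaddata() is a placeholder for however you read one subset from disk, and the box size is a placeholder to replace with your simulation's):

import numpy as np
from mpi4py import MPI
from nbodykit.lab import ArrayCatalog, FFTPower

comm = MPI.COMM_WORLD
nfiles = 26
BoxSize = 1000.   # placeholder: replace with your simulation box size

# round-robin assignment of the 26 files to the available ranks
# (assumes comm.size <= nfiles, so no rank ends up empty-handed)
my_files = range(comm.rank, nfiles, comm.size)

# loaddata() stands in for whatever reads one subset from disk
pos = np.concatenate([loaddata(file_id=i)['Position'] for i in my_files], axis=0)

cat = ArrayCatalog({'Position': pos}, comm=comm)
mesh = cat.to_mesh(resampler='tsc', Nmesh=256, BoxSize=BoxSize,
                   compensated=True, interlaced=True)
r = FFTPower(mesh, mode='1d')
r.save('power.json')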


xiaohanzai commented 4 years ago

That's a good point. I guess it's at least doable for a smaller subsample. I'll give it a try. Thank you so much for your help!

rainwoodman commented 4 years ago

Sure. We should add something like a ShardedCatalog(). Let me poke at this a bit and we can move the discussion to a PR.
