bccp / nbodykit

Analysis kit for large-scale structure datasets, the massively parallel way
http://nbodykit.rtfd.io
GNU General Public License v3.0
111 stars 60 forks source link

Help with parallelising nbodykit code #672

Open andrejobuljen opened 2 years ago

andrejobuljen commented 2 years ago

Hi,

I’m trying to make this code run in parallel. It's built upon nbodykit and in essence does the following:

Ideally I would like to obtain the same output d1 field when I run it on a single core and when I run it on multiple cores. Below is a the core function which computes d1. I then save it doing FieldMesh(d1).save(…) and I get different output fields when I run with: python code.py or with mpirun -np 4 python code.py. The output fields look similar and I'm guessing what happens is that I'm only saving the field from a single core...

def generate_d1(delta_ic, cosmo, nbar, zic, zout, plot=True, weight=True, Rsmooth=0, seed=1234, Rdelta=0, comm=None):
    scale_factor = 1/(1+zout)
    Nmesh = delta_ic.Nmesh
    BoxSize = delta_ic.BoxSize[0]
    prefactor = cosmo.scale_independent_growth_factor(zout)/cosmo.scale_independent_growth_factor(zic)

    disp_f = [get_displacement_from_density_rfield(delta_ic, component=i, Psi_type='Zeldovich', smoothing={'R': Rsmooth,}) for i in range(3)]
    Nptcles_per_dim = int((nbar*BoxSize**3)**(1/3))

    pos = UniformCatalog(nbar, BoxSize=BoxSize, seed=seed)['Position']
    N = pos.shape[0]

    dtype = np.dtype([('Position', ('f8', 3)), ('delta_1', 'f8')])
    catalog = np.empty(pos.shape[0], dtype=dtype)
    catalog['Position'][:] = pos[:]
    del pos

    catalog['delta_1'][:] = delta_ic.readout(catalog['Position'], resampler='cic')*prefactor
    catalog['delta_1'][:] -= np.mean(catalog['delta_1'])

    displacement = np.zeros((N, 3))
    for i in range(3):
        displacement[:, i] = disp_f[i].readout(catalog['Position'], resampler='cic')
    displacement *= prefactor
    catalog['Position'][:] = (catalog['Position'] + displacement) % BoxSize
    del displacement, disp_f

    catalog = ArrayCatalog(catalog, BoxSize=BoxSize * np.ones(3), Nmesh=Nmesh, comm=comm)
    d1 = catalog.to_mesh(value='delta_1', compensated=True).to_real_field()     
    return d1

I tried going over the instructions to parallelise, but unfortunately nothing worked so far, so I was just wondering if there is a quick hack to make it work, that would be great. Please let me know if more info is needed.

Thanks and bests, Andrej

rainwoodman commented 2 years ago

What you described looked alright to me from the high level.

I am somewhat worried about the line calling readout. I recall usually we shall deal with decomposition and particle exchange. But it could be that the readout function handles this internally. Could you take a look which readout function you are calling?

Could you confirm quickly if the difference not within round off error?

On Mon, Oct 10, 2022 at 3:32 AM Andrej Obuljen @.***> wrote:

Hi,

I’m trying to make this code https://github.com/andrejobuljen/Hi-Fi_mocks run in parallel. It's built upon nbodykit and in essence does the following:

  • Generates initial linear density field (d_ic) on the mesh,
  • Computes displacement field on the mesh,
  • Generates uniform catalog of particles and computes d_ic at each particles position,
  • Shifts each particle by the displacement field at its position,
  • Assigns shifted particles to the mesh by weighting each particle by d_ic to obtain the output d1 field.

Ideally I would like to obtain the same output d1 field when I run it on a single core and when I run it on multiple cores. Below is a the core function which computes d1. I then save it doing FieldMesh(d1).save(…) and I get different output fields when I run with: python code.py or with mpirun -np 4 python code.py. The output fields look similar and I'm guessing what happens is that I'm only saving the field from a single core...

def generate_d1(delta_ic, cosmo, nbar, zic, zout, plot=True, weight=True, Rsmooth=0, seed=1234, Rdelta=0, comm=None):

scale_factor = 1/(1+zout)

Nmesh = delta_ic.Nmesh

BoxSize = delta_ic.BoxSize[0]

prefactor = cosmo.scale_independent_growth_factor(zout)/cosmo.scale_independent_growth_factor(zic)

disp_f = [get_displacement_from_density_rfield(delta_ic, component=i, Psi_type='Zeldovich', smoothing={'R': Rsmooth,}) for i in range(3)]

Nptcles_per_dim = int((nbar*BoxSize**3)**(1/3))

pos = UniformCatalog(nbar, BoxSize=BoxSize, seed=seed)['Position']

N = pos.shape[0]

dtype = np.dtype([('Position', ('f8', 3)), ('delta_1', 'f8')])

catalog = np.empty(pos.shape[0], dtype=dtype)

catalog['Position'][:] = pos[:]

del pos

catalog['delta_1'][:] = delta_ic.readout(catalog['Position'], resampler='cic')*prefactor

catalog['delta_1'][:] -= np.mean(catalog['delta_1'])

displacement = np.zeros((N, 3))

for i in range(3):

    displacement[:, i] = disp_f[i].readout(catalog['Position'], resampler='cic')

displacement *= prefactor

catalog['Position'][:] = (catalog['Position'] + displacement) % BoxSize

del displacement, disp_f

catalog = ArrayCatalog(catalog, BoxSize=BoxSize * np.ones(3), Nmesh=Nmesh, comm=comm)

d1 = catalog.to_mesh(value='delta_1', compensated=True).to_real_field()

return d1

I tried going over the instructions to parallelise, but unfortunately nothing worked so far, so I was just wondering if there is a quick hack to make it work, that would be great. Please let me know if more info is needed.

Thanks and bests, Andrej

— Reply to this email directly, view it on GitHub https://github.com/bccp/nbodykit/issues/672, or unsubscribe https://github.com/notifications/unsubscribe-auth/AABBWTFBAW6773KJOICLJNLWCPWDHANCNFSM6AAAAAARBGZRWE . You are receiving this because you are subscribed to this thread.Message ID: @.***>

andrejobuljen commented 2 years ago

Hi,

Thanks for your reply. Yes, the difference is bigger than the round-off error.

I made some progress in the meanwhile and rewrote my function following this. I paste the whole code I used for testing below. I'm now able to get the same results when running on a single core (serial) vs multiple, but only in the case where I simply assign particles to the mesh after moving them by Zeldovich. However, when I try to do value=delta_1 in to_mesh() function, I again get different results. You can see this in the plot I attach showing these 4 cases: serial vs mpirun, and switching on/off the value keyword. Let me know if this makes sense and if there's a way to get the same results.

Bests, Andrej

d1_comparison

import numpy as np
from nbodykit.lab import *
# from nbodykit import CurrentMPIComm
from pmesh.pm import ParticleMesh

def get_dlin(seed, Nmesh, BoxSize, Pk):
    pm = ParticleMesh([Nmesh,Nmesh,Nmesh], BoxSize, comm=comm)
    wn = pm.generate_whitenoise(seed)
    dlin = wn.apply(lambda k, v: Pk(sum(ki ** 2 for ki in k)**0.5) ** 0.5 * v / v.BoxSize.prod() ** 0.5)
    return dlin

def generate_d1(delta_ic, cosmo, nbar, zic, zout, plot=True, weight=True, Rsmooth=0, seed=1234, Rdelta=0, comm=None):
    scale_factor = 1/(1+zout)
    Nmesh = delta_ic.Nmesh
    BoxSize = delta_ic.BoxSize[0]
    prefactor = cosmo.scale_independent_growth_factor(zout)/cosmo.scale_independent_growth_factor(zic)

    pos = UniformCatalog(nbar, BoxSize=BoxSize, seed=seed, comm=comm)['Position'].compute()
    N = pos.shape[0]
    catalog = np.empty(pos.shape[0], dtype=[('Position', ('f8', 3)), ('delta_1', 'f8')])
    displ_catalog = np.empty(pos.shape[0], dtype=[('displ', ('f8',3))])

    catalog['Position'][:] = pos[:]

    layout = delta_ic.pm.decompose(pos)

    catalog['delta_1'] = delta_ic.c2r().readout(pos)

    def potential_transfer_function(k, v):
        k2 = k.normp(zeromode=1)
        return v / (k2)
    pot_k = delta_ic.apply(potential_transfer_function, out=Ellipsis)

    for d in range(3):
        def force_transfer_function(k, v, d=d):
            return k[d] * 1j * v
        force_d = pot_k.apply(force_transfer_function).c2r(out=Ellipsis)
        displ_catalog['displ'][:, d] = force_d.readout(pos, layout=layout, resampler='cic')*prefactor

    catalog['Position'] = (catalog['Position'] + displ_catalog['displ']) % BoxSize
    del pos, displ_catalog

    d1 = ArrayCatalog(catalog, BoxSize=BoxSize * np.ones(3), Nmesh=Nmesh, comm=comm).to_mesh(compensated=True)
    # d1 = ArrayCatalog(catalog, BoxSize=BoxSize * np.ones(3), Nmesh=Nmesh, comm=comm).to_mesh(value='delta_1', compensated=True)
    return d1

comm = CurrentMPIComm.get()    
print ('comm', comm, 'comm.rank', comm.rank)
rank = comm.rank

##########################
### General parameters ###
##########################

seed = 5
nbar = 0.01
Nmesh = 64
BoxSize = 100
zout = 1
zic = 127 # TNG initial redshift

print ("Generating HI mock in real-space at output redshift z=%.0f, in a BoxSize L=%.1f using nbar=%.2f (%i particles) on a Nmesh=%i^3 grid with IC seed %i..."%(zout, BoxSize, nbar, int(nbar*BoxSize**3), Nmesh, seed))

# Cosmology
c = cosmology.Planck15
c = c.match(sigma8=0.8159)
Plin_z0 = cosmology.LinearPower(c, 0)
Dic  = c.scale_independent_growth_factor(zic)

# Generate linear overdensity field at zic
print ('Generating initial density field... ')
dlin = get_dlin(seed, Nmesh, BoxSize, Plin_z0)
dlin *= Dic

# Compute shifted fields
print ('Computing shifted fields... ')
d1 = generate_d1(dlin, c, nbar, zic, zout, comm=comm)
d1.save('new_d1_seed_%i_mpi_novalue'%seed, mode='real')
rainwoodman commented 2 years ago

On Tue, Oct 11, 2022 at 11:44 AM Andrej Obuljen @.***> wrote:

Hi,

Thanks for your reply. Yes, the difference is bigger than the round-off error.

I made some progress in the meanwhile and rewrote my function following this http://rainwoodman.github.io/pmesh/intro.html. I paste the whole code I used for testing below. I'm now able to get the same results when running on a single core (serial) vs multiple, but only in the case where I simply assign particles to the mesh after moving them by Zeldovich. However, when I try to do value=delta_1 in to_mesh() function, I again get different results. You can see this in the plot I attach showing these 4 cases: serial vs mpirun, and switching on/off the value keyword. Let me know if this makes sense and if there's a way to get the same results.

Well this is certainly good progress. Here is my guess for the problem with delta_1 -- I think the readout() call for delta_1 also needs the layout -- the layout returned by decompose is a table that routes particles near the boundary of a process to the neighbouring processes. If you look at its doc string, the default size of this smoothing region is half of the window function's support (for CIC, the support is 2 cell size): https://rainwoodman.github.io/pmesh/pmesh.pm.html#pmesh.pm.ParticleMesh.decompose In general, readout in parallel will need to make use of the information in layout to make sure the effect of particles near the process's boundary are not missing from the other processes.

Bests,

Andrej

[image: d1_comparison] https://user-images.githubusercontent.com/28044397/195172979-adfd937a-48d3-4738-8d98-e655fb7bd77f.png

import numpy as np from nbodykit.lab import *

from nbodykit import CurrentMPIComm

from pmesh.pm import ParticleMesh

def get_dlin(seed, Nmesh, BoxSize, Pk): pm = ParticleMesh([Nmesh,Nmesh,Nmesh], BoxSize, comm=comm) wn = pm.generate_whitenoise(seed) dlin = wn.apply(lambda k, v: Pk(sum(ki 2 for ki in k)0.5) * 0.5 v / v.BoxSize.prod() ** 0.5) return dlin

def generate_d1(delta_ic, cosmo, nbar, zic, zout, plot=True, weight=True, Rsmooth=0, seed=1234, Rdelta=0, comm=None): scale_factor = 1/(1+zout) Nmesh = delta_ic.Nmesh BoxSize = delta_ic.BoxSize[0] prefactor = cosmo.scale_independent_growth_factor(zout)/cosmo.scale_independent_growth_factor(zic)

pos = UniformCatalog(nbar, BoxSize=BoxSize, seed=seed, comm=comm)['Position'].compute()
N = pos.shape[0]
catalog = np.empty(pos.shape[0], dtype=[('Position', ('f8', 3)), ('delta_1', 'f8')])
displ_catalog = np.empty(pos.shape[0], dtype=[('displ', ('f8',3))])

catalog['Position'][:] = pos[:]

layout = delta_ic.pm.decompose(pos)

catalog['delta_1'] = delta_ic.c2r().readout(pos)

def potential_transfer_function(k, v):
    k2 = k.normp(zeromode=1)
    return v / (k2)
pot_k = delta_ic.apply(potential_transfer_function, out=Ellipsis)

for d in range(3):
    def force_transfer_function(k, v, d=d):
        return k[d] * 1j * v
    force_d = pot_k.apply(force_transfer_function).c2r(out=Ellipsis)
    displ_catalog['displ'][:, d] = force_d.readout(pos, layout=layout, resampler='cic')*prefactor

catalog['Position'] = (catalog['Position'] + displ_catalog['displ']) % BoxSize
del pos, displ_catalog

d1 = ArrayCatalog(catalog, BoxSize=BoxSize * np.ones(3), Nmesh=Nmesh, comm=comm).to_mesh(compensated=True)
# d1 = ArrayCatalog(catalog, BoxSize=BoxSize * np.ones(3), Nmesh=Nmesh, comm=comm).to_mesh(value='delta_1', compensated=True)
return d1

comm = CurrentMPIComm.get() print ('comm', comm, 'comm.rank', comm.rank) rank = comm.rank

##########################

General parameters

##########################

seed = 5 nbar = 0.01 Nmesh = 64 BoxSize = 100 zout = 1 zic = 127 # TNG initial redshift

print ("Generating HI mock in real-space at output redshift z=%.0f, in a BoxSize L=%.1f using nbar=%.2f (%i particles) on a Nmesh=%i^3 grid with IC seed %i..."%(zout, BoxSize, nbar, int(nbar*BoxSize**3), Nmesh, seed))

Cosmology

c = cosmology.Planck15 c = c.match(sigma8=0.8159) Plin_z0 = cosmology.LinearPower(c, 0) Dic = c.scale_independent_growth_factor(zic)

Generate linear overdensity field at zic

print ('Generating initial density field... ') dlin = get_dlin(seed, Nmesh, BoxSize, Plin_z0) dlin *= Dic

Compute shifted fields

print ('Computing shifted fields... ') d1 = generate_d1(dlin, c, nbar, zic, zout, comm=comm) d1.save('new_d1seed%i_mpi_novalue'%seed, mode='real')

— Reply to this email directly, view it on GitHub https://github.com/bccp/nbodykit/issues/672#issuecomment-1275123675, or unsubscribe https://github.com/notifications/unsubscribe-auth/AABBWTHO7J2L3OM6SAIFFNTWCWYQLANCNFSM6AAAAAARBGZRWE . You are receiving this because you commented.Message ID: @.***>

andrejobuljen commented 2 years ago

Hi,

Thanks, yes, I also realised that layout was the problem, so now I include it in here for example: delta_1 = delta_ic.c2r().readout(pos, layout=layout, resampler='cic'). I'm finally getting more reasonable results, with some minor differences.

Just to understand better, what would be the best thing to use for the smoothing parameter in layout = delta_ic.pm.decompose(pos, smoothing=?)? And what are the units of the smoothing parameter? Should I use larger values than the cell size?

Bests, Andrej

rainwoodman commented 2 years ago

On Thu, Oct 13, 2022 at 8:01 AM Andrej Obuljen @.***> wrote:

Hi,

Thanks, yes, I also realised that layout was the problem, so now I include it in here for example: delta_1 = delta_ic.c2r().readout(pos, layout=layout, resampler='cic'). I'm finally getting more reasonable results, with some minor differences.

great!

Just to understand better, what would be the best thing to use for the smoothing parameter in layout = delta_ic.pm.decompose(pos, smoothing=?)? And what are the units of the smoothing parameter? Should I use larger values than the cell size?

Usually it is 0.5 times the support of the window. For example the CIC window is 2 cell size in width, so we use a value of 1.

If the pos in readout or paint differs from the pos used for decompose, then the smoothing shall increase by the maximum displacement between the two. Max(abs(pos2 - pos1)).

The unit is in cell size. For example if the cell size is 0.5 mpc/h and if the maximum pos shift is 10 mpc/h then the total smoothing for a CIC window is 20 +1 = 21. (If my handwaving calculation is correct)

Yu

Bests, Andrej

— Reply to this email directly, view it on GitHub https://github.com/bccp/nbodykit/issues/672#issuecomment-1277754814, or unsubscribe https://github.com/notifications/unsubscribe-auth/AABBWTCD2UAF4HJZTMYESPTWDAP4JANCNFSM6AAAAAARBGZRWE . You are receiving this because you commented.Message ID: @.***>