NanoComp / meep

free finite-difference time-domain (FDTD) software for electromagnetic simulations
GNU General Public License v2.0
1.26k stars 629 forks source link

Using MPI across multiple sims #1178

Closed smartalecH closed 4 years ago

smartalecH commented 4 years ago

Typically if I have a python script with one simulation object, I can easily parallelize it using mpi:

mpirun -np 16 python3 my_script.py

Now let's say my script has four simulation objects and I want to run all of them in parallel and each with multiple processes from the same script. Using the example above with 16 allocated processes, is there a clever way to assign 4 processes to each of my simulation objects that all run concurrently (and only communicate between "subgroups")?

This would be especially useful for multiobjective adjoint optimization.

I realize I could do this with some clever bash scripting but it would be nice to keep everything "in the loop" of a single python script.

stevengj commented 4 years ago

Yup, this was already implemented 11 years ago in 81dcccb0766b124976e406f01af66a6780980252

stevengj commented 4 years ago

Call

n = meep.divide_parallel_processes(N)

to divide the MPI processes into N subgroups, and returns the index n of the current group, which can be used to decide which simulation to run.

That is, you have one run script, and the script only creates one simulation object (typically) — depending on the value of n that it receives, it will create a different simulation object (i.e. different parameters).

stevengj commented 4 years ago

And if you need to do some global communications among all processes, just do

meep.begin_global_communications()
# ...do stuff...
meep.end_global_communications()
stevengj commented 4 years ago

Note that the statement in https://meep.readthedocs.io/en/latest/Parallel_Meep/#different-forms-of-parallelization that "Meep provides no explicit support for this mode of operation" is actually untrue.

stevengj commented 4 years ago

For example, if you want to synchronize an array between all the processes, not just a subgroup, you could have each process allocate an array to store the results, initialized to zero, write its portion of the data into the array, and then call

meep.begin_global_communications()
meep.sum_to_all(input_array, summed_array)
meep.end_global_communications()

However, for this to work we will also need to add SWIG typemaps for the sum_to_all method that takes array arguments, since I don't think we have a high-level wrapper for this that works with Python arrays yet.

oskooi commented 4 years ago

The divide_parallel_processes feature does not seem to be working. The following is a simple test to compute the total flux from a point source at a single frequency. The frequency is one of two values (1.0 or 0.5) determined by the subgroup index (either 0 or 1 for a splitting into two groups). The rest of the Simulation parameters are identical. However, regardless of how many MPI processes the simulation is launched with, only the frequency 1.0 seems to be computed.

script

import meep as mp

resolution = 20

sxy = 4
dpml = 1
cell = mp.Vector3(sxy+2*dpml,sxy+2*dpml,0)

pml_layers = [mp.PML(dpml)]

n = mp.divide_parallel_processes(2)
fcen = 1.0 if n == 0 else 0.5

sources = [mp.Source(src=mp.GaussianSource(fcen,fwidth=0.2*fcen),
                     center=mp.Vector3(),
                     component=mp.Ez)]

symmetries = [mp.Mirror(mp.X),
              mp.Mirror(mp.Y)]

sim = mp.Simulation(cell_size=cell,
                    resolution=resolution,
                    sources=sources,
                    symmetries=symmetries,
                    boundary_layers=pml_layers)

flux_box = sim.add_flux(fcen, 0, 1,
                        mp.FluxRegion(mp.Vector3(y=0.5*sxy), size=mp.Vector3(sxy)),
                        mp.FluxRegion(mp.Vector3(y=-0.5*sxy), size=mp.Vector3(sxy), weight=-1),
                        mp.FluxRegion(mp.Vector3(0.5*sxy), size=mp.Vector3(y=sxy)),
                        mp.FluxRegion(mp.Vector3(-0.5*sxy), size=mp.Vector3(y=sxy), weight=-1))

sim.run(until_after_sources=mp.stop_when_fields_decayed(50, mp.Ez, mp.Vector3(), 1e-6))

tot_flux = mp.get_fluxes(flux_box)[0]
print("flux:, {}, {:.4f}, {:.6f}".format(n,fcen,tot_flux))

output

$ mpirun -n 4 python split.py
Using MPI version 3.1, 4 processes
-----------
Initializing structure...
Halving computational cell along direction x
Halving computational cell along direction y
Splitting into 2 chunks evenly
time for choose_chunkdivision = 0.000274579 s
Working in 2D dimensions.
Computational cell is 6 x 6 x 0 with resolution 20
time for set_epsilon = 0.00403629 s
-----------
field decay(t = 50.025000000000006): 10.992590364782105 / 10.992590364782105 = 1.0
field decay(t = 100.05000000000001): 4.080185867370076e-10 / 10.992590364782105 = 3.711760132936559e-11
run 0 finished at t = 100.05000000000001 (4002 timesteps)
flux:, 0, 1.0000, 9.862873

There is no indication that results for the other frequency (0.5) belonging to the second subgroup with index 1 was computed.

smartalecH commented 4 years ago

Doesn't the print function only operate for the master process now?

I think a better way to test is with a sum_to_all. I'm still working on a typemap suite.

stevengj commented 4 years ago

print only works from the "real" master process (process 0).

So you'll need to do some global communications to send the results back to the master process.

stevengj commented 4 years ago

Note that you can do global communications directly with mpi4py using MPI.COMM_WORLD … no actual need for sum_to_all and begin_global_communications.

mpi4py has direct support for sending numpy arrays, for example.

oskooi commented 4 years ago

Using mpi4py for the global communications does indeed reveal that divide_parallel_processes is working as expected. The end of the example from above is modified to send/receive the flux values from two processes:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

tot_fluxes = [0,0]
if rank == 0:
    comm.send(tot_flux, dest=1, tag=11)
    tot_fluxes[0] = tot_flux
    tot_fluxes[1] = comm.recv(source=1, tag=11)
elif rank == 1:
    comm.send(tot_flux, dest=0, tag=11)
    tot_fluxes[1] = tot_flux
    tot_fluxes[0] = comm.recv(source=0, tag=11)

print(tot_fluxes)

The output for mpirun -n 2 python split.py correctly shows two different flux values (corresponding to the two different frequencies) stored in the list:

[9.862872853382488, 19.653727538698696]

It would be nice to have a high-level interface for storing/printing the output from the different subgroups rather than have the user call the low-level send and recv MPI commands.

stevengj commented 4 years ago

Might be nice to have a function

merged = merge_subgroup_data(data)

that takes a numpy array data from every process, which is expected to be identical across each subgroup, and then returns an array merged which is just the concatenated data from each subgroup.

Under the hood, it would call MPI_Alltoallv on every process (COMM_WORLD), but the inputs from every process would be an empty array (sendcount=0) except on process 0 of each subgroup, which would send data.

mawc2019 commented 3 years ago

Hi @smartalecH , I want to make sure that I am using divide_parallel_processes and merge_subgroup_data correctly when doing optimization. Could you take a look at the code below? You may only look at the few lines that are relevant to the usage. Thanks!

import meep as mp
import meep.adjoint as mpa
import numpy as np
import cmath,math
from autograd import numpy as npa
from autograd import grad,jacobian,tensor_jacobian_product
import nlopt
import os
Si = mp.Medium(index=3.48)
SiO2 = mp.Medium(index=1.45)
Air = mp.Medium(index=1)

num_groups = 3
n = mp.divide_parallel_processes(num_groups)

resolution,design_resolution = 40,40

design_region_width,design_region_height = 0.8,0.4
Lx,Ly = design_region_width+0.2,4
dpml = 1.5
pml_layers = [mp.PML(dpml, direction=mp.Y)]
cell_size = mp.Vector3(Lx,Ly+2*dpml)
monitor_height,monitor_width = design_region_height/2,0.1

minimum_length = 0.08 # minimum length scale (microns)
eta_i = 0.5 # blueprint (or intermediate) design field thresholding point (between 0 and 1)
eta_e = 0.55 # erosion design field thresholding point (between 0 and 1)
eta_d = 1-eta_e # dilation design field thresholding point (between 0 and 1)
filter_radius = mpa.get_conic_radius_from_eta_e(minimum_length,eta_e)
print("Minimum length: ",minimum_length,". Filter radius: ",filter_radius)

fcen,fwidth = 1/1.55,0.2/1.55
theta_in = np.radians(90*(num_groups-n)/num_groups) # different angles of incidence
k = mp.Vector3(fcen*1.45).rotate(mp.Vector3(z=1), theta_in)

def pw_amp(k,x0): # amplitude function
  def _pw_amp(x):
    return cmath.exp(1j*2*math.pi*k.dot(x+x0))
  return _pw_amp

source_center,source_size  = mp.Vector3(0,-Ly/4),mp.Vector3(Lx,0)
src = mp.GaussianSource(frequency=fcen,fwidth=fwidth)
source = [mp.Source(src,component=mp.Ez,center=source_center,size=source_size,amp_func=pw_amp(k,source_center))]

Nx,Ny = int(round(design_resolution*design_region_width)),1

design_variables = mp.MaterialGrid(mp.Vector3(Nx,Ny),Air,Si,grid_type='U_MEAN')
design_region = mpa.DesignRegion(design_variables,volume=mp.Volume(center=mp.Vector3(0,design_region_height/2),size=mp.Vector3(design_region_width,design_region_height)))

geometry = [mp.Block(center=mp.Vector3(0,-Ly/4-dpml/2), material=SiO2, size=mp.Vector3(Lx,Ly/2+dpml, 0)), # subtrate
    mp.Block(center=design_region.center, size=design_region.size, material=design_variables) # design region
]

sim = mp.Simulation(cell_size=cell_size,boundary_layers=pml_layers,geometry=geometry,sources=source,k_point=k,resolution=resolution)
nf = mpa.FourierFields(sim,mp.Volume(center=mp.Vector3(0,monitor_height),size=mp.Vector3(monitor_width,0)),mp.Ez,yee_grid=True)
ob_list = [nf]

def J(near_field):
    near_field = near_field.flatten()
    intensity = npa.abs(near_field)**2
    return npa.sum(intensity)/len(near_field) # average intensity

def mapping(x,eta,beta):
    filtered_field = mpa.conic_filter(x,filter_radius*design_resolution,design_region_width,Ny/design_resolution,design_resolution)
    projected_field = mpa.tanh_projection(filtered_field,beta,eta) # projection
    return projected_field.flatten() # interpolate to actual materials

def f(x, grad):
    t = x[0] # "dummy" parameter
    v = x[1:] # design parameters
    if grad.size > 0:
        grad[0] = 1
        grad[1:] = 0
    return t

opt = mpa.OptimizationProblem(
    simulation = sim,
    objective_functions = J,
    objective_arguments = ob_list,
    design_regions = [design_region],
    fcen=fcen,
    df = 0,
    nf = 1,
    decay_by = 1e-5,
    decay_fields=[mp.Ez]
)

def c_minmax(result,x,gradient,eta,beta):

    t = x[0] # auxiliary variable
    v = x[1:] # design parameters

    f0, dJ_single = opt([mapping(v,eta,beta)])
    f_arr = mp.merge_subgroup_data(f0)
    dJ_du = mp.merge_subgroup_data(dJ_single)

    # Backprop the gradients through our mapping function
    my_grad = np.zeros(dJ_du.shape)
    for k in range(num_groups): 
        my_grad[:,k] = -dJ_du[:,k]

    # Assign gradients
    if gradient.size > 0:
        gradient[:,0] = 1 # gradient w.r.t. "t"
        gradient[:,1:] = my_grad.T # gradient w.r.t. each frequency objective

    result[:] = t-np.real(f_arr)
    print("Auxiliary variable: ",t)

# Initial guess
x = 0.5*np.ones(Nx*Ny)

# lower and upper bounds
lb = np.zeros((Nx*Ny,))
ub = np.ones((Nx*Ny,))

# insert the auxiliary variable
x = np.insert(x,0,1)
lb = np.insert(lb,0,0)
ub = np.insert(ub,0,np.inf)

cur_beta,beta_scale = 2,1.2
num_betas = 100
update_factor = 10
algorithm = nlopt.LD_MMA

for iters in range(num_betas):
    solver = nlopt.opt(algorithm, Nx*Ny+1)
    solver.set_lower_bounds(lb)
    solver.set_upper_bounds(ub)
    solver.set_max_objective(f)  
    solver.set_maxeval(update_factor)
    solver.set_xtol_rel(1e-8)
    solver.add_inequality_mconstraint(lambda r,x,g: c_minmax(r,x,g,eta_i,cur_beta), np.array([1e-8]*num_groups))
    x[:] = solver.optimize(x)
    print("Current β: ",cur_beta)
    cur_beta = cur_beta*beta_scale
smartalecH commented 3 years ago

am using divide_parallel_processes and merge_subgroup_data correctly when doing optimization

Looks good to me! I'm assuming you aren't getting the results you are hoping for?

mawc2019 commented 3 years ago

Thank you! I got errors when running my optimization tasks using divide_parallel_processes and merge_subgroup_data. I think one of the reasons may be that I was using the branch loop_in_chunks_fixes. This branch worked well for some of my previous optimization tasks but still has bugs. When I stop using this branch, some errors disappear. Although some other errors remain, they are irrelevant to the issue here.

ReDoDx09 commented 3 years ago

@smartalecH could you please tell how to choose correctly the number of processes np in case where I'im working with cluster ? lets suppose I have : nodes=N cpus-per-task=M in this case what is the max value of the number of processes np ? Thank you

stevengj commented 3 years ago

If you have N nodes and M cpus per node, then normally you would want to choose at most NM processes.

ReDoDx09 commented 3 years ago

@stevengj @smartalecH i would be so grateful if one of you can help me to translate this part of code from C++ to python Thank you

include

using namespace meep; int main(int argc, char **argv) { initialize mpi(argc, argv); // do this even for non-MPI Meep double resolution = 20; // pixels per distance grid_volume v = vol2d(5, 10, resolution); // 5x10 2d cell structure s(v, eps, pml(1.0)); fields f(&s);

f.output_hdf5(Dielectric, v.surroundings());

double freq = 0.3, fwidth = 0.1; gaussian_src_time src(freq, fwidth); f.add_point_source(Ey, src, vec(1.1, 2.3)); while (f.time() < f.last_source_time()) { f.step(); }

f.output_hdf5(Hz, v.surroundings());

return 0; } double eps(const vec &p) { if (p.x() < 2 && p.y() < 3) return 12.0; return 1.0; }