Open AliceGem opened 4 years ago
It should work, as long as you can transfer the individuals' data and get the fitness back. However, there are no examples showing how to do that.
Hi @fmder, when launching an optimization with MPI (e.g. launching the Python script with mpirun), I get different values for the same individual in different processes, which is incorrect. I tried broadcasting the individual values from one process to all the others, while still evaluating the fitness in parallel (as it includes a simulation with a high computational load). However, the job gets stuck on the first individual evaluation. Did you happen to explore MPI with DEAP further? Basically, what I need for my application is for the fitness evaluation to run in parallel, while the rest of the optimization is managed by a single process.
It is really hard to say. I mean, I can guess the structure should look like:

```
all nodes:
master node:
    for loop
```

Other than that, and without further details, I cannot be of much help.
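Spelled out, that guessed structure might look like the following pseudocode (a sketch assuming mpi4py-style primitives, not a tested recipe from this thread):

```
all nodes:
    set up the MPI communicator and get own rank
master node (rank 0):
    create the initial population
    for loop over generations:
        broadcast the current individuals to all nodes
        all nodes evaluate their share of the fitnesses
        gather the fitnesses back on rank 0
        rank 0 applies selection, crossover and mutation
```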
Thanks, I've been struggling to get it working, however. What I don't understand is whether DEAP "tolerates" being run inside an MPI context, or whether it actively uses the MPI nodes to dispatch evalFitness from an MPI job pool to the single nodes.
I ask this because I use an MPI-sensitive simulator inside my fitness function that will distribute itself over ALL MPI nodes, and it segfaults when the simulation starts. To investigate, I added a call to MPI.Barrier inside the fitness function, and it gets stuck there, as if not all MPI ranks are reaching the same MPI checkpoints. My only explanation would be that DEAP uses MPI and only one rank is supposed to evaluate an individual's fitness function; is this right?
Here is the part of the code evaluating the fitness:
```python
# Optimization functions are run only on process 0, NEST simulations are still run in parallel
if rank == 0:
    print("Evaluating fitness on rank 0...")
    # Allocation of individuals
    population = toolbox.population()
    folders = []
    for i in range(len(population)):
        folders.append("/gen" + str(0) + "_ind" + str(i))
else:
    population = None
    folders = None

# Broadcasting the required data from process rank 0 to all processes
print("before bcast rank ", rank, " population ", population, " folders ", folders)
population = comm.bcast(population, root=0)
folders = comm.bcast(folders, root=0)
print("after bcast rank ", rank, " population ", population, " folders ", folders)

# Evaluating fitness
fits = toolbox.map(toolbox.evaluate, population, folders)
```
Inside the toolbox.evaluate function, I call a simulator that I need to run in parallel on all processes, but apparently the simulator runs only on rank 0, while still being "aware" of being inside a parallel environment with 4 MPI processes, causing a segfault.
You should be able to use the `MPIPoolExecutor` of `mpi4py.futures`; it has a `map` function that you can register in your toolbox. This `map` function will iterate over the provided iterables in `map(func, *iterables)` and spawn one MPI process per iteration. Instead of starting `n` nodes with `mpiexec`, you start your script with the regular `python` command and the `MPIPoolExecutor` will take care of the rest:
```python
import mpi4py.futures

from deap import base

import your_simulator


def evalFunc(individual):
    # Executed in a single MPI process context
    fitness = your_simulator.simulate(individual)
    return fitness,


if __name__ == "__main__":
    # It's important to guard the pool with "__main__" because the spawned MPI
    # workers will also import this module. Without this guard, all workers would
    # think they are supposed to start a pool themselves, which leads to infinite
    # recursion. The workers import this module in the namespace "__worker__" and
    # won't enter the "__main__" guard.
    pool = mpi4py.futures.MPIPoolExecutor(max_workers=<n_workers>)

    toolbox = base.Toolbox()
    toolbox.register("map", pool.map)
    toolbox.register("evaluate", evalFunc)
    # ... register others

    population = toolbox.population()
    fits = toolbox.map(toolbox.evaluate, population)
```
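If you want to sanity-check this toolbox wiring without an MPI cluster, any executor exposing a compatible `map` works, since `MPIPoolExecutor` follows the `concurrent.futures` interface. A minimal single-machine sketch, with a hypothetical sum-of-squares fitness standing in for the real simulator:

```python
from concurrent.futures import ThreadPoolExecutor


def eval_func(individual):
    # Stand-in fitness: sum of squares, returned as a 1-tuple as DEAP expects
    return (sum(x * x for x in individual),)


population = [[0, 1], [2, 3], [4, 5]]

# ThreadPoolExecutor.map has the same shape as MPIPoolExecutor.map
with ThreadPoolExecutor(max_workers=2) as pool:
    fits = list(pool.map(eval_func, population))

print(fits)  # [(1,), (13,), (41,)]
```

Once this runs, swapping the executor for `MPIPoolExecutor` should only change how the work is distributed, not the toolbox code.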
Do note that your master node takes up one MPI process and doesn't join the computation, so you have `n_workers = n_nodes - 1`, and that this will run each simulation on one node. If `n_workers > n_individuals`, the pool will hand out one individual to each of `n_individuals` workers and the rest of your workers will stay idle. Therefore I would suggest trimming/cloning each generation so that `n_individuals % n_workers == 0`.
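That trim/clone suggestion can be sketched with a hypothetical helper (`pad_population` is not part of DEAP; cloning by random choice is one of several reasonable policies):

```python
import random


def pad_population(population, n_workers):
    # Clone random individuals until len(population) is a multiple of
    # n_workers, so no worker sits idle during the last batch of evaluations.
    remainder = len(population) % n_workers
    if remainder:
        population = population + [
            random.choice(population) for _ in range(n_workers - remainder)
        ]
    return population


padded = pad_population(list(range(10)), 4)
print(len(padded))  # 12
```

Trimming instead of cloning (dropping `remainder` individuals) works the same way; which is preferable depends on how expensive each evaluation is.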
Dynamic process management with MPI (MPI Spawn) is not supported everywhere, so the `MPIPoolExecutor` is also not supported on all supercomputers. For those running into issues with MPI Spawn, you can adopt a static process pool implementation such as `mpipool`. Combining DEAP with `mpipool` seems to be even more straightforward:
```python
from mpipool import Pool

from deap import base

import your_simulator


def evalFunc(individual):
    # Executed in a single MPI process context
    fitness = your_simulator.simulate(individual)
    return fitness,


# Make sure that all the functions your workers need are declared before this line.
pool = Pool()
# Only the master on rank 0 will continue beyond this line; the workers wait for
# jobs inside the Pool constructor.

toolbox = base.Toolbox()
toolbox.register("map", pool.map)
toolbox.register("evaluate", evalFunc)
# ... register others

population = toolbox.population()
fits = toolbox.map(toolbox.evaluate, population)
```
A few things to note: the MPI detection mechanism of `mpipool` is rudimentary and is best disabled by modifying the downloaded source code (make `runs_with_mpi` return `True` in `mpipool/mpipool.py`). Another thing is that its `map` function is not completely analogous to Python's `map` and does not support a variable number of arguments, so you'll have to zip the arguments you want to pass:
```python
fits = toolbox.map(toolbox.evaluate, arg1iter, arg2iter)
# should become
fits = toolbox.map(toolbox.evaluate, zip(arg1iter, arg2iter))
```
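Equivalently, you can keep a multi-argument evaluate function and adapt it with a small unpacking wrapper. A sketch with the builtin `map` standing in for `pool.map`, and a hypothetical two-argument fitness function:

```python
def evaluate(individual, folder):
    # Hypothetical two-argument fitness function (individual plus folder name)
    return (individual + len(folder),)


def evaluate_packed(args):
    # mpipool's map hands each job a single zipped tuple; unpack it here
    return evaluate(*args)


individuals = [1, 2]
folders = ["/gen0_ind0", "/gen0_ind1"]

# Builtin map stands in for pool.map in this sketch
fits = list(map(evaluate_packed, zip(individuals, folders)))
print(fits)  # [(11,), (12,)]
```

Registering `evaluate_packed` as the toolbox's `"evaluate"` keeps the rest of the optimization loop unchanged.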
Is DEAP working properly with MPI? Are there any examples with it? Thanks!