JohannesBuchner / UltraNest

Fit and compare complex models reliably and rapidly. Advanced nested sampling.
https://johannesbuchner.github.io/UltraNest/

Run N instances of the sampler in parallel #46

Closed tbainesUA closed 2 years ago

tbainesUA commented 2 years ago

Description

I am trying to process a list of N noisy observations in parallel, using the multiprocessing module and its pool.map function to run a list of N samplers and take advantage of my 8-core machine. I would expect to have 8 samplers running simultaneously, but instead I get an error.

What I Did

Here is a code snippet of the likelihood function and the parallel process I'm trying to implement:

import numpy as np

class LogLikelihood:
    # The likelihood is a callable object so that a separate instance
    # can be created for each data set (y, y_err).
    def __init__(self, y, y_err, model_func, vectorized=False):
        self.y = y 
        self.y_err = y_err
        self._call = model_func 
        self._axis = 1 if vectorized else 0

    def __call__(self, theta):
        return self._evaluate(theta)

    def _evaluate(self, theta):
        theta_interp = theta.copy()
        theta_interp[:,:2] = 10**theta_interp[:,:2]
        y_model = self._call(theta_interp) 
        like = -0.5 * np.power((self.y - y_model) / self.y_err, 2).sum(axis=self._axis)
        return like

Wrapper function that calls sampler.run(...):

def wrapper_run_sampler(sampler):
    return sampler.run(viz_callback=False)

Parallel setup snippet from my .py file:


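import multiprocessing as mp
import ultranest

samplers = []
for i in range(n_obs):
    # instantiate the loglike function for each noisy observation
    # (the keyword is `vectorized`, matching LogLikelihood.__init__)
    log_like = LogLikelihood(y_list[i], y_err_list[i], model_grid_interp, vectorized=True)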
    sampler = ultranest.ReactiveNestedSampler(param_names=param_names, 
                                            loglike=log_like, 
                                            transform=prior_transform,
                                            vectorized=True)
    samplers.append(sampler)

# run samplers in parallel
with mp.Pool(mp.cpu_count() - 1) as pool:
    res = pool.map(wrapper_run_sampler, samplers)

This is the resulting output when I attempt to run in parallel:

TypeError                                 Traceback (most recent call last)
<ipython-input-6-440cbff57264> in <module>
----> 1 results = run_un.run_retrivals_parallel(filenames[0])

~/Box/Projects/LUVOIR-B IFS Simulations/Python Files/retrievals_parallel_un.py in run_retrivals_parallel(filename)
    238         t_1 = time.time()
    239         with mp.Pool(mp.cpu_count() - 1) as pool:
--> 240             res = pool.map(_run_ultranest, samplers)
    241         t_2 = time.time()
    242         print(f'--- Ellapse Time (iter): {t_2 - t_1} s')

~/opt/anaconda3/lib/python3.8/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    362         in a list that is returned.
    363         '''
--> 364         return self._map_async(func, iterable, mapstar, chunksize).get()
    365 
    366     def starmap(self, func, iterable, chunksize=None):

~/opt/anaconda3/lib/python3.8/multiprocessing/pool.py in get(self, timeout)
    769             return self._value
    770         else:
--> 771             raise self._value
    772 
    773     def _set(self, i, obj):

~/opt/anaconda3/lib/python3.8/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
    535                         break
    536                     try:
--> 537                         put(task)
    538                     except Exception as e:
    539                         job, idx = task[:2]

~/opt/anaconda3/lib/python3.8/multiprocessing/connection.py in send(self, obj)
    204         self._check_closed()
    205         self._check_writable()
--> 206         self._send_bytes(_ForkingPickler.dumps(obj))
    207 
    208     def recv_bytes(self, maxlength=None):

~/opt/anaconda3/lib/python3.8/multiprocessing/reduction.py in dumps(cls, obj, protocol)
     49     def dumps(cls, obj, protocol=None):
     50         buf = io.BytesIO()
---> 51         cls(buf, protocol).dump(obj)
     52         return buf.getbuffer()
     53 

TypeError: cannot pickle 'mpi4py.MPI.Intracomm' object

Is ultranest capable of running a list of observations in parallel?

Thank you for your time.

JohannesBuchner commented 2 years ago

I think you can do this by uninstalling mpi4py.

The problematic code is here: https://github.com/JohannesBuchner/UltraNest/blob/master/ultranest/integrator.py#L489

Alternatively, you can set sampler.comm=None, sampler.mpi_rank=0 and sampler.mpi_size=1 (they should already be 0 and 1 actually).

A fix could be to add an else to the if and set self.comm to None.
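The "alternatively" workaround can be sketched as a small helper that resets the three attributes named above before the sampler is handed to a pool. This is only an illustration: detach_mpi is a hypothetical name, and the stand-in object (a SimpleNamespace holding a thread lock, which is unpicklable much like an MPI communicator) replaces a real ReactiveNestedSampler so that the snippet runs without ultranest or mpi4py installed.

```python
import pickle
import threading
from types import SimpleNamespace


def detach_mpi(sampler):
    """Reset the MPI-related attributes so the sampler no longer holds a communicator.

    Hypothetical helper; it only sets the three attributes mentioned in this thread.
    """
    sampler.comm = None
    sampler.mpi_rank = 0
    sampler.mpi_size = 1
    return sampler


# Stand-in for a sampler (assumption: a real sampler exposes the same three
# attributes). The lock plays the role of the unpicklable communicator.
stand_in = SimpleNamespace(comm=threading.Lock(), mpi_rank=0, mpi_size=1)

# Once the communicator is gone, the object can be pickled, which is what
# pool.map needs in order to send it to a worker process.
payload = pickle.dumps(detach_mpi(stand_in))
```

With real samplers, the same call would go inside the loop that builds the list, for example samplers.append(detach_mpi(sampler)). Whether the rest of the sampler state pickles cleanly is not something this sketch checks.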

JohannesBuchner commented 2 years ago

Another solution, which is what I usually do, is to run multiple python instances, each receiving the data file to operate on as a command-line argument (sys.argv[1]).

This can then be combined with xargs --max-args=1 --max-procs=10, or make -j10, for parallelisation.
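For anyone who prefers to stay in Python, roughly the same pattern as xargs --max-procs can be sketched with the standard library: start one fresh interpreter per data file and cap how many run at once. The names run_one and run_all are hypothetical, as is the per-file script, which is assumed to read its input path from sys.argv[1].

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def run_one(script, data_file):
    """Run `python script data_file` in a fresh interpreter and return its exit code."""
    completed = subprocess.run([sys.executable, script, data_file])
    return completed.returncode


def run_all(script, data_files, max_procs=4):
    """Launch one interpreter per data file, with at most `max_procs` running at a time."""
    # Threads are enough here: each one just waits for its child process.
    with ThreadPoolExecutor(max_workers=max_procs) as executor:
        return list(executor.map(lambda f: run_one(script, f), data_files))
```

A call such as run_all("fit_one.py", ["obs_000.dat", "obs_001.dat"], max_procs=7) returns one exit code per file (the file names here are placeholders). Because every fit lives in its own process, nothing has to be pickled.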

tbainesUA commented 2 years ago

It seems that just uninstalling the mpi4py package did the trick. Thanks for the help and the additional suggestions!

dforero0896 commented 2 years ago

Hi, I have been trying to accomplish the same thing (i.e. having N parallel samplers working on different data realisations), but with MPI, in order to take advantage of multiple nodes. Is this possible? I can't uninstall mpi4py, of course, since that is how I distribute tasks, and setting sampler.comm=None makes it impossible to broadcast an array that I need to communicate to all nodes.

I get this error

slurmstepd: error: mpi/pmi2: value not properly terminated in client request
slurmstepd: error: mpi/pmi2: request not begin with 'cmd='
slurmstepd: error: mpi/pmi2: full request is: 00000000000000000000000000000000000000000000000

slurmstepd: error: mpi/pmi2: invalid client request
slurmstepd: error: mpi/pmi2: value not properly terminated in client request
slurmstepd: error: mpi/pmi2: request not begin with 'cmd='
slurmstepd: error: mpi/pmi2: full request is: 00000000000000000000000000000000000000000000000

slurmstepd: error: mpi/pmi2: invalid client request
Error in system call pthread_mutex_destroy: Device or resource busy
    ../../src/mpi/init/init_thread_cs.c:66
Abort(2663567) on node 0 (rank 0 in comm 0): Fatal error in PMPI_Init_thread: Other MPI error, error stack:
MPIR_Init_thread(143)........: 
MPID_Init(1310)..............: 
MPIDI_OFI_mpi_init_hook(1953): 
MPIDU_bc_table_create(317)...: 
srun: Job step aborted: Waiting up to 32 seconds for job step to finish.
slurmstepd: error: *** STEP 5151739.0 ON node218 CANCELLED AT 2021-12-21T22:34:08 ***
srun: error: node218: tasks 1-3: Killed
srun: error: node218: task 0: Killed

I think this may be related to my code and UltraNest both trying to use the communicator for different things.

I am aware of the alternative of wrapping everything in a bash script or using xargs/parallel or so. But I would still like to know if it is possible to launch independent samplers in different MPI processes since I communicate a couple of arrays from the root to the rest of the tasks.

Thanks in advance for your help.

JohannesBuchner commented 2 years ago

I think you have to go deep into the MPI documentation to achieve this. Have a look at what the alternatives to using MPI.COMM_WORLD are. Can you use a communicator limited to some nodes? Have a look at MPI configurations. Can one specify that only some nodes are used for parallelisation?

dforero0896 commented 2 years ago

I have not read about this, but I could check. However, I was hoping it would be possible for the sampler not to use MPI at all. Is this something that can be disabled, perhaps at build time?

JohannesBuchner commented 2 years ago

@dforero0896, see "Alternatively" in https://github.com/JohannesBuchner/UltraNest/issues/46#issuecomment-937131298 for how to achieve this.

dforero0896 commented 2 years ago

Right, but that wouldn't allow me to use MPI either, which is the point.

JohannesBuchner commented 2 years ago

Yes it would, in that case ultranest is not doing anything with MPI. You open your own MPI.COMM_WORLD before running ultranest and select based on the rank the dataset to work on.
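A minimal sketch of that idea, assuming mpi4py is available on the cluster: each rank picks its own share of the data files and then runs ordinary, MPI-free samplers on them. select_for_rank and main are hypothetical names, and the sampler construction is left as a comment because it depends on the user's model.

```python
def select_for_rank(items, rank, size):
    """Round-robin split: rank r gets items r, r + size, r + 2*size, ..."""
    return list(items[rank::size])


def main(data_files):
    # Imported inside main so that the helper above can be used without MPI.
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    for path in select_for_rank(data_files, rank, size):
        # Build one sampler per file here, set sampler.comm = None as described
        # earlier in this thread, then call sampler.run().
        print(f"rank {rank} of {size} would process {path}")
```

The user's own communicator stays available for other work, for example comm.bcast to share arrays from the root rank, since the samplers no longer touch it.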

dforero0896 commented 2 years ago

I am importing MPI before ultranest and using the communicator to select the batch of data to be processed by each task, but I still get the same error. It might be a SLURM issue instead, so I will have to consult the person in charge of the cluster. Thanks for your help!