JohannesBuchner / UltraNest

Fit and compare complex models reliably and rapidly. Advanced nested sampling.
https://johannesbuchner.github.io/UltraNest/

Using `UltraNest` with an `MPI` parallelized likelihood. #153

Open mtagliazucchi opened 2 weeks ago

mtagliazucchi commented 2 weeks ago

Hi!

I'm trying to use UltraNest with a very expensive likelihood whose evaluation at a single point of the parameter space needs to be parallelized using MPI. The likelihood is also vectorized over parameter values.

However, if I'm not mistaken, UltraNest itself uses MPI to parallelize some internal computations/live-point proposals. This conflicts with the MPI calls inside my likelihood and causes a bug in my program.

Here's dummy code that mimics a heavy likelihood parallelized with MPI:

import time
import numpy as np
from mpi4py import MPI

import emcee
import ultranest
sampler = 'emcee'  # switch between 'emcee' and 'ultranest'

class dummy_class():

  def __init__(self, data, comm):
    self.data = data
    self.comm = comm
    self.rank = comm.Get_rank()
    self.size = comm.Get_size()

  def _compute_sub(self, theta, local_idx):
    # mimic a heavy likelihood that scales linearly with the number of data points
    time.sleep(len(self.data[local_idx])/5000)
    # the likelihood is vectorized over theta
    res = np.sum(self.data[local_idx])/np.exp(theta**2)
    return res

  def log_like(self, theta):
    nparams = np.atleast_1d(theta).shape[0]  # number of theta values in the vectorized batch (not used below)

    # find indexes for each rank
    chunk_size = len(self.data) // self.size
    remainder  = len(self.data) % self.size

    start_idx = self.rank * chunk_size + min(self.rank, remainder)
    end_idx   = start_idx + chunk_size + (1 if self.rank < remainder else 0)
    local_idx = np.arange(start_idx, end_idx)

    # compute
    local_res = self._compute_sub(theta, local_idx)   
    res = self.comm.allreduce(local_res, op=MPI.SUM)

    return np.log(np.abs(res))
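
For context, the chunked likelihood above can be checked on its own before plugging it into any sampler. Below is a minimal, hypothetical driver (the file name check_like.py and the 4-rank launch are invented for illustration), run with e.g. mpirun -np 4 python check_like.py:

if __name__ == "__main__":
  # hypothetical standalone check of the MPI-chunked likelihood (sketch only)
  comm = MPI.COMM_WORLD
  np.random.seed(123)
  data = np.random.rand(10000)/500
  obj = dummy_class(data, comm)
  theta = np.linspace(-1.0, 1.0, 5).reshape(-1, 1)  # batch of 5 parameter points
  ll = obj.log_like(theta)
  if comm.Get_rank() == 0:
    print("log-likelihood shape:", ll.shape)  # (5, 1): one value per theta
    print("values:", ll.ravel())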

I was able to use this likelihood with emcee in the following way:

if __name__ == "__main__" and sampler=='emcee':

  comm = MPI.COMM_WORLD
  rank  = comm.Get_rank()

  np.random.seed(123)
  data = np.random.rand(10000)/500

  dummy_obj = dummy_class(data, comm) 

  theta_true = 0.
  theta_bounds = [-10.,10.]

  def log_prior(theta):
    # vectorized over theta
    condition = np.logical_and(theta_bounds[0] <= theta, theta <= theta_bounds[1])
    return np.where(condition, 
            0.0,
            -np.inf)

  def log_prior_fn(theta): # full log-probability: log-prior + log-likelihood

    # ensure all processes use the same `theta` as the root
    if rank != 0:
      theta = None
    theta = comm.bcast(theta, root = 0)

    # vectorized over theta
    lp      = log_prior(theta)

    # keep the likelihood only where the prior is finite (np.where still evaluates log_like on the full batch)
    to_calc = np.isfinite(lp) 
    ll = np.where(to_calc,
                  dummy_obj.log_like(theta),
                  0.0)

    return ll+lp

  # Using emcee.
  nwalkers = 10
  nsteps   = 100
  ndim     = 1
  filename = "test_emcee.h5"

  if rank == 0:
    backend = emcee.backends.HDFBackend(filename)
    backend.reset(nwalkers, ndim)
    initial_state = np.random.normal(loc=theta_true, scale=1., size=(nwalkers,ndim))
    progress = True
    store = True
  else: 
    backend = None
    initial_state = None
    progress = False
    store=False

  initial_state = comm.bcast(initial_state, root=0)

  sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prior_fn, backend=backend, vectorize=True)
  sampler.run_mcmc(initial_state, 
                   nsteps, 
                   store = store,
                   progress = progress)

The idea of this script is that each MPI process runs its own emcee sampler, so every rank calls the likelihood function; this is necessary because otherwise the non-root ranks would never compute the likelihood on their chunk of data. However, only the root process stores the results and prints the progress. Also, inside the likelihood function the parameters each rank uses are overwritten with those broadcast from the root, so the samplers stay consistent across ranks. This example works and gives a large speed-up.
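
Stripped of the emcee specifics, the synchronisation pattern described above is just a broadcast at the top of the log-probability call. A minimal sketch (the function name and placeholder return value are illustrative, not taken from the script):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

def synced_log_prob(theta):
  # every rank enters with its own sampler's proposal; overwrite it with the
  # value broadcast from rank 0 so all ranks evaluate identical points
  theta = comm.bcast(theta if rank == 0 else None, root=0)
  # ... each rank evaluates its data chunk here and the partial results are allreduced ...
  return np.zeros(np.atleast_2d(theta).shape[0])  # placeholder log-probabilities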

I can't get anything similar to work with UltraNest. I tried with this code:

if __name__ == "__main__" and sampler=='ultranest':

  comm = MPI.COMM_WORLD
  rank  = comm.Get_rank()

  np.random.seed(123)

  data = np.random.rand(10000)/500

  dummy_obj = dummy_class(data, comm) # each rank builds the same object

  theta_true = 0.
  theta_bounds = [-10.,10.]
  param_names = ['theta']

  def prior_transform(cube):
    width = theta_bounds[1] - theta_bounds[0]
    return cube * width + theta_bounds[0]

  def log_prior_fn(theta): # used as the (vectorized) log-likelihood for UltraNest
    # ensure all processes use the same `theta` as the root
    if rank != 0:
        theta = None
    theta = comm.bcast(theta, root = 0)

    res = dummy_obj.log_like(theta)

    return res.T[0] # flatten (n, 1) -> (n,) to match the shape UltraNest expects

  # there are no per-rank store/progress options as with emcee; this could be a problem
  sampler = ultranest.ReactiveNestedSampler(param_names, 
                                            log_prior_fn, 
                                            prior_transform,
                                            log_dir="ultranest", 
                                            vectorized=True)
  # try to disable ultranest MPI parallelization
  sampler.use_mpi=False 
  sampler.mpi_size=1
  sampler.mpi_rank=0

  # each rank runs the sampler
  result = sampler.run()
  sampler.print_results()

The program runs when started with a single MPI process (mpirun -np 1 python test.py), but fails for np > 1 with error:

[ultranest] Sampling 400 live points from prior ...
  File "/home/mt/softwares/miniconda3/envs/chimeragw-stable/lib/python3.11/site-packages/ultranest/integrator.py", line 2560, in run_iter
    for _result in self.run_iter(
  File "/home/mt/softwares/miniconda3/envs/chimeragw-stable/lib/python3.11/site-packages/ultranest/integrator.py", line 1435, in _widen_roots_beyond_initial_plateau
  File "/home/mt/softwares/miniconda3/envs/chimeragw-stable/lib/python3.11/site-packages/ultranest/integrator.py", line 1524, in _widen_roots
    assert num_live_points_missing >= 0
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError

Does anyone know a workaround for this problem? Thanks!

JohannesBuchner commented 2 weeks ago

It would be better if you did not do MPI within your likelihood (OpenMP is fine), because ultranest already runs hundreds of likelihood evaluations in parallel, so MPI will just be confused by another layer of parallelisation.
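
For reference, the usual way to let ultranest handle the parallelisation is to keep the likelihood itself MPI-free and simply launch the script under mpirun; a minimal sketch (the Gaussian toy likelihood is illustrative only):

# run as: mpirun -np 8 python run_ultranest.py
# ultranest detects mpi4py and evaluates different proposed points on each rank
import numpy as np
import ultranest

param_names = ['theta']

def prior_transform(cube):
  # uniform prior on [-10, 10]
  return cube * 20.0 - 10.0

def log_like(theta):
  # theta has shape (n_points, n_params) because vectorized=True; no MPI inside
  return -0.5 * np.sum(theta**2, axis=1)

sampler = ultranest.ReactiveNestedSampler(param_names, log_like, prior_transform,
                                          vectorized=True)
result = sampler.run()
sampler.print_results()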

The error is strange. Can you paste the log file from the log_dir?

mtagliazucchi commented 2 weeks ago

Hi, thank you for the reply! Unfortunately, I cannot use OpenMP, since in the real scenario I need to parallelize the likelihood over hundreds of cores, i.e. across multiple nodes of a cluster.

To be exact, the script doesn't stop: it prints the message I quoted in the first post once per MPI process and then gets stuck there. I have to stop it manually.

The .log file content is:

13:49:47 [ultranest] [DEBUG] ReactiveNestedSampler: dims=1+0, resume=False, log_dir=ultranest/run1, backend=hdf5, vectorized=True, nbootstraps=30, ndraw=128..65536
13:49:47 [ultranest] [INFO] Sampling 400 live points from prior ...
JohannesBuchner commented 2 weeks ago

It seems you are resuming and this line is causing the issue: https://github.com/JohannesBuchner/UltraNest/blob/master/ultranest/integrator.py#L615 Maybe it should be 0. Maybe you can try and put some prints (with self.mpi_size and self.mpi_rank) into the function.
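
One way to get such prints without editing the installed package is to wrap the method on the sampler instance before calling run(); a hypothetical sketch (the method name _widen_roots is taken from the traceback above, and the whole snippet is only a debugging aid, not something from this thread):

import functools

# hypothetical debugging aid: log the MPI state on every rank each time
# _widen_roots is entered
orig_widen_roots = sampler._widen_roots

@functools.wraps(orig_widen_roots)
def widen_roots_with_log(*args, **kwargs):
  print(f"rank {sampler.mpi_rank}/{sampler.mpi_size}, use_mpi={sampler.use_mpi}",
        flush=True)
  return orig_widen_roots(*args, **kwargs)

sampler._widen_roots = widen_roots_with_log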

I don't quite see the benefit of using MPI within your likelihood when you can have ultranest parallelise the likelihood, which will save communication overhead. Are there memory constraints that make you prefer MPI parallelisation within the likelihood and serial evaluation by ultranest?

mtagliazucchi commented 2 weeks ago

I don't quite see the benefit of using MPI within your likelihood when you can have ultranest parallelise the likelihood, which will save communication overhead. Are there memory constraints that make you prefer MPI parallelisation within the likelihood and serial evaluation by ultranest?

I'm not sure ultranest is capable of parallelizing the likelihood the way I do in the example code. In the example above, which still reflects the real scenario, the computational bottleneck is the size of the dataset. I therefore split the data array into chunks, compute the partial likelihood on each chunk, and then combine the results. Can ultranest's parallelization do this?

I have an update on my problem. If I remove the lines in the original code

  # try to disable ultranest MPI parallelization
  sampler.use_mpi=False 
  sampler.mpi_size=1
  sampler.mpi_rank=0

the assertion error disappears, but then the code gets stuck here:

14:44:47 [ultranest] [DEBUG] ReactiveNestedSampler: dims=1+0, resume=False, log_dir=ultranest/run2, backend=hdf5, vectorized=True, nbootstraps=30, ndraw=128..65536
14:44:47 [ultranest] [INFO] Sampling 400 live points from prior ...
14:44:48 [ultranest] [DEBUG] Found plateau of 4/400 initial points at L=-97.371. Avoid this by a continuously increasing loglikelihood towards good regions.
14:44:48 [ultranest] [INFO] Widening roots to 403 live points (have 400 already) ...
14:44:48 [ultranest] [INFO] Sampling 3 live points from prior ...

It seems you are resuming and this line is causing the issue: https://github.com/JohannesBuchner/UltraNest/blob/master/ultranest/integrator.py#L615 Maybe it should be 0. Maybe you can try and put some prints (with self.mpi_size and self.mpi_rank) into the function.

I tried to set the value on that line to zero, and also on line 1516 (with the options sampler.use_mpi=False, sampler.mpi_size=1, sampler.mpi_rank=0 as in the first case). The code still doesn't work and hangs here:

  File "/home/mt/softwares/chimera_dev/CHIMERA_JAX/examples/test/test_mpi_like_generic.py", line 91, in <module>
    result = sampler.run()
             ^^^^^^^^^^^^^
  File "/home/mt/softwares/miniconda3/envs/chimeragw-stable/lib/python3.11/site-packages/ultranest/integrator.py", line 2560, in run_iter
    for _result in self.run_iter(
  File "/home/mt/softwares/miniconda3/envs/chimeragw-stable/lib/python3.11/site-packages/ultranest/integrator.py", line 2560, in run_iter
    self._widen_roots_beyond_initial_plateau(
  File "/home/mt/softwares/miniconda3/envs/chimeragw-stable/lib/python3.11/site-packages/numpy/core/fromnumeric.py", line 2953, in min
    Lmin = np.min(Ls)
           ^^^^^^^^^^
ValueError: zero-size array to reduction operation minimum which has no identity
    return _wrapreduction(a, np.minimum, 'min', axis, None, out,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/mt/softwares/miniconda3/envs/chimeragw-stable/lib/python3.11/site-packages/numpy/core/fromnumeric.py", line 88, in _wrapreduction
    return ufunc.reduce(obj, axis, dtype, out, **passkwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: zero-size array to reduction operation minimum which has no identity