dfm / emcee

The Python ensemble sampling toolkit for affine-invariant MCMC
https://emcee.readthedocs.io
MIT License
1.46k stars 431 forks source link

Using `emcee` with a likelihood whose evaluation is parallelized using MPI. #527

Open mtagliazucchi opened 5 days ago

mtagliazucchi commented 5 days ago

Hi everyone, I'm trying to use emcee with a very expensive likelihood whose evaluation needs to be parallelized using MPI.

Traditionally, emcee uses MPI tasks to compute the likelihood on different points of the parameter space corresponding to walkers proposed positions. In my case, I need MPI tasks to parallelize the likelihood evaluation on a single point of the parameter space by splitting the original dataset in several chunks and computing sub-terms of the likelihood on those chunks. The likelihood sub-terms are then gathered together and returned by the master process. The likelihood is also vectorized over the model parameters, so that it is possible to use the vectorize option of emcee and evolve (half of) the walkers simultaneously.

The problem is that I cannot manage to launch emcee with such likelihood.

Here's a dummy code that reproduces the problem:

import os
os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
import time
import numpy as np
from mpi4py import MPI
import emcee

class dummy_class():

    def  __init__(self, data):
        self.data = data

    def compute(self, theta):
        time.sleep(len(self.data)/5000) # to mimic and heavy likelihood that scales linearly with the number of data points
        return np.sum(self.data)/np.exp(theta**2)

def distribute_array_mpi(comm, array):

    rank  = comm.Get_rank()
    nproc = comm.Get_size()

    ndata = len(array)

    base_size = ndata // nproc
    remainder = ndata % nproc

    # Calculate starting and ending indexes for each rank
    if rank < remainder:
        start_idx = rank * (base_size + 1)
        end_idx   = start_idx + base_size + 1
    else:
        start_idx = remainder * (base_size + 1) + (rank - remainder) * base_size
        end_idx   = start_idx + base_size

    return array[start_idx:end_idx]

def compute_log_prob(comm, rank_dummy, theta):

    rank = comm.Get_rank()

    partial_result = rank_dummy.compute(theta)

    res = comm.reduce(partial_result, op=MPI.SUM, root=0)

    if rank == 0:
        return -np.log(np.abs(res))

if __name__ == "__main__":

    comm = MPI.COMM_WORLD
    rank  = comm.Get_rank()

    np.random.seed(123)
    data = np.random.rand(10000)/500

    rank_data = distribute_array_mpi(comm, data)
    rank_dummy_class = dummy_class(rank_data)

    theta_true = 0.
    theta_bounds = [-10.,10.]

    def log_prior(theta):
        # vectorized over theta
        condition = np.logical_and(theta_bounds[0] <= theta, theta <= theta_bounds[1])
        return np.where(condition, 
                      0.0,
                      -np.inf)

    def log_prior_fn(theta):
        # vectorized over theta

        lp      = log_prior(theta)

        to_calc = np.isfinite(lp) # we compute the likelihood only where lp is finite

        ll      = np.where(to_calc,
                          compute_log_prob(comm, rank_dummy_class, theta),
                      0.0)

        if rank ==0:
            return ll+lp

    comm.Barrier()
    t0  = MPI.Wtime()
    res = log_prior_fn(np.linspace(-2.,2.,5)) 
    tf  = MPI.Wtime()
    if rank == 0:
        print(f"Like value is {res}")
        print(f"Computation time is {tf-t0}") 

    comm.Barrier()

    # Trying using emcee.
    nwalkers = 10
    nsteps   = 100
    ndim     = 1
    filename = "test.h5"

    # Option 1.

    backend = emcee.backends.HDFBackend(filename)
    backend.reset(nwalkers, ndim)
    sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prior_fn, backend=backend, vectorize=True)

    initial_state = np.random.normal(loc=theta_true, scale=1., size=(nwalkers,ndim))

    sampler = comm.bcast(sampler, root = 0)
    initial_state = comm.bcast(initial_state, root=0)

    sampler.run_mcmc(initial_state, 
                           nsteps, 
                           progress = True)

    # Option 2. Only Rank 0 run emcee

    if rank == 0:
        backend = emcee.backends.HDFBackend(filename)
        backend.reset(nwalkers, ndim)
        sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prior_fn, backend=backend, vectorize=True)

        initial_state = np.random.normal(loc=theta_true, scale=1., size=(nwalkers,ndim))

        sampler.run_mcmc(initial_state, 
                                    nsteps, 
                                    progress = True)

    # Option 3. Rank 0 generate the sampler, shares with other processes and everyone run it.
    if rank == 0:
        backend = emcee.backends.HDFBackend(filename)
        backend.reset(nwalkers, ndim)
        sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prior_fn, backend=backend, vectorize=True)

        initial_state = np.random.normal(loc=theta_true, scale=1., size=(nwalkers,ndim))
    else:
        sampler = None
        initial_state = None

    # Then shares it with the other processes.
    sampler = comm.bcast(sampler, root = 0)
    initial_state = comm.bcast(initial_state, root=0)

    sampler.run_mcmc(initial_state, 
                           nsteps, 
                           progress = True)

My code is much more complicated but the general structure is the same of this example. In my code, I must parallelize in this way and not in the traditional one.

Using options 1 and 3 I get the error:

Traceback (most recent call last):
  File "/home/mt/softwares/miniconda3/envs/chimeragw-stable/lib/python3.11/site-packages/emcee/ensemble.py", line 506, in compute_log_prob
    blob = [l[1:] for l in results if len(l) > 1]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'NoneType' object is not iterable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/mt/softwares/chimera_dev/CHIMERA_JAX/examples/test_mpi4jax.py", line 122, in <module>
    sampler.run_mcmc(initial_state, 
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/mt/softwares/miniconda3/envs/chimeragw-stable/lib/python3.11/site-packages/emcee/ensemble.py", line 450, in run_mcmc
    for results in self.sample(initial_state, iterations=nsteps, **kwargs):
  File "/home/mt/softwares/miniconda3/envs/chimeragw-stable/lib/python3.11/site-packages/emcee/ensemble.py", line 351, in sample
    state.log_prob, state.blobs = self.compute_log_prob(state.coords)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/mt/softwares/miniconda3/envs/chimeragw-stable/lib/python3.11/site-packages/emcee/ensemble.py", line 511, in compute_log_prob
    log_prob = np.array([_scalar(l) for l in results])
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'NoneType' object is not iterable

When using option 2, the code stucks somewhere and no MCMC step is performed.

Does anybody know how to solve this issue?

mtagliazucchi commented 4 days ago

I have found a workaround. The idea is that each rank runs its instance of emcee, but only the master saves the chain. It is necessary to change the likelihood so that each rank returns the same likelihood value (before only the master returned it). It is also necessary to ensure that the theta parameters are the same for all processes. Here's the code if anyone is interested.

import os
os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
import time
import numpy as np
from mpi4py import MPI
import emcee

class dummy_class():

    def  __init__(self, data):
        self.data = data

    def compute(self, theta):
        time.sleep(len(self.data)/5000) # to mimic and heavy likelihood that scales linearly with the number of data points
        return np.sum(self.data)/np.exp(theta**2)

def distribute_array_mpi(comm, array):

    rank  = comm.Get_rank()
    nproc = comm.Get_size()

    ndata = len(array)

    base_size = ndata // nproc
    remainder = ndata % nproc

    # Calculate starting and ending indexes for each rank
    if rank < remainder:
        start_idx = rank * (base_size + 1)
        end_idx   = start_idx + base_size + 1
    else:
        start_idx = remainder * (base_size + 1) + (rank - remainder) * base_size
        end_idx   = start_idx + base_size

    return array[start_idx:end_idx]

def compute_log_prob(comm, rank_dummy, theta):

    rank = comm.Get_rank()

    partial_result = rank_dummy.compute(theta)

    res = comm.allreduce(partial_result, op=MPI.SUM) # the likelihood is now returned by all processes

    return np.log(np.abs(res))

if __name__ == "__main__":

    comm = MPI.COMM_WORLD
    rank  = comm.Get_rank()

    np.random.seed(123)
    data = np.random.rand(10000)/500

    print(f"Rank {rank} has max data {np.max(data)} and min data {np.min(data)}")

    rank_data = distribute_array_mpi(comm, data)
    rank_dummy_class = dummy_class(rank_data)

    theta_true = 0.
    theta_bounds = [-10.,10.]

    def log_prior(theta):

        # vectorized over theta
        condition = np.logical_and(theta_bounds[0] <= theta, theta <= theta_bounds[1])
        return np.where(condition, 
                        0.0,
                        -np.inf)

    def log_prior_fn(theta):

        if rank == 0:
            theta = theta
        else:
            theta = None

        theta = comm.bcast(theta, root = 0) # all processes have the same `theta` of the master

        lp      = log_prior(theta)

        to_calc = np.isfinite(lp) # we compute the likelihood only where lp is finite

        ll      = np.where(to_calc,
                           compute_log_prob(comm, rank_dummy_class, theta),
                           0.0)
        return ll+lp

    comm.Barrier()
    t0  = MPI.Wtime()
    res = log_prior_fn(np.linspace(-2.,2.,5)) 
    tf  = MPI.Wtime()
    if rank == 0:
        print(f"Like value is {res}")
        print(f"Computation time is {tf-t0}") 

    comm.Barrier()

    # Trying using emcee.
    nwalkers = 10
    nsteps   = 100
    ndim     = 1
    filename = "test2.h5"

    if rank == 0:
        backend = emcee.backends.HDFBackend(filename)
        backend.reset(nwalkers, ndim)
        initial_state = np.random.normal(loc=theta_true, scale=1., size=(nwalkers,ndim))
        progress = True
    else: 
        backend = None
        initial_state = None
        progress = False

    initial_state = comm.bcast(initial_state, root=0)

    sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prior_fn, backend=backend, vectorize=True)
    sampler.run_mcmc(initial_state, 
                     nsteps, 
                     progress = progress)

The code gives the correct posterior of theta, which is also equal to those obtained with the equivalent serial code.

I'm not sure if this approach is correct and efficient. If anyone has any advice, I would appreciate it, otherwise the issue can be closed!

mtagliazucchi commented 3 days ago

Final comment: the previous code can use a lot of memory since each MPI task runs its MCMC, but only the master saves the chain on a h5 file, while all the others keep their chain (which is equal to the one of the master) in the memory. To overcome this problem I tried to initialize the backend of the non master processes as NoStoreBackend, that is a custom emcee.backends defined as:

class NoStoreBackend(emcee.backends.Backend):
    """A backend that does not store the chain in memory."""

    def __init__(self, dtype=None):
        super().__init__(dtype=dtype)
        self.initialized = False

    def reset(self, nwalkers, ndim):
        self.nwalkers = int(nwalkers)
        self.ndim     = int(ndim)
        self.blobs = None
        self.random_state = None

    def save_step(self, state, accepted):
        pass

    def grow(self, ngrow, blobs):
        pass 

    def get_last_sample(self):
        raise NotImplementedError("This backend does not store the chain or the last sample.")

    def get_chain(self, **kwargs):
        raise NotImplementedError("The chain is not stored in this backend.")

    def get_log_prob(self, **kwargs):
        raise NotImplementedError("Log probabilities are not stored in this backend.")

    def get_blobs(self, **kwargs):
        raise NotImplementedError("Blobs are not stored in this backend.")

    def get_autocorr_time(self, discard=0, thin=1, **kwargs):
        raise NotImplementedError("Autocorrelation time cannot be computed without storing the chain.")

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        pass

If this backend works, the memory problem should be solved.