LBL-EESA / TECA

TECA, the Toolkit for Extreme Climate Analysis, contains a collection of climate analysis algorithms targeted at extreme event detection and analysis.

Runtime setting of the MPI communicator? #183

Closed: taobrienlbl closed this issue 5 years ago

taobrienlbl commented 5 years ago

TECA currently assumes that the MPI communicator is MPI_COMM_WORLD (see below output of grep -r MPI_COMM_WORLD in 3c0304c). It could be useful if this were generalized. This issue is meant to be a starting point for discussing whether we should do this and, if so, how.

For the training phase of the Bayesian AR detector, we use the Markov Chain Monte Carlo (MCMC) package emcee, which also uses MPI for parallelization. This is the phase that generates the set of AR detector parameters that the AR detector will then apply in parallel to output data (as in #146). The MCMC algorithm will need to run TECA over a large set of timesteps once for each test parameter. (Note: for the training phase of the detector, TECA has to be run with only one parameter setting at a time, unlike the run phase, where TECA will need to run on an ensemble of parameters simultaneously; so TECA's parallelism in this phase can only be in the time dimension.) It would be convenient if the MCMC algorithm and TECA could both make use of MPI simultaneously.

Pseudocode for this MCMC operation follows:

# create a set of starting parameters (theta = (percentile threshold, filter width, minimum area))
# for each parallel MCMC chain
theta = ... # theta has shape [3, num_mcmc_parallel]

# emcee runs the for-loop below in parallel using MPI
for i in range(num_mcmc_parallel):
     # get the parameters for the current MCMC chain
     current_theta = theta[:, i]

     # run TECA for the current theta
     teca_output = run_teca(current_theta) # ideally, this would also run with MPI

     # communicate with other MCMC chains to determine the next version of theta for
     # all chains
     theta = emcee_mpi_get_new_theta(teca_output)

Theoretically, I think this should be possible if emcee and TECA each use a separate MPI communicator. emcee allows run-time setting of its MPI communicator, so if TECA allowed this as well, it should be possible for both to use MPI. I think this would work as follows:

Let's say that num_mcmc_parallel = 100, so the first MPI group, G0, would contain 100 processes and would be assigned to emcee. Each MCMC chain would then need to run TECA independently, so each run of TECA would need its own group. If we give each TECA instance 40 processes, we would need 100 more groups, G1 through G100, each with 40 processes. The driver script would have the task of dividing the 100 + 40*100 = 4100 total processes among the 101 groups and assigning them appropriately to emcee and the 100 instances of TECA that emcee will run.
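A minimal sketch (not TECA code) of how a driver could carve MPI_COMM_WORLD into one emcee group and 100 per-instance TECA groups using MPI_Comm_split; the group sizes follow the example above, and handing each sub-communicator to emcee or TECA is left as a comment:

#include <mpi.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    int world_rank = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    const int n_chains = 100;      // num_mcmc_parallel
    const int ranks_per_teca = 40; // processes given to each TECA instance

    // ranks 0..99 form the emcee group (color 0); the remaining 4000 ranks
    // are split into 100 TECA groups of 40 each (colors 1..100)
    int color = (world_rank < n_chains) ?
        0 : 1 + (world_rank - n_chains) / ranks_per_teca;

    MPI_Comm group_comm = MPI_COMM_NULL;
    MPI_Comm_split(MPI_COMM_WORLD, color, world_rank, &group_comm);

    // color == 0      -> hand group_comm to emcee
    // color == 1..100 -> hand group_comm to the corresponding TECA instance
    //                    (e.g. via the setter proposed below)

    MPI_Comm_free(&group_comm);
    MPI_Finalize();
    return 0;
}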

This would require a method that sets the MPI communicator for a specific instantiation of a given algorithm, and another method that gets the MPI communicator. The idea would be that, instead of using MPI_COMM_WORLD, TECA apps would use something like teca_mpi_manager::get_mpi_comm(). I suspect this would be most appropriate in core/teca_mpi_manager.cxx, e.g.:

void teca_mpi_manager::set_mpi_comm(MPI_Comm mpi_comm)
{
    // store the communicator that TECA should use in place of MPI_COMM_WORLD
    this->teca_mpi_comm = mpi_comm;
}

MPI_Comm teca_mpi_manager::get_mpi_comm()
{
    return this->teca_mpi_comm;
}

Note that the above code is just for illustration purposes: it looks like the constructor for teca_mpi_manager would also need to be modified, since it currently uses MPI_COMM_WORLD on construction. I'm not immediately sure of the most appropriate way to do this.
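For illustration, here is one hedged way the constructor concern could be addressed: accept a communicator argument that defaults to MPI_COMM_WORLD and refresh the cached rank/size whenever the communicator changes. This is a sketch, not the actual TECA class; the argc/argv constructor arguments and the m_rank/m_size member names are assumptions based on the grep output below.

#include <mpi.h>

class teca_mpi_manager
{
public:
    // sketch: default to MPI_COMM_WORLD so existing behavior is unchanged;
    // note that non-predefined communicators only exist after MPI_Init, so
    // in practice set_mpi_comm would usually be called after construction
    teca_mpi_manager(int &argc, char **&argv, MPI_Comm comm = MPI_COMM_WORLD)
        : teca_mpi_comm(comm)
    {
        MPI_Init(&argc, &argv);
        MPI_Comm_rank(this->teca_mpi_comm, &m_rank);
        MPI_Comm_size(this->teca_mpi_comm, &m_size);
    }

    ~teca_mpi_manager() { MPI_Finalize(); }

    void set_mpi_comm(MPI_Comm comm)
    {
        this->teca_mpi_comm = comm;
        // rank and size are cached at construction, so refresh them here
        MPI_Comm_rank(this->teca_mpi_comm, &m_rank);
        MPI_Comm_size(this->teca_mpi_comm, &m_size);
    }

    MPI_Comm get_mpi_comm() const { return this->teca_mpi_comm; }

private:
    MPI_Comm teca_mpi_comm;
    int m_rank = 0;
    int m_size = 1;
};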

Output of grep -r MPI_COMM_WORLD in 3c0304c:

./alg/teca_evaluate_expression.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./alg/teca_table_sort.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./alg/teca_tc_wind_radii.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./alg/teca_tc_classify.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./alg/teca_table_region_mask.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./alg/teca_tc_trajectory.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./alg/teca_table_calendar.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./alg/teca_table_to_stream.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./alg/teca_table_remove_rows.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./core/teca_parallel_id.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./core/teca_mpi_manager.cxx:    MPI_Comm_rank(MPI_COMM_WORLD, &m_rank);
./core/teca_mpi_manager.cxx:    MPI_Comm_size(MPI_COMM_WORLD, &m_size);
./core/teca_time_step_executive.cxx:        MPI_Comm_size(MPI_COMM_WORLD, &tmp);
./core/teca_time_step_executive.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &tmp);
./core/teca_thread_pool.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./core/teca_thread_pool.cxx:        MPI_Comm_size(MPI_COMM_WORLD, &n_ranks);
./core/teca_thread_pool.cxx:                1, MPI_INT, 0, MPI_COMM_WORLD);
./core/teca_thread_pool.cxx:            0, MPI_DATATYPE_NULL, 0, MPI_COMM_WORLD);
./core/teca_thread_pool.cxx:                1, MPI_INT, 0, MPI_COMM_WORLD);
./core/teca_thread_pool.cxx:            0, MPI_DATATYPE_NULL, 0, MPI_COMM_WORLD);
./core/teca_thread_pool.cxx:                64, MPI_BYTE, 0, MPI_COMM_WORLD);
./core/teca_thread_pool.cxx:            0, MPI_DATATYPE_NULL, 0, MPI_COMM_WORLD);
./core/teca_thread_pool.cxx:                1, MPI_INT, 0, MPI_COMM_WORLD);
./core/teca_thread_pool.cxx:            0, MPI_DATATYPE_NULL, 0, MPI_COMM_WORLD);
./core/teca_thread_pool.cxx:                recv_cnt.data(), displ.data(), MPI_INT, 0, MPI_COMM_WORLD);
./core/teca_thread_pool.cxx:            nullptr, nullptr, MPI_DATATYPE_NULL, 0, MPI_COMM_WORLD);
./core/teca_thread_pool.cxx:        MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED,
./core/teca_binary_stream.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./core/teca_binary_stream.cxx:            MPI_Bcast(&nbytes, 1, MPI_UNSIGNED_LONG, root_rank, MPI_COMM_WORLD);
./core/teca_binary_stream.cxx:            MPI_Bcast(this->get_data(), nbytes, MPI_BYTE, root_rank, MPI_COMM_WORLD);
./core/teca_binary_stream.cxx:            MPI_Bcast(&nbytes, 1, MPI_UNSIGNED_LONG, root_rank, MPI_COMM_WORLD);
./core/teca_binary_stream.cxx:            MPI_Bcast(this->get_data(), nbytes, MPI_BYTE, root_rank, MPI_COMM_WORLD);
./core/teca_temporal_reduction.cxx:                dest, 3211, MPI_COMM_WORLD))
./core/teca_temporal_reduction.cxx:                2, MPI_UNSIGNED_LONG, 0, MPI_COMM_WORLD);
./core/teca_temporal_reduction.cxx:                0, MPI_DATATYPE_NULL, 0, MPI_COMM_WORLD);
./core/teca_temporal_reduction.cxx:        MPI_Comm_size(MPI_COMM_WORLD, &tmp);
./core/teca_temporal_reduction.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &tmp);
./core/teca_temporal_reduction.cxx:        MPI_Comm_size(MPI_COMM_WORLD, &tmp);
./core/teca_temporal_reduction.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &tmp);
./core/teca_temporal_reduction.cxx:            if (internal::recv(MPI_COMM_WORLD, left_id-1, bstr))
./core/teca_temporal_reduction.cxx:            if (internal::recv(MPI_COMM_WORLD, right_id-1,  bstr))
./core/teca_temporal_reduction.cxx:            if (internal::send(MPI_COMM_WORLD, up_id-1, bstr))
./io/teca_table_writer.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./io/teca_table_reader.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./io/teca_table_reader.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./io/teca_cf_reader.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
./io/teca_cf_reader.cxx:        MPI_Comm_size(MPI_COMM_WORLD, &n_ranks);
./io/teca_vtk_cartesian_mesh_writer.cxx:        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
burlen commented 5 years ago

The way I'm leaning toward solving this is to pass the communicator into teca_algorithm::update. The communicator would be passed into the three virtuals: teca_algorithm::get_output_metadata, teca_algorithm::get_upstream_request, and teca_algorithm::execute.

This solution would enable individual pipelines to run with different communicators. I'm thinking especially of nested pipelines, such as the ones we have been using in the probabilistic AR detector.

One limitation of this solution is that it implies that the individual stages of a given pipeline all run on the same communicator at the same level of concurrency. However, I'm not sure there is a use case where that is important.

taobrienlbl commented 5 years ago

I think this suggestion makes sense, especially if it is implemented in a way that the passed-in communicator could also be used to completely disable MPI and avoid any MPI calls. For completeness in this issue, I'm pasting one of @burlen's comments (from #201) related to this:

I'd like to see if we can do away with enable_mpi flag. When we can set the communicator explicitly, MPI_COMM_SELF would be effectively equivalent to enable_mpi=0
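For illustration only, a hypothetical per-algorithm setter could make that equivalence explicit. The set_communicator name, the reader header, and the p_teca_cf_reader/New() usage below just follow TECA's usual conventions and are assumptions here, not a confirmed API:

#include "teca_cf_reader.h" // assumed header name
#include <mpi.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    // with MPI_COMM_SELF each rank runs the reader on its own,
    // which behaves like enable_mpi=0
    p_teca_cf_reader reader = teca_cf_reader::New();
    reader->set_communicator(MPI_COMM_SELF);

    MPI_Finalize();
    return 0;
}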

burlen commented 5 years ago

The way I'm leaning toward solving this is to pass the communicator into teca_algorithm::update. The communicator would be passed into the three virtuals: teca_algorithm::get_output_metadata, teca_algorithm::get_upstream_request, and teca_algorithm::execute.

This is a clean and attractive solution; however, some algorithms make use of MPI before execution starts. For example, teca_threaded_algorithm uses the communicator when creating its thread pool; this occurs when the user calls teca_threaded_algorithm::set_thread_pool_size.

We could defer the use of MPI until the pipeline runs, but this would likely introduce some inefficiencies and would be more complicated.

We might be better off setting the MPI communicator directly on each algorithm.

I am leaning against having a global communicator; there are cases where this would be problematic. For example, the probabilistic AR detector has a nested pipeline that needs to run locally on a node, while the outer pipeline needs to run across all nodes in the MPI job.
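A minimal sketch of the per-algorithm option under discussion, assuming the communicator is simply stored as a member with MPI_COMM_WORLD as the default; the method names are illustrative only:

#include <mpi.h>

// illustrative only: each algorithm carries its own communicator, defaulting
// to MPI_COMM_WORLD so that existing pipelines keep their current behavior
class teca_algorithm
{
public:
    void set_communicator(MPI_Comm a_comm) { this->comm = a_comm; }
    MPI_Comm get_communicator() const { return this->comm; }

private:
    MPI_Comm comm = MPI_COMM_WORLD;
};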

taobrienlbl commented 5 years ago

If we do set the communicator on a per-algorithm basis, one slight concern is that this could lead to more boilerplate code when constructing simple pipelines. Obviously, a default communicator could be used (perhaps MPI_COMM_WORLD), but then even toggling to MPI_COMM_SELF would require something like a set_mpi_communicator() call for each algorithm.

As a complementary option, would it make sense (and/or be practical) to have an algorithm use the MPI communicator of its upstream algorithm(s) by default? This would allow something like calling teca_cf_reader.set_mpi_communicator() once, at the top of a pipeline, and having that communicator propagate to all downstream algorithms unless it is overridden by an algorithm-specific call to set_mpi_communicator(). Of course, this probably isn't as simple as it sounds; for example, if an algorithm has multiple upstream inputs, how would any conflicts be handled (or should they be)?

burlen commented 5 years ago

Yeah, I agree that it's a little cumbersome, but in most situations people want MPI_COMM_WORLD and would not need to do it.

Setting it at the top (in the cf_reader) or at the bottom (in the call to update) is the same thing done differently. I agree this seems like an attractive/aesthetic solution, but I think we would regret it. These approaches would introduce inefficiencies and races; working around that adds a lot of complexity and special-case code. I think the special cases would then become confusing for users.

Let's consider my per-algorithm set calls a prototype and see how bad it is. If we hate it we can change it. While we work, we can try to come up with a more appealing solution.

burlen commented 5 years ago

brainstorming ....

To avoid setting the communicator on every algorithm, we might introduce the notion of a communicator stack and an active communicator, i.e. the one at the top of the stack. Whatever communicator is active at the time an algorithm is constructed would be the one it uses. The stack would always start with MPI_COMM_WORLD. You could push a communicator onto the stack before constructing a pipeline and pop it afterward. I don't think this will work inside threads as-is; each thread would have to have a thread-local communicator stack. TODO -- research thread locals, how hard would it be...
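A rough sketch of the communicator-stack idea with a thread-local stack; all names here (teca_comm_stack, push, pop, active) are hypothetical:

#include <mpi.h>
#include <vector>

namespace teca_comm_stack
{
// each thread gets its own stack, seeded with MPI_COMM_WORLD
static thread_local std::vector<MPI_Comm> stack = { MPI_COMM_WORLD };

void push(MPI_Comm comm) { stack.push_back(comm); }
void pop() { if (stack.size() > 1) stack.pop_back(); }
MPI_Comm active() { return stack.back(); }
}

// usage sketch: push a communicator before building a pipeline, pop after.
// an algorithm constructor would grab teca_comm_stack::active() and store it.
//
//     teca_comm_stack::push(my_comm);
//     // ... construct the pipeline's algorithms ...
//     teca_comm_stack::pop();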

taobrienlbl commented 5 years ago

I'm totally OK with proceeding with the per-algorithm version that you are suggesting. It seems the most straightforward way to proceed for now, and I agree that MPI_COMM_WORLD will be by far the most common use case.

taobrienlbl commented 5 years ago

@burlen - it looks like this needs a bit more work beyond #203

The problem I'm running into when trying to run some tests on Cori is that there is currently no way to run on a login node. The various TECA routines directly call MPI subroutines (e.g., MPI_Initialized() and MPI_Comm_rank()). Most MPI routines require MPI to have been initialized in order to work; otherwise the code fails with something like Attempting to use an MPI routine before initializing MPICH. The obvious solution would be to initialize MPI first, but the catch is that initializing MPI on a login node automatically kills the program.

I'd propose a two-part solution:

  1. introduce a new communicator-like object (perhaps just a null pointer or something) called NO_MPI or something similar, and
  2. refactor the code to use an MPI interface layer, e.g., teca_mpi_init() instead of MPI_Initialized(), where the wrapper routines use if-statements to avoid calling MPI if the NO_MPI communicator is passed (a rough sketch follows below).
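To make the second point concrete, here is a hedged sketch of what one such wrapper could look like. The names TECA_NO_MPI and teca_mpi_comm_rank are hypothetical, and MPI_COMM_NULL is used as the sentinel only because it is a predefined constant that exists without MPI having been initialized:

#include <mpi.h>

// hypothetical sentinel meaning "make no MPI calls at all"
#define TECA_NO_MPI MPI_COMM_NULL

// hypothetical wrapper: behaves like a serial run when MPI is unavailable
int teca_mpi_comm_rank(MPI_Comm comm, int *rank)
{
    *rank = 0;

    if (comm == TECA_NO_MPI)
        return 0; // NO_MPI requested: skip MPI entirely (safe on a login node)

    int initialized = 0;
    MPI_Initialized(&initialized); // callable even before MPI_Init
    if (!initialized)
        return 0; // MPI not started: fall back to serial behavior

    return MPI_Comm_rank(comm, rank);
}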

What do you think? We can discuss in person shortly.

taobrienlbl commented 5 years ago

And actually, when I run on an interactive node to get around this issue (using the code modifications you indicated in #202 to set the communicator), I get a TypeError when setting the communicator:

Traceback (most recent call last):
  File "TECAMERRAARCounter.py", line 218, in <module>
    my_counter = TECAMERRAARCounter("/project/projectdirs/m1517/cascade/taobrien/teca_bayesian_ar_detector/mcmc_training/expert_count_evaluation/ar_counts_user_taobrien.nc", no_time_var = False, comm = MPI.COMM_SELF)
  File "TECAMERRAARCounter.py", line 74, in __init__
    self.mesh_data_reader.set_communicator(comm)
  File "/global/homes/t/taobrien/.conda/envs/climate-notebook/lib/python3.6/site-packages/teca/teca_py.py", line 8749, in set_communicator
    return _teca_py.teca_algorithm_set_communicator(self, comm)
TypeError: in method 'teca_algorithm_set_communicator', argument 2 of type 'MPI_Comm'

I've just restored the branch in #203 and will try to re-open the PR.

burlen commented 5 years ago

I think these issues are solved now.