quaquel / EMAworkbench

workbench for performing exploratory modeling and analysis
BSD 3-Clause "New" or "Revised" License

Introducing MPIEvaluator: Run on multi-node HPC systems using mpi4py #299

Closed EwoutH closed 1 year ago

EwoutH commented 1 year ago

This PR adds a new experiment evaluator to the EMAworkbench, the MPIEvaluator. This evaluator allows experiments to be conducted on multi-node systems, including High-Performance Computers (HPC) such as DelftBlue. Internally, it uses the MPIPoolExecutor from mpi4py.futures.

Additionally, logging has been integrated to facilitate debugging and performance tracking in distributed setups. As a robustness measure, mocked tests have been added to ensure consistent behavior and they have been incorporated into the CI pipeline. This might help catch future breaking changes in mpi4py, such as with the upcoming 4.0 release (https://github.com/mpi4py/mpi4py/issues/386).

This PR follows from the discussions in #266 and succeeds the development PR #292.

Conceptual design

1. MPIEvaluator Class

The MPIEvaluator class is the main component of this design. Its primary role is to initiate a pool of workers across multiple nodes, evaluate experiments in parallel, and finalize resources when done.

Initialization:

Evaluation:

Finalization:

2. run_experiment_mpi Function

This helper function unpacks the experiment data, sets up the necessary logging configuration, runs the experiment on the designated MPI rank (a worker process), and returns the results. It is the worker function that runs on each of the MPI ranks.
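
A minimal sketch of such a worker function, assuming each experiment is packed as a `(model, experiment, log_level)` tuple. The names `run_packed_experiment`, `lake_like_model`, `evaluate` and the `"sketch"` logger are hypothetical, and a `ThreadPoolExecutor` stands in for `MPIPoolExecutor` so the example runs without MPI. The actual `run_experiment_mpi` may differ.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger("sketch")  # hypothetical logger name


def lake_like_model(inflow, decay):
    """Hypothetical toy model; any picklable callable would do."""
    return {"pollution": inflow / decay}


def run_packed_experiment(packed):
    """Unpack one experiment, configure logging, run it, return the result."""
    model, experiment, log_level = packed
    # Apply the level chosen on the main process so that verbosity
    # is the same on every worker.
    logger.setLevel(log_level)
    logger.debug("running experiment %s", experiment)
    outcomes = model(**experiment)
    return experiment, outcomes


def evaluate(model, experiments, max_workers=2):
    """Pack the experiments and map the worker function over a pool."""
    level = logger.getEffectiveLevel()
    packed = [(model, experiment, level) for experiment in experiments]
    # Stand-in pool (assumption): MPIPoolExecutor also provides map().
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_packed_experiment, packed))


results = evaluate(
    lake_like_model,
    [{"inflow": 1.0, "decay": 2.0}, {"inflow": 3.0, "decay": 4.0}],
)
```

With threads the logger is shared, so passing the level is redundant here; it only matters once the workers are separate processes, as they are under MPI.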

Logging:

3. Logging Enhancements

A dedicated logger for the MPIEvaluator was introduced to provide clarity during debugging and performance tracking. Several measures were taken to ensure uniform logging verbosity across nodes and improve log readability:

PR structure

This PR is structured in five commits:

  1. MPIEvaluator for HPC Systems (0bc9e15)

    • Purpose: To extend the capabilities of the EMAworkbench to multi-node HPC systems.
    • Changes:
      • Introduced the MPIEvaluator class.
      • Added an initialization function to set up the global ExperimentRunner for worker processes.
      • Included proper handling for packing and unpacking experiments for efficient data transfer between nodes.
    • Dependencies: While the addition leverages the mpi4py library, it's necessary to note that the dependency on mpi4py is only when the MPIEvaluator is utilized, thus not imposing unnecessary packages on other users.
  2. Enhanced Logging for MPIEvaluator (59a2b7a)

    • Purpose: To provide clear and detailed logs for debugging and performance tracking in distributed environments.
    • Changes:
      • Set up a dedicated logger for the MPIEvaluator.
      • Ensured uniform logging verbosity across nodes by passing the logger's level to each worker process.
      • Introduced log messages for tracking progress on individual MPI ranks.
      • Refined log format for better readability by displaying the MPI process name alongside the log level.
  3. Integration of MPIEvaluator Tests into CI (f51c29f)

    • Purpose: To ensure the reliable functioning of the MPIEvaluator through continuous integration testing.
    • Changes:
      • Incorporated the MPIEvaluator into the test suite using mock tests simulating its interaction with mpi4py.
      • Enriched the CI pipeline (.github/workflows/ci.yml) to encompass MPI testing, specifically for Ubuntu with Python 3.10.
      • Included conditional logic to skip MPI tests when not on Linux platforms or in the absence of mpi4py.
    • Importance: With the upcoming mpi4py 4.0 release, potential breaking changes can be caught early through these mocked tests.
    • Note: It's imperative to understand that these tests focus on the MPIEvaluator logic and its interactions, and do not delve into the actual internal workings of MPI.
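
To illustrate two points from the commits above, the sketch below combines a deferred `mpi4py` import (so the dependency only matters when the evaluator is used) with a mocked test that never touches a real MPI installation. `LazyMPIEvaluator` and the test class are hypothetical stand-ins, not the workbench code, and the real tests may be structured differently.

```python
import sys
import unittest
from unittest import mock


class LazyMPIEvaluator:
    """Hypothetical stand-in: imports mpi4py only when a pool is requested."""

    def __init__(self, n_processes=None):
        self.n_processes = n_processes
        self._pool = None

    def initialize(self):
        # Deferred import: users who never call this do not need mpi4py.
        from mpi4py.futures import MPIPoolExecutor

        self._pool = MPIPoolExecutor(max_workers=self.n_processes)
        return self

    def finalize(self):
        self._pool.shutdown()


class TestLazyMPIEvaluator(unittest.TestCase):
    def test_pool_lifecycle(self):
        # Replace the mpi4py modules with mocks for the duration of the test.
        fake_futures = mock.MagicMock()
        fake_mpi4py = mock.MagicMock(futures=fake_futures)
        modules = {"mpi4py": fake_mpi4py, "mpi4py.futures": fake_futures}
        with mock.patch.dict(sys.modules, modules):
            evaluator = LazyMPIEvaluator(n_processes=4).initialize()
            evaluator.finalize()

        # Only the evaluator's interaction with the pool is checked.
        pool_class = fake_futures.MPIPoolExecutor
        pool_class.assert_called_once_with(max_workers=4)
        pool_class.return_value.shutdown.assert_called_once_with()
```

As the note above says, a test like this checks the evaluator's own logic and says nothing about whether MPI itself behaves correctly.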

However, a global initializer had issues with re-initializing the MPIEvaluator pool, where the second attempt would consistently throw a 'BrokenExecutor: cannot run initializer' error. This behavior was particularly evident when invoking the MPIEvaluator consecutively in a sequence.

After reproducing the issue with simplified examples and confirming its origin, the most robust way to address it was to eliminate the common initializer function from the MPIEvaluator. This is done in dff46cd. Since the initializer also contained the logger configuration, that part was restored in f67b194. These commits are kept separate to provide insight into the development process and design considerations of the MPIEvaluator.

  4. Refinement of MPIEvaluator Initialization and Experiment Handling (dff46cd)

    • Purpose: To streamline the initialization and experiment execution process within the MPIEvaluator.
    • Changes:
      • Removed the global ExperimentRunner and associated initializer function.
      • Adjusted the MPIEvaluator constructor to optionally accept the number of processes (n_processes).
      • Simplified the experiment packing process by including only the model name and the experiment itself.
      • Introduced an ExperimentRunner instantiation within the run_experiment_mpi function to handle experiments.
  5. Logging Configuration Enhancement (f67b194)

    • Purpose: To ensure consistent logging levels across all MPI processes.
    • Changes:
      • Modified the experiment packing process to include the effective logging level.
      • Updated the run_experiment_mpi function to configure logging based on the passed level, ensuring uniformity across all worker processes.

Technical changes per file

  1. CI Configuration (ci.yml):

    • Added an MPI testing flag to the matrix build.
    • Defined steps to set up necessary MPI libraries and the mpi4py package.
  2. EMAworkbench Initialization Files:

    • Imported and initialized the MPIEvaluator.
  3. Evaluator Logic Enhancements (evaluators.py):

    • Defined global ExperimentRunner for worker processes.
    • Added the MPIEvaluator class with its initialization, finalization, and experiment evaluation logic.
    • Implemented logic to handle experiments in an MPI environment.
  4. Logging Improvements (ema_logging.py):

    • Introduced an optional flag to adjust log levels for the root logger.
  5. Test Enhancements (test_evaluators.py):

    • Incorporated mocked tests for the MPIEvaluator, conditional on the availability of mpi4py and a Linux environment.

Logging

Logging is a big part of this PR. Being able to debug failures and errors on HPC systems effectively is important, because the iteration speed on these systems is often low (since you have to queue jobs).

This is how the current logs work for a simple model example run:

INFO level (20)

[MainProcess/INFO] MPI pool started with 9 workers
[MainProcess/WARNING] With only a few workers (9), the MPIEvaluator may be slower than the Sequential- or MultiprocessingEvaluator
[MainProcess/INFO] performing 25 scenarios * 1 policies * 1 model(s) = 25 experiments
  0%|                                                   | 0/25 [00:00<?, ?it/s][MainProcess/INFO] MPIEvaluator: Starting 25 experiments using MPI pool with 9 workers
[MainProcess/INFO] MPIEvaluator: Completed all 25 experiments
100%|██████████████████████████████████████████| 25/25 [00:00<00:00, 39.04it/s]
[MainProcess/INFO] MPIEvaluator: Callback completed for all 25 experiments
[MainProcess/INFO] experiments finished
[MainProcess/INFO] MPI pool has been shut down

DEBUG level

[MainProcess/INFO] MPI pool started with 9 workers
[MainProcess/WARNING] With only a few workers (9), the MPIEvaluator may be slower than the Sequential- or MultiprocessingEvaluator
[MainProcess/INFO] performing 25 scenarios * 1 policies * 1 model(s) = 25 experiments
  0%| | 0/25 [00:00
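
The `[MainProcess/INFO]` prefix in the logs above is the kind of output Python's standard `logging` module gives when the format string combines the `processName` and `levelname` record attributes. The sketch below shows one way to produce it. The format string, the `make_logger` helper and the `"mpi_sketch"` logger name are assumptions for illustration, not necessarily what `ema_logging.py` uses.

```python
import io
import logging

# Assumed format: process name and level in brackets, then the message.
LOG_FORMAT = "[%(processName)s/%(levelname)s] %(message)s"


def make_logger(name, level, stream):
    """Create a logger that writes formatted records to `stream`."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(LOG_FORMAT))
    logger.addHandler(handler)
    logger.propagate = False  # avoid duplicate output via the root logger
    return logger


stream = io.StringIO()
log = make_logger("mpi_sketch", logging.INFO, stream)
log.info("MPI pool started with %d workers", 9)
log.debug("suppressed, because the logger level is INFO")
formatted = stream.getvalue()
```

Only the INFO record ends up in `formatted`; lowering the level to `logging.DEBUG` would let the second message through as well.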

Performance

Overhead

boxplot

Divided by 10 cores:

Model   SequentialEvaluator  MultiprocessingEvaluator  MPIEvaluator
python  1                    0.0135                    0.0101
lake    1                    0.4296                    0.3037
flu     1                    0.4041                    0.3976

Undivided:

Model   SequentialEvaluator  MultiprocessingEvaluator  MPIEvaluator
python  1                    0.135                     0.101
lake    1                    4.296                     3.037
flu     1                    4.041                     3.976

Scaling

The first scaling experiment used shared nodes, which showed inconsistent performance:

boxplot_lake

The second and third experiments claimed full nodes exclusively. While this normally isn't best practice, it was useful for gaining insight into performance scaling.

boxplot_lake_exclusive

boxplot_flu

Both models are relatively simple with large communication overhead. Testing the performance on a more compute intensive model would be interesting for future research. The MPIEvaluator-benchmarks branch can be used for this.

Documentation

A tutorial for the MPIEvaluator will be added in PR #308.

Limitations & future enhancements

There are two main limitations currently:

  • The MPIEvaluator is not tested with file-based models, such as NetLogo and Vensim. It might still work, but it's not tested. Originally this was in scope for this PR, but due to difficulties in creating the proper environment, it was cut from the scope of this effort.
  • The model object is currently passed to the worker for each experiment. For large models with a relatively short runtime, this introduces significant performance overhead. Therefore, an optimization could be made to send the model only once to a worker on initialization.
    • Building on this, submitting experiment parameter sets in batches, instead of sending them to the workers one by one, could also help increase performance.
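
The batching idea can be sketched as follows. This is only an illustration of the principle, not a proposed implementation: `batched`, `run_batch` and `toy_model` are hypothetical names, and a `ThreadPoolExecutor` stands in for the MPI pool so the example runs anywhere.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def batched(items, size):
    """Yield successive lists of at most `size` items."""
    iterator = iter(items)
    while batch := list(islice(iterator, size)):
        yield batch


def run_batch(packed):
    """Run every experiment in one batch; the model travels once per batch."""
    model, experiments = packed
    return [model(**experiment) for experiment in experiments]


def toy_model(x):
    """Hypothetical stand-in for a real model."""
    return x * x


experiments = [{"x": i} for i in range(7)]
# 7 experiments in batches of 3 become 3 tasks instead of 7.
tasks = [(toy_model, batch) for batch in batched(experiments, 3)]

with ThreadPoolExecutor(max_workers=2) as pool:
    outcomes = [y for result in pool.map(run_batch, tasks) for y in result]
```

Larger batches mean fewer messages and fewer copies of the model, at the price of coarser load balancing when experiment runtimes vary.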

Some other future improvements could be:

  • A new Callback class could be implemented that streams to disk instead of keeping all results in memory. This would allow for handling very large models that gather lots of data, probably at the cost of some performance. See issue #304 for more details.
  • Further performance profiling could be done on the current design to see whether any components can be sped up, such as the distribution of experiments and models to workers or the logging.
    • It would be interesting to see how larger models perform on many-node systems, and if the scaling is better than with the small lake and flu models.
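
As a rough idea of what a streaming callback could look like, the sketch below appends each result to a CSV file as soon as it arrives. `CsvStreamingCallback` is hypothetical and deliberately does not subclass anything from the workbench; the call signature `(experiment, outcomes)` and the flat, scalar outcomes are assumptions.

```python
import csv
import os
import tempfile


class CsvStreamingCallback:
    """Hypothetical callback that writes each result to disk immediately."""

    def __init__(self, path, fieldnames):
        self.path = path
        self.fieldnames = fieldnames
        self.n_written = 0
        # Start a fresh file that contains only the header row.
        with open(self.path, "w", newline="") as fh:
            csv.DictWriter(fh, fieldnames=self.fieldnames).writeheader()

    def __call__(self, experiment, outcomes):
        # Append one row; nothing but a counter is kept in memory.
        row = {**experiment, **outcomes}
        with open(self.path, "a", newline="") as fh:
            csv.DictWriter(fh, fieldnames=self.fieldnames).writerow(row)
        self.n_written += 1


path = os.path.join(tempfile.mkdtemp(), "results.csv")
callback = CsvStreamingCallback(path, fieldnames=["x", "y"])
for x in range(3):
    callback({"x": x}, {"y": x * x})

# Read the file back to check what was stored.
with open(path, newline="") as fh:
    rows = list(csv.DictReader(fh))
```

Reopening the file for every row is simple but slow; a real implementation would more likely keep the file handle open or buffer rows, which is where the performance cost mentioned above comes in.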

Review

This PR is ready for review.

When merging, the preferred method would be fast-forward merge to keep individual commit messages and be able to revert one if necessary.

coveralls commented 1 year ago

Coverage Status

coverage: 80.585% (-0.07%) from 80.655% when pulling 65e0fc0abd23de215144cb56d5b954aaca0b0f3f on MPIEvaluator into c9049bbeac814498dab378e5b9092c2ef725d715 on master.

EwoutH commented 1 year ago

Thanks for initial review! I updated the loglevel in (60f1f9c) and added example logs (on INFO and DEBUG level) to the PR.

Tomorrow I will be benchmarking and working on the documentation. If you have any specific requests (for models or scenarios to test, or specific documentation to write), please let me know!

quaquel commented 1 year ago

It would be great to have a simple lake model example that can be run on an HPC. I guess this would need to take the form of a notebook because it needs to cover the Python code and the batch script.

Also, not essential for this PR, but for my understanding of what functionality is available: have you run any tests with a FileModel or any of its subclasses (e.g., NetLogoModel)?

EwoutH commented 1 year ago

It's good that I decided to do extensive performance testing, because I ran into a nasty bug that I likely wouldn't have noticed otherwise.

Somehow the second initialization of an MPIEvaluator broke down every time, with errors like:

"/mnt/c/Users/Ewout/Documents/GitHub/EMAworkbench/ema_workbench/em_framework/evaluators.py", line 450, in initialize
    if self._pool._max_workers <= 10:
TypeError: '<=' not supported between instances of 'NoneType' and 'int'
--------------------------------------------------------------------------
MPI_ABORT was invoked on rank 0 in communicator MPI_COMM_WORLD
with errorcode 1.

This indicated that no workers were available. My first hypothesis was that the MPI pool wasn't properly shut down, so I tried about 138 different ways to shut it down.

Apparently, that wasn't the problem.

So I decided to break it down to its simplest form, get that working, and then bisect between our then-current, broken implementation and the working one.

The minimal working example I initially found was this:

import time
from mpi4py.futures import MPIPoolExecutor

def simple_task(x):
    time.sleep(0.2)
    return x * x

def main():
    # First use of the MPI Pool
    with MPIPoolExecutor() as pool:
        results = list(pool.map(simple_task, range(10)))
        print("First pool completed")

    # Explicitly try to shut it down (though it should be shut down by the context manager)
    pool.shutdown(wait=True)

    # Second use of the MPI Pool
    with MPIPoolExecutor() as pool:
        results = list(pool.map(simple_task, range(20)))
        print("Second pool completed")

if __name__ == "__main__":
    main()

After 6 iterations I settled on the maximum working model, which has an MPIEvaluator class:

import time
from mpi4py.futures import MPIPoolExecutor

def simple_task(x):
    time.sleep(0.2)
    return x * x

class MPIEvaluator:
    def __init__(self, n_workers=None):
        self._pool = None
        self.n_workers = n_workers

    def __enter__(self):
        self._pool = MPIPoolExecutor(max_workers=self.n_workers)
        print(f"MPI pool started with {self._pool._max_workers} workers")
        return self._pool

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._pool:
            self._pool.shutdown(wait=True)
            print("MPI pool has been shut down")
            self._pool = None

def main():
    # First use of the MPIEvaluator
    with MPIEvaluator(n_workers=4) as pool:
        results1 = list(pool.map(simple_task, range(10)))
        print("First pool completed")

    # Second use of the MPIEvaluator
    with MPIEvaluator(n_workers=4) as pool:
        results2 = list(pool.map(simple_task, range(20)))
        print("Second pool completed")

if __name__ == "__main__":
    main()

And the minimum breaking one:

import time
from mpi4py.futures import MPIPoolExecutor

def simple_task(x):
    time.sleep(0.2)
    return x * x

def common_initializer():
    # A basic initializer, doing almost nothing for now.
    pass

class MPIEvaluator:
    def __init__(self, n_workers=None):
        self._pool = None
        self.n_workers = n_workers

    def __enter__(self):
        self._pool = MPIPoolExecutor(max_workers=self.n_workers, initializer=common_initializer)
        print(f"MPI pool started with {self._pool._max_workers} workers")
        return self._pool

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._pool:
            self._pool.shutdown(wait=True)
            print("MPI pool has been shut down")
            self._pool = None

def main():
    # First use of the MPIEvaluator
    with MPIEvaluator(n_workers=4) as pool:
        results1 = list(pool.map(simple_task, range(10)))
        print("First pool completed")

    # Second use of the MPIEvaluator
    with MPIEvaluator(n_workers=4) as pool:
        results2 = list(pool.map(simple_task, range(20)))
        print("Second pool completed")

if __name__ == "__main__":
    main()

The problem seemed to be the common, global initializer. So I refactored the MPIEvaluator in the workbench so that it doesn't need one, and that solved it instantly.

With this effort, the logger configuration has changed and now shows slightly different behaviour. I have to determine whether that's desired behaviour or not.

The mocked tests still pass, so that's great I guess.

TL;DR: You couldn't use the MPIEvaluator multiple times successively in a single script. Now you can. And I know what a Singleton Pattern is. It was a fun day.

EwoutH commented 1 year ago

In e24cd08 I fixed multiple sequential calls (and probably properly shutting off in general), but broke logging. Logging was difficult, but not being allowed a global initializer made it even more challenging. In the end I found a somewhat elegant solution, implemented in e2ff35d.

I also added some performance scaling tests. Those required multiple runs in a machine-scalable fashion, so it also made setting up an environment on DelftBlue more robust in the process (and improved my bash skills).

Performance graphs are in the main post. As expected, more complex models (flu) scale better than less complex/faster ones (lake model). All tested models have diminishing returns at some point.

All the code, figures and data for this is available on the MPIEvaluator-benchmarks branch.

EwoutH commented 1 year ago

Restructured the code; it can now be reviewed. I decided to leave 0783435 and 2eb434e in as distinct commits instead of squashing them, because they show the design considerations and restrictions encountered in the process.

Docs will follow in a separate PR, because this one is large enough as is.

EwoutH commented 1 year ago

A tutorial for the MPIEvaluator will be added in #308.

EwoutH commented 1 year ago

@quaquel I know you're quite busy, but would you have time to review this sooner rather than later? The code and all the ideas behind it are still relatively fresh in my head, so I can make changes quickly without much overhead.

EwoutH commented 1 year ago

Would

UserWarning: "The MPIEvaluator is experimental and may change without notice."

suffice?
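
For reference, a warning like this could be raised from the constructor with the standard `warnings` module. The sketch is hypothetical: `ExperimentalEvaluator` is a stand-in class, and where exactly the workbench emits the warning is an assumption.

```python
import warnings


class ExperimentalEvaluator:
    """Hypothetical stand-in for an evaluator flagged as experimental."""

    def __init__(self):
        # stacklevel=2 attributes the warning to the caller's line,
        # not to this constructor.
        warnings.warn(
            "The MPIEvaluator is experimental and may change without notice.",
            UserWarning,
            stacklevel=2,
        )


# Record the warning instead of printing it, to show what is emitted.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    ExperimentalEvaluator()
```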

quaquel commented 1 year ago

yes, in combination with my feedback on the tutorial in #308

EwoutH commented 1 year ago

Merged! It was quite a journey, happy that it's in. @quaquel, thanks for all the help along the way!

The SEN1211 students will be developing quite heavy (geospatial) ABM models, primarily in Mesa. Since Mesa is pure Python, it should work with this implementation, so it could be an interesting test case for the MPIEvaluator!

I made a new discussion for feedback and future development: https://github.com/quaquel/EMAworkbench/discussions/311.