litebird / litebird_sim

Simulation tools for LiteBIRD
GNU General Public License v3.0
18 stars 13 forks source link

Implement multi-core threading #276

Closed ziotom78 closed 8 months ago

ziotom78 commented 8 months ago

This PR tests how to exploit Numba's multithreading capabilities to parallelize some modules in the framework. Only the most trivial computations are parallelized here, as the primary purpose of this PR is to provide the foundations for a more efficient approach to multithreading.

The modifications I have done to the code are the following:

ziotom78 commented 8 months ago

I have ran a few tests on my old 8-core Thinkpad, and results are encouraging: both the pointing generation code and the dipole module show a speed-up that is linked to the number of cores.

Basically, I ran a short script that generates pointings for 25 days and computes the dipole for each sample, and I re-ran it several times changing the number of threads Numba was allowed to use. (I also ran each configuration five times to get some statistics.) What I see is that the greater the number of threads, the smaller the time.

Here is a plot of the time spent to run a simulation, normalized to the case with just one thread:

plot

Here is the source code of the script I used:

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

from astropy.time import Time as AstroTime
import litebird_sim as lbs
import numba
from pathlib import Path
from time import perf_counter
import sys

# benchmark function
def benchmark(fun, *args):
    "Run the function with the specified args and return a tuple (time, result)"

    # record start time
    time_start = perf_counter()
    # call the custom function
    result = fun(*args)
    # record end time
    time_end = perf_counter()
    # calculate the duration
    time_duration = time_end - time_start
    # report the duration
    return (time_duration, result)

def run_simulation(duration_s, num_of_threads):
    sim = lbs.Simulation(
        base_path=Path(__file__).parent / "report",
        imo=lbs.Imo(flatfile_location=lbs.PTEP_IMO_LOCATION),
        start_time=AstroTime("2023-01-01"),
        duration_s=10 * 86400.0,
        name="Threading test",
        random_seed=12345,
        numba_threads=num_of_threads,
    )

    det = lbs.DetectorInfo.from_imo(
        imo=sim.imo,
        url="/releases/vPTEP/satellite/LFT/L1-040/"
        "000_000_003_QA_040_T/detector_info",
    )

    sim.set_scanning_strategy(
        scanning_strategy=lbs.SpinningScanningStrategy.from_imo(
            imo=sim.imo,
            url="/releases/vPTEP/satellite/scanning_parameters",
        )
    )

    sim.set_instrument(
        lbs.InstrumentInfo.from_imo(
            imo=sim.imo,
            url="/releases/vPTEP/satellite/LFT/instrument_info",
        )
    )

    times = []
    sim.create_observations(detectors=[det])
    times.append(benchmark(lambda: sim.compute_pointings())[0])
    sim.compute_pos_and_vel()
    times.append(benchmark(lambda: sim.add_dipole())[0])

    return times

if len(sys.argv) != 2:
    print(f"Usage: {sys.argv[0]} NUM_OF_THREADS", file=sys.stderr)
    sys.exit(1)

num_of_threads = int(sys.argv[1])

# Run a short simulation to trigger Numba's compiler
run_simulation(duration_s=1.0, num_of_threads=num_of_threads)

# Re-run the simulation and benchmark the code
times = run_simulation(duration_s=25 * 86400.0, num_of_threads=num_of_threads)

# Print three quantities:
# 1. Number of threads
# 2. Time spent for pointing generation
# 3. Time spent for the dipole
print("\t".join([str(numba.get_num_threads())] + [f"{x:.3f}" for x in times]))
mreineck commented 8 months ago

Hmm, it seems that the scaling is pretty far from ideal, but probably this is the best that the workload allows, independent of parallelization. It would be extremely valuable to see how the same problem scales when MPI is used for parallelization. Would this be difficult to do?

ziotom78 commented 8 months ago

Hi Martin, this is an excellent suggestion! Unfortunately, my laptop provides a buggy MPI implementation that enters a deadlock whenever I run allreduce… I tried to run the tests on our local cluster, but I am still struggling with making mpi4py recognize the MPI libraries.

If anybody else could run a few tests to check how the performance with N Numba threads compares with the performance with N MPI tasks and one Numba thread per task, it would be awesome!

mreineck commented 8 months ago

I'm happy to give it a try! The normal, multithreaded case seems to work on my laptop, but I fear I'm not sufficiently familiar enough with the package to make the demo work with MPI. All my attempts to run the script with more than one MPI task currently end up in

Traceback (most recent call last):
  File "/home/martin/codes/litebird_sim/bench.py", line 77, in <module>
    run_simulation(duration_s=1.0, num_of_threads=num_of_threads)
  File "/home/martin/codes/litebird_sim/bench.py", line 64, in run_simulation
    sim.compute_pos_and_vel()
  File "/home/martin/codes/litebird_sim/litebird_sim/simulations.py", line 1289, in compute_pos_and_vel
    self.pos_and_vel = spacecraft_pos_and_vel(
                       ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/martin/codes/litebird_sim/litebird_sim/spacecraft.py", line 397, in spacecraft_pos_and_vel
    assert obs or (
AssertionError: You must either provide a Observation or start_time/time_span_s
paganol commented 8 months ago

Hi there, I hope this can help, I played a bit with this branch. I'm running a small script (the one you see below) on one node (one Skylake with 48 processors) of marconi. I'm keeping constant the number of cores used (= 48) and changing the share between mpi and openmp. This is what I get:

On the x axis, the number should read as (mpi tasks) - (threads) The pure mpi implementation seems a factor ~10 better than a pure openmp run (still checking if the workload distribution in the node is handled correctly though). For the pure mpi the time for pointing and scan map is roughly the same 2.8s VS 2.5s. For the pure openmp it goes up to 21.8 VS 44.5


import litebird_sim as lbs
import numpy as np
import matplotlib.pylab as plt
import healpy as hp
import astropy
from astropy.time import Time
import os 
import time

comm = lbs.MPI_COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

#threads = int(os.environ["OMP_NUM_THREADS"])

threads = int(os.environ["NUMBA_NUM_THREADS"])

if (rank == 0):
    print('MPI tasks: ',size)
    print('Threads: ',threads)

t0 = time.time()
telescope = "LFT"
channel = "L4-140"
#detlist = ["000_001_017_QB_140_T", "000_001_017_QB_140_B"]
detlist = ["000_001_017_QB_140_T"]

start_time = 0.0 #astropy.time.Time("2025-01-01T00:00:00")
mission_time_days = 60

imo_version = "vPTEP"

# Resolution of the output maps
nside = 64

# This is the folder where the final report with the results of the simulation will be saved
base_path = ".test"

# This loads the full IMO (not only the version specified by `imo_version`!)
imo = lbs.Imo()

# initializing the simulation
sim = lbs.Simulation(
    base_path=base_path,
    mpi_comm=comm,
    start_time=start_time,
    duration_s=mission_time_days * 24 * 3600.0,
    random_seed=0,
    numba_threads=threads, 
)

sim.set_instrument(
    lbs.InstrumentInfo.from_imo(
        imo,
        f"/releases/{imo_version}/satellite/{telescope}/instrument_info",
    )
)

dets = []
for n_det in detlist:
    det = lbs.DetectorInfo.from_imo(
        url=f"/releases/{imo_version}/satellite/{telescope}/{channel}/{n_det}/detector_info",
        imo=imo,
    )
    dets.append(det)

sim.set_scanning_strategy(
    imo_url=f"/releases/{imo_version}/satellite/scanning_parameters/"
)

t1 = time.time()

# creating one observation
sim.create_observations(
    detectors=dets,
    n_blocks_det=1,
    n_blocks_time=size,
    split_list_over_processes=False,
)

sim.set_hwp(
    lbs.IdealHWP(
        sim.instrument.hwp_rpm * 2 * np.pi / 60,
    ),  # applies hwp rotation angle to the polarization angle
)

t2 = time.time()

sim.compute_pointings()

maps=np.zeros((3,12*nside*nside))

t3 = time.time()

sim.fill_tods(maps)
#sim.add_dipole()
#sim.add_noise(random=sim.random)

t4 = time.time()

if (rank == 0):
   print(t1-t0,t2-t1,t3-t2,t4-t3)
ziotom78 commented 8 months ago

Thanks @paganol for the test. I think I understand what's happening here.

My modifications to the code only involved Numba functions, which however are not where 100% of the time is spent. Specifically, in your test you are basically computing a lot of pointings, which involve (1) quaternion composition and (2) calls to AstroPy. The quaternion functions have all been parallelized, but AstroPy does not rely on Numba and thus probably uses just one CPU.

On the other hand, when one runs the code using MPI, everything is parallelized, which means that there is a more significant speed boost.

As the primary objective of our task was to check how to tune the framework to be used in a hybrid mode (MPI+multithreading), particularly with the idea to be able to use beam convolution codes, we should decide what should be the next steps:

  1. Just parallelize Numba functions, knowing that this won't be a huge performance boost. (We'll get more from multiple threads once we inject beam convolution codes based on ducc, of course.)
  2. We switch to a more radical solution, e.g., by means of the multiprocess module. Thus, Simulation.compute_pointings() would become something like the following:

    def compute_pointings(self, …):
        # Wrap the whole body of `compute_pointings` within a parallel iterator
        # so that everything (Numba, AstroPy…) gets parallelized over CPU threads
        for i in parallel_iterator(…):
            # Here goes the old code
            …

Solution 1 is probably “good enough” in the shorter term, but in the longer term solution 2 would probably be the best.

ziotom78 commented 8 months ago

I explored how the multiprocessing module might be used to parallelize the code and found out that it would be quite hard to employ in our codebase. The best option would be to use the Pool object, as it is able to distribute work across processes. However, it's not trivial to integrate in our codebase, because it requires to manually split the timelines in chunks to be fed to each process, and this would require substantial rework of several parts of the code.

As discussed during the Simulation Team telecon (2023-11-30), we agreed that MPI+Numba should be good enough for all the use cases foreseen in the near future.

If there are no objections, I will merge this PR by the middle of next week.

mreineck commented 8 months ago

Thanks for investigating this! I agree that MPI+Numba is most likely the way to go for now. Sorry for not attending the telecon yesterday, I'm on vacation at the moment.

paganol commented 8 months ago

@ziotom78, should we set numba_threads to OMP_NUM_THREADS as default?

ziotom78 commented 8 months ago

@ziotom78, should we set numba_threads to OMP_NUM_THREADS as default?

Right, I forgot about it! I'll patch the code immediately.

ziotom78 commented 8 months ago

Done, with this change this is what I get from a simple script:

$ cat test.py
import litebird_sim as lbs

sim = lbs.Simulation(start_time=0.0, duration_s=10.0, random_seed=None)
print(f"{sim.numba_threads=}")

$ python3 test.py
sim.numba_threads=None

$ OMP_NUM_THREADS=4 python3 test.py
sim.numba_threads=4