ipython / ipyparallel

IPython Parallel: Interactive Parallel Computing in Python
https://ipyparallel.readthedocs.io/

Mimic hybrid parallelization approach: `mpi4py` processes with multithreading #906

Open skwde opened 3 weeks ago

skwde commented 3 weeks ago

I am wondering if it is possible to "mimic" a hybrid parallelization approach with mpi4py using ipyparallel, i.e. have each engine access several cores?

By hybrid I mean using mpi4py for communication between MPI processes and multithreading within each process (e.g. via numba, BLAS threads in numpy, or OpenMP). Right now, every engine only sees a single core.

# first notebook cell: start an MPI cluster with 4 engines
import ipyparallel as ipp
cluster = ipp.Cluster(engines="MPI", n=4, cluster_id="ppb", profile="MPI")
clients = cluster.start_and_connect_sync()

# second notebook cell (%%px must be the first line of its own cell)
%%px
import numba
print(f"Numba can use {numba.config.NUMBA_NUM_THREADS} thread(s)")

Every engine then prints

Numba can use 1 thread(s)

While locally

import numba
print(f"Numba can use {numba.config.NUMBA_NUM_THREADS} thread(s)")

in my "8 core" container I get

Numba can use 8 thread(s)

Numba basically derives the number of threads from the available CPU cores.

Is there a way (that I missed) to make the engines see more than a single core?

skwde commented 3 weeks ago

Alright, I can change the number of threads when the environment variable NUMBA_NUM_THREADS is set before importing numba, e.g. via

import os
os.environ["NUMBA_NUM_THREADS"] = '8'  # must be set before the first numba import
import numba

Each engine then supposedly has 8 threads available.

But still, each engine uses a single thread only.

Weirdly, when I do

os.cpu_count()

each engine reports 8 cores.
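One plausible explanation (an aside, assuming Linux): os.cpu_count() ignores the CPU affinity mask that mpiexec applies to each rank, while Numba derives its default thread count from it. A quick check on each engine:

import os
print(os.cpu_count())                # all cores on the machine, ignores affinity
print(len(os.sched_getaffinity(0)))  # cores this process is actually allowed to use (Linux only)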

However, this seems to be related to numba / mpi4py rather than to ipyparallel (see https://numba.discourse.group/t/limiting-number-of-numba-threads-in-mpi-hybrid-program/2517), because I get the same behavior without ipyparallel when I run a plain .py file.

Thus I close this again.

minrk commented 3 weeks ago

Yeah, 100%. ipyparallel doesn't try to do anything fancy; it literally 'just' executes the Python code you give it in the processes you launched. So if you send multi-threaded code to the engines, it makes sense to have fewer engines than cores. Dask distributed has a more sophisticated understanding of machine resources and is more often run with one multi-threaded "worker" per machine. If you are searching for tips or troubleshooting, you'll probably get the best results if you leave ipyparallel out of the search, since it shouldn't affect what you need to do (usually it amounts to setting the right environment variable(s) with THREADS in the name).

Usually this means either one engine per core with single-threaded tasks (including setting OMP_NUM_THREADS=1, numba threads, MKL threads, etc.), or one engine per machine with multi-threaded tasks (Python threads, OpenMP, numba, BLAS, etc.), but you can do anything in between if you want. IPython won't help or hinder you in this regard; it just runs the code you give it.
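For example (a minimal sketch, not verbatim from this thread), the one-engine-per-core variant can pin each engine to a single thread by setting the usual *_NUM_THREADS variables on the engines before the numerical libraries are imported there:

import ipyparallel as ipp

cluster = ipp.Cluster(engines="MPI", n=8)
rc = cluster.start_and_connect_sync()

# set the thread-count variables on every engine before numpy / numba are imported there
rc[:].execute(
    "import os\n"
    "for var in ('OMP_NUM_THREADS', 'MKL_NUM_THREADS', 'NUMBA_NUM_THREADS'):\n"
    "    os.environ[var] = '1'",
    block=True,
)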

skwde commented 3 weeks ago

@minrk thanks once more for the clarification.

OK, I basically asked the same question in the numba repo for a standard Python script using numba and mpi4py.

It turns out that the problem was simply a missing mpiexec option telling MPI that it is allowed to use more resources for a single MPI process, i.e.

export NUMBA_NUM_THREADS=4  # export so the Python ranks also see it
mpiexec -n 2 --map-by slot:pe=${NUMBA_NUM_THREADS} python test.py

@minrk how would I pass the option --map-by slot:pe=${NUMBA_NUM_THREADS} to the ipyparallel cluster?

minrk commented 2 weeks ago

Most if not all MPI options are configurable via environment variables, so you can set those in the parent environment or in the env argument (I'm not sure which variable corresponds to --map-by).
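For example (a sketch under the assumption of Open MPI, where --map-by can typically also be expressed through the MCA parameter OMPI_MCA_rmaps_base_mapping_policy; check your MPI implementation's docs):

import os
import ipyparallel as ipp

# set in the parent environment before the cluster (and thus mpiexec) starts
os.environ["OMPI_MCA_rmaps_base_mapping_policy"] = "slot:pe=4"

cluster = ipp.Cluster(engines="MPI", n=2)
rc = cluster.start_and_connect_sync()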

Alternatively, you can set MPILauncher.mpi_args in the configuration you pass to your launcher:

from traitlets.config import Config
from ipyparallel import Cluster

config = Config()
config.MPILauncher.mpi_args = ["--map-by", "slot:pe=4"]
cluster = Cluster(engines="MPI", config=config, n=2)

or if you're in a notebook, you can get the default config and start adding to it:

from ipyparallel import Cluster

config = get_ipython().config  # add to the running IPython's config
config.MPILauncher.mpi_args = ["--map-by", "slot:pe=4"]
cluster = Cluster(engines="MPI", n=2)
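Putting it together, a combined sketch (an illustration based on the snippets above, assuming 2 engines with 4 cores each; names and counts are examples, not verbatim from this thread):

from traitlets.config import Config
import ipyparallel as ipp

n_threads = 4
config = Config()
config.MPILauncher.mpi_args = ["--map-by", f"slot:pe={n_threads}"]

cluster = ipp.Cluster(engines="MPI", n=2, config=config)
rc = cluster.start_and_connect_sync()
dview = rc[:]

# set the thread count on the engines before numba is imported there
dview.execute(f"import os; os.environ['NUMBA_NUM_THREADS'] = '{n_threads}'", block=True)
dview.execute("import numba; n = numba.config.NUMBA_NUM_THREADS", block=True)
print(dview["n"])  # expect [4, 4] if the mapping took effect

cluster.stop_cluster_sync()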