adrn / schwimmbad

A common interface to processing pools.
MIT License
115 stars 18 forks source link

Large speed differences between MPIPool and MultiPool #39

Closed enajx closed 3 years ago

enajx commented 3 years ago

Hi,

I'm testing the performance of both pools with the given demo script:

def worker(task):
    a, b = task
    return a**2 + b**2

def main(pool):
    # Here we generate some fake data
    import random
    n = 1000000
    a = [random.random() for _ in range(n)]
    b = [random.random() for _ in range(n)]

    tasks = list(zip(a, b))
    tic = time.time()
    results = pool.map(worker, tasks)
    toc = time.time()
    print(f'Elapsed time: {toc-tic}')
    pool.close()

    print(results[:8])

if __name__ == "__main__":
    import schwimmbad
    import time

    from argparse import ArgumentParser
    parser = ArgumentParser(description="Schwimmbad example.")

    group = parser.add_mutually_exclusive_group()
    group.add_argument("--ncores", dest="n_cores", default=32,
                       type=int, help="Number of processes (uses multiprocessing).")
    group.add_argument("--mpi", dest="mpi", default=False,
                       action="store_true", help="Run with MPI.")
    args = parser.parse_args()

    pool = schwimmbad.choose_pool(mpi=args.mpi, processes=args.n_cores)

    main(pool)

These are the result running on a single linux machine with 32 cores with python script-demo --ncores 32 and mpiexec -n 32 python script-demo.py --mpi respectively:

n = 100000 MultiPool : 0.03s MPIPool : 0.58s

n = 1000000 MultiPool : 0.22s MPIPool : 6.65s

n = 10000000 MultiPool : 2.37s MPIPool : 68.76s

I've also run it on OSX with similar resulting gaps.

I understand that MPI may introduce some extra overhead, but are these large differences to be expected?

adrn commented 3 years ago

Hi 👋 ! Thanks for the issue. I'm offline for a few days but will take a look ASAP. Thanks for the patience!

adrn commented 3 years ago

@enajx I ended up with some time tonight, so I did some tests 😅 .

First off, I can reproduce what you see on macOS. I can think of many some reasons why this might be happening, but I don't know exactly why. That said, I did some experiments, and have some recommendations.

Experiment: slow down the worker function

My first thought is that your worker function is very fast to execute, so it's possible that you are dominated by MPI overheads -- either in message passing the large number of tasks, or in the pool waiting for workers to finish before sending new tasks. So, the first experiment I tried was to slow down your worker function by adding a time.sleep(1e-4) just under the function definition. That made the execution times much closer:

MPI

$ mpiexec -n 4 python demo.py --mpi
Elapsed time n=100: 0.0066378116607666016
Elapsed time n=1000: 0.054388999938964844
Elapsed time n=10000: 0.5381379127502441

Multiprocessing (MPI with -n 4 only uses 3 workers, so it's fairer to compare to 3 cores):

$ python demo.py --ncores=3
Elapsed time n=100: 0.48464012145996094
Elapsed time n=1000: 0.04478788375854492
Elapsed time n=10000: 0.4312679767608642

Batching tasks before map'ing

If your worker function is inherently very fast to execute and you just have a ton of tasks to execute on, I have gotten much better performance by first batching up the tasks and sending batches of tasks to the worker function.

Example script here (a modified version of your demo): def worker(batch): _, tasks = batch results = [] for a, b in tasks: results.append(a**2 + b**2) return results def main(pool, n): from schwimmbad.utils import batch_tasks # Here we generate some fake data import random a = [random.random() for _ in range(n)] b = [random.random() for _ in range(n)] tasks = list(zip(a, b)) batches = batch_tasks(min(1, pool.size-1), arr=tasks) tic = time.time() results = pool.map(worker, batches) toc = time.time() results = [x for sublist in results for x in sublist] print(f'Elapsed time n={n}: {toc-tic}') if __name__ == "__main__": import schwimmbad import time from argparse import ArgumentParser parser = ArgumentParser(description="Schwimmbad example.") group = parser.add_mutually_exclusive_group() group.add_argument("--ncores", dest="n_cores", default=32, type=int, help="Number of processes (uses multiprocessing).") group.add_argument("--mpi", dest="mpi", default=False, action="store_true", help="Run with MPI.") args = parser.parse_args() pool = schwimmbad.choose_pool(mpi=args.mpi, processes=args.n_cores) for n in [10 ** x for x in range(2, 6+1)]: main(pool, n)

MPI:

$ mpiexec -n 4 python schw-test.py --mpi
Elapsed time n=100: 0.001007080078125
Elapsed time n=1000: 0.003452777862548828
Elapsed time n=10000: 0.03164792060852051
Elapsed time n=100000: 0.29506802558898926
Elapsed time n=1000000: 2.743013858795166

Multiprocessing:

$ python schw-test.py --ncores=3
Elapsed time n=100: 0.4871842861175537
Elapsed time n=1000: 0.001354217529296875
Elapsed time n=10000: 0.006591081619262695
Elapsed time n=100000: 0.07938289642333984
Elapsed time n=1000000: 0.699350118637085

So, MPI is still slower here, but closer than in your initial example (though you could probably get slightly better performance by tuning the number of batches).

enajx commented 3 years ago

That makes sense!

I run a benchmark again with a more realistic worker function and now I get more consistent results, in this case MPI taking the lead:

Computing pi, running on OSX with 8 cores:

n = 1000000 MultiPool : 0.54s MPIPool : 0.12s

n = 10000000 MultiPool : 1.45s MPIPool : 1.22s


import math
import random
import time
import schwimmbad

def sample(num_samples):
    num_inside = 0
    for _ in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1
    return num_inside

def approximate_pi_parallel(num_samples, cores, mpi):

    sample_batch_size = 1000

    with schwimmbad.choose_pool(mpi=mpi, processes=cores) as pool: 
        print(pool)
        start = time.time()
        num_inside = 0
        sample_batch_size = sample_batch_size
        for result in pool.map(sample, [sample_batch_size for _ in range(num_samples//sample_batch_size)]):
            num_inside += result
        print(f"pi ~= {(4*num_inside)/num_samples}")
        print(f"Finished in: {time.time()-start}s")
        pool.close()    

if __name__ == "__main__":

    n = 1000000
    # n = 10000000
    cores = 8
    mpi = False # True when run with mpiexec
    approximate_pi_parallel(n, cores, mpi)

Issue solved, thank you!