open2c / open2c_examples


nested multiprocessing in notebooks with mpire #29

Open sergpolly opened 10 months ago

sergpolly commented 10 months ago

Consider demonstrating parallel execution of some of the cooltools API functions for multiple samples, i.e. the case where an API function itself uses multiprocessing and we want to run it for several samples at once in a notebook ...

If one has a big multicore system (16 real cores or more), it is easy to run several CLI tasks in parallel for multiple samples, where each task itself uses several cores, i.e. is multiprocessed. Very often such multiprocessed operations do not scale well beyond 8-12 processes, so it is indeed more "economical" to process multiple samples at once with fewer cores each.
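To make the "fewer cores each" arithmetic concrete, here is a small sketch of how one might split a core budget between samples. The helper name `plan_core_budget` and its `max_nproc` cap are hypothetical (they are not part of cooltools or mpire); the default cap of 12 simply reflects the 8-12 process scaling limit mentioned above.

```python
import math


def plan_core_budget(total_cores, n_samples, max_nproc=12):
    """Hypothetical helper: choose how many samples to run at once.

    Returns (n_jobs, nproc, waves):
      n_jobs - number of samples processed concurrently
      nproc  - processes given to each sample (capped at max_nproc)
      waves  - number of rounds needed to get through all samples
    """
    if total_cores < 1 or n_samples < 1 or max_nproc < 1:
        raise ValueError("all arguments must be positive integers")
    # cap the per-sample process count where scaling stops paying off
    nproc = min(max_nproc, total_cores)
    # fit as many concurrent samples as the budget allows
    n_jobs = max(1, min(n_samples, total_cores // nproc))
    waves = math.ceil(n_samples / n_jobs)
    return n_jobs, nproc, waves


if __name__ == "__main__":
    # 96 processes for 16 samples -> 8 samples at a time, 12 processes each
    print(plan_core_budget(total_cores=96, n_samples=16))
```

The two numbers would then go to the outer pool (number of concurrent samples) and to the `nproc` argument of the inner function, respectively.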

Now, what if we want to achieve the same in a notebook? It is not trivial, because multiprocess does not allow nesting (the way we typically use it, out of the box). It can be done easily with MPIRE (https://github.com/sybrenjansen/mpire), which allows running multiple multiprocessed tasks in parallel and has an API very similar to that of multiprocess itself ... Check it out:

mpire test:

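import cooltools
from mpire import WorkerPool

# clrs: dictionary of several coolers, keyed by sample name
exp_kwargs = dict(view_df=hg38_arms, nproc=12)

def _job(sample):
    # each job computes cis-expected for one sample, using nproc processes itself
    _clr = clrs[sample]
    _exp = cooltools.expected_cis(_clr, **exp_kwargs)
    return (sample, _exp)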

# have to use daemon=False, because _job is multiprocessing-based already ...
# trying to run 8 samples in parallel, each using 12 processes: 8*12=96
with WorkerPool(n_jobs=8, daemon=False) as wpool:
    results = wpool.map(_job, list(clrs.keys()), progress_bar=True)

# sort out the results ...
exps = {sample: _exp for sample, _exp in results}

# this takes ~1 min for 16 coolers @ 25kb on 56-core system (112 thread)
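For context on the daemon=False comment above: pool workers are daemonic by default, and Python refuses to let a daemonic process start children of its own. Below is a minimal sketch of that restriction using only the standard-library multiprocessing module (not mpire, and not the multiprocess package), with the "fork" start method assumed, so it is POSIX-only. The function names are made up for illustration.

```python
import multiprocessing as mp


def _square(x):
    return x * x


def _outer_task(n):
    # Runs inside a (daemonic) worker of the outer pool and tries to
    # open a pool of its own, i.e. nested multiprocessing.
    try:
        with mp.get_context("fork").Pool(2) as inner:
            return inner.map(_square, range(n))
    except AssertionError as err:
        # multiprocessing raises here: daemonic workers may not have children
        return f"nested pool failed: {err}"


def run_nested():
    ctx = mp.get_context("fork")  # assumption: a POSIX system
    with ctx.Pool(2) as outer:
        return outer.map(_outer_task, [3, 4])


if __name__ == "__main__":
    for outcome in run_nested():
        print(outcome)
```

Each outer task reports a failure instead of a list of squares, which is the situation that non-daemonic outer workers are meant to avoid.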

one-by-one using a ton of cores per task:

exp_kwargs = dict(view_df=hg38_arms, nproc=112)
exps = {}
for sample, _clr in clrs.items():
    print(f"calculating expected for {sample} ...")
    exps[sample] = cooltools.expected_cis(_clr, **exp_kwargs)

# this takes > 2 mins, and shows no time improvements after nproc=32 ...

This has limited application: it only helps projects with many samples and people with big workstations. But when both criteria are met, the speed-up is much appreciated.

sergpolly commented 10 months ago

Had issues myself using start_method="fork" on the outermost WorkerPool: it was crashing shamelessly ... Here is an example of how to use start_method="spawn" or "forkserver" (it is also important to add use_dill=True, otherwise pickle starts failing ...)

def _job(data_pack, sample):
    # unpack shared data ...
    clr_dict, view_df = data_pack
    # define cooler to work on ...
    _clr = clr_dict[sample]
    from cooltools.sandbox.obs_over_exp_cooler import expected_full
    # calculate full expected (cis + trans)
    _exp_full = expected_full(
        _clr,
        view_df=view_df,
        smooth_cis=False,
        aggregate_trans=True,
        expected_column_name="expected",
        nproc=8,
    )
    return (sample, _exp_full)

# have to use daemon=False, because _job is multiprocessing-based already ...
with WorkerPool(
    n_jobs=16,
    shared_objects=(clrs_samples_dict, hg38_arms),
    daemon=False,
    start_method='forkserver', # or spawn ...
    use_dill=True
) as wpool:
    results = wpool.map(
        _job,
        list(clrs_samples_dict.keys()),
        progress_bar=True
    )

# sort out the results ...
exps_full_dict = {sample: _exp for sample, _exp in results}

The key here is that spawn isn't "as aware" of all the global variables as fork is, so one has to pass variables explicitly (here via shared_objects) and do imports inside each task ...
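On the use_dill=True remark above: the thread does not say which object made pickle fail, so the following is only one plausible illustration. The standard pickle module serializes functions by reference (module plus name), so anything that cannot be looked up by name, such as a closure or a lambda, is rejected. The helper names here are hypothetical.

```python
import pickle


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def make_job(nproc):
    # _job is a closure: it only exists inside make_job, so pickle
    # cannot refer to it by "module.name"
    def _job(sample):
        return (sample, nproc)
    return _job


if __name__ == "__main__":
    print("builtin len:", can_pickle(len))
    print("closure    :", can_pickle(make_job(8)))
    print("lambda     :", can_pickle(lambda sample: sample))
```

dill can serialize such functions by value, which is presumably why switching it on helps with spawn and forkserver, where everything sent to the workers has to be serialized.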

sergpolly commented 10 months ago

Since nested multiprocessing is experimental, it is better to keep track of related issues on the mpire side: https://github.com/sybrenjansen/mpire/issues/105