nel-lab / mesmerize-core

High level pandas-based API for batch analysis of Calcium Imaging data using CaImAn

Add async_local backend and allow using an existing dview for local and async_local backends #311

Open · ethanbb opened 2 months ago

ethanbb commented 2 months ago

This is a feature I added for myself, and I think it may be useful for others, so I'm offering it here.

Problem: mesmerize is currently inflexible with regard to how parallel processing is set up. Running an item always opens a new multiprocessing pool; you can control the number of processes through the MESMERIZE_N_PROCESSES environment variable, but that's it. I wanted the ability to a) pass an existing cluster into multiple runs, to save overhead, and/or b) use a different type of cluster (e.g. an ipyparallel cluster spanning multiple nodes, or even a load-balanced view).
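
Concretely, the kind of usage I wanted to enable (a simplified sketch; the exact argument name for passing the cluster in may differ from the final diff):

from mesmerize_core import load_batch
from caiman.cluster import setup_cluster, stop_server

df = load_batch("/path/to/batch.pickle")  # existing batch of items to run

# set up one cluster up front and reuse it across all items
_, dview, _ = setup_cluster(backend="multiprocessing", n_processes=8)
try:
    for i in range(len(df)):
        df.iloc[i].caiman.run(backend="local", dview=dview)
finally:
    stop_server(dview=dview)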

Solution: Passing a pool or dview into the run function clearly won't work with a subprocess, so the subprocess and slurm backends are out. The local backend calls the function directly, but it has the disadvantage that it blocks, so only one grid search run can proceed at a time. However, we can get around that by spawning a thread (again, not a subprocess; the cluster objects can't be pickled).
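
The core of that workaround, sketched (helper names here are illustrative, not the PR's actual code):

from concurrent.futures import Future, ThreadPoolExecutor

# one shared executor; each submitted run becomes a Future instead of blocking
_executor = ThreadPoolExecutor()

def run_in_thread(run_fn, *args, **kwargs) -> Future:
    # a thread rather than a subprocess, so cluster/dview objects
    # can be shared directly without being pickled
    return _executor.submit(run_fn, *args, **kwargs)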

I added a new backend called "async_local," which just launches a local run in a thread using the concurrent.futures module from the standard library. I also made it possible to pass a dview into both the local and async_local backends. In addition, I factored out some boilerplate from the 3 algorithm files into a _utils.py file that launches a new multiprocessing pool if no dview was passed in (and closes it when finished), and otherwise just forwards what was passed in.
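
Conceptually, that shared helper is a context manager along these lines (names are illustrative; see _utils.py in the diff for the real thing):

from contextlib import contextmanager
from caiman.cluster import setup_cluster, stop_server

@contextmanager
def ensure_dview(dview=None, n_processes=None):
    if dview is not None:
        # the caller owns the cluster; just forward it and don't close it
        yield dview
    else:
        # no dview passed in: open a local pool and clean it up when done
        _, pool, _ = setup_cluster(backend="multiprocessing", n_processes=n_processes)
        try:
            yield pool
        finally:
            stop_server(dview=pool)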

Finally, I added a test that compares results from the 3 non-SLURM backends.

The diff is not really as big as GitHub says; there are a lot of whitespace changes, since I added a context manager in the 3 algorithm files, but checking the box to hide whitespace changes shows something more reasonable.

kushalkolar commented 2 months ago

Thanks! I haven't used the concurrent.futures module in any of my own code, so this will take me a while to review.

ethanbb commented 2 months ago

Sure, no worries! I also hadn't used that module before today, so I'm not 100% sure there won't be issues with it, but so far it seems pretty straightforward.

If it helps, I used this toy script to help convince myself that it's doing the right thing:

from concurrent.futures import ThreadPoolExecutor
from typing import Iterable
from multiprocessing.pool import Pool
import time
from caiman.cluster import setup_cluster, stop_server

def calc_square(x: int) -> int:
    return x ** 2

def calc_squares(xs: Iterable[int], pool: Pool):
    # simulate a slow, blocking run that uses a shared pool
    time.sleep(10)
    return pool.map_async(calc_square, xs).get()

if __name__ == '__main__':
    # one multiprocessing pool, shared by both threads below
    _, pool, n_procs = setup_cluster(backend="multiprocessing", n_processes=4)

    t0 = time.time()
    with ThreadPoolExecutor(max_workers=2) as executor:
        future0 = executor.submit(calc_squares, range(2), pool)
        future1 = executor.submit(calc_squares, range(2, 4), pool)
    # the with block waits for both futures to finish before exiting

    print(future0.result())
    print(future1.result())

    t1 = time.time()
    # ~10 s rather than ~20 s: the two runs overlapped in separate threads
    print(f'Executed in {t1-t0:.2f} seconds')

    stop_server(dview=pool)

kushalkolar commented 2 months ago

This makes me wonder whether we can use it to easily run blocking calculations in the background and visualize the results in real time as they come in with fastplotlib :thinking:
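
Roughly this pattern, with the plotting call left as a placeholder:

from concurrent.futures import ThreadPoolExecutor
import time

def long_running_fit():
    # stand-in for a blocking CaImAn computation
    time.sleep(5)
    return "results"

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(long_running_fit)
    while not future.done():
        # a render loop could redraw partial results here
        time.sleep(0.1)
    print(future.result())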