spacetelescope / stcal

https://stcal.readthedocs.io/en/latest/
Other
10 stars 32 forks source link

jump step fails within multiprocessing on newer Macbooks #247

Closed thomaswilliamsastro closed 6 months ago

thomaswilliamsastro commented 7 months ago

I'm having an issue with running the jump step with multiprocessing on my Macbook Pro (M1 chip). This doesn't seem to be an issue on a linux machine, so I suspect this is a Mac-only issue.

The problem seems to be in opencv trying to do its own multiprocessing under the hood, which doesn't work if you're already using MP. Minimally reproducible example that crashes:

import gc
import glob
import multiprocessing as mp
import os
from functools import partial

import numpy as np
from jwst.pipeline import calwebb_detector1

def run_lv1(dither):
    uncal_files = glob.glob(f"{dither}*_uncal.fits")
    uncal_files.sort()

    for uncal_file in uncal_files:
        config = calwebb_detector1.Detector1Pipeline.get_config_from_reference(
            uncal_file
        )
        detector1 = calwebb_detector1.Detector1Pipeline.from_config_section(config)

        # Run the level 1 pipeline
        detector1.run(uncal_file)

input_files = glob.glob("/path/to/uncals/*_uncal.fits")
procs = 4

dithers = []
for file in input_files:
    file_split = os.path.split(file)[-1].split("_")
    dithers.append("_".join(file_split[:2]) + "_*_" + file_split[-2])
dithers = np.unique(dithers)
dithers.sort()

# This one works
for dither in dithers:
    run_lv1(dither)

# This one crashes
with mp.get_context("fork").Pool(procs) as pool:
    successes = []

    for success in pool.imap_unordered(
            partial(
                run_lv1,
            ),
            dithers,
    ):
        successes.append(success)

    pool.close()
    pool.join()
    gc.collect()

The for loop works fine, the mp.pool() crashes. This is fixed by adding

cv.setNumThreads(1)

at the top of jump.py. So I suspect some kind of test to catch if we're in some kind of MP and if so setting the cv NumThreads to 1 is a solution here

seanlinden commented 7 months ago

Same issue for me on Mac M2 Max chip

emolter commented 6 months ago

@seanlinden @thomaswilliamsastro Thanks for reporting this. We have made several changes to the way multiprocessing works in the pipeline since this issue was submitted, including some bug fixes that should have solved this. These were released with pipeline version 1.14.0 on 29 March. We have added a documentation page here (thanks to @penaguerrero) that gives best-practice recommendations for using multiprocessing. That page includes an example that is very similar to what you are trying to do (example 2). I just verified that the documented example works on my local machine to process multiple uncal files at the same time, running MacOS Ventura 13.6.3 with Apple M2 Max chips on Python 3.11 with jwst==1.14.0.

While we recommend following the new docs, this modified version of your script also works fine on my machine:

import gc
import glob
import multiprocessing as mp
from jwst.pipeline import calwebb_detector1

def run_lv1(uncal_file):

    config = calwebb_detector1.Detector1Pipeline.get_config_from_reference(
        uncal_file
    )
    detector1 = calwebb_detector1.Detector1Pipeline.from_config_section(config)

    # Run the level 1 pipeline
    detector1.run(uncal_file)

if __name__ == "__main__":

    input_files = glob.glob("*uncal.fits")
    procs = 6

    # This one no longer crashes
    with mp.get_context("fork").Pool(procs) as pool:
        successes = []

        for success in pool.imap_unordered(
                    run_lv1,
                input_files,
        ):
            successes.append(success)

        pool.close()
        pool.join()
        gc.collect()

Please try using the newly documented example and the latest pipeline version.

braingram commented 6 months ago

Thanks @emolter !

I'm going to close this issue as the above changes and suggested new script appears to address the issue. Please feel free to re-open the issue if the changes do not solve the issue (or if there are other problems).