mehta-lab / recOrder

3D quantitative label-free imaging with phase and polarization
BSD 3-Clause "New" or "Revised" License

Metadata race conditions prevent multi-position parallelization #444

Open talonchandler opened 9 months ago

talonchandler commented 9 months ago

Typical recOrder reconstructions use globs to select multiple positions like this:

recorder apply-inv-tf -i PAL027_DENV_A549_raw.zarr/*/*/* -t phase_tf.zarr -c ./phase.yaml -o test_output.zarr

which multiprocesses over positions. This works well, but it requires a lot of memory on a single node.

One option is to send individual positions to different jobs/processes/nodes with:

recorder apply-inv-tf -i PAL027_DENV_A549_raw.zarr/A/1/1/ -t phase_tf.zarr -c ./phase.yaml -o test_output.zarr & 
recorder apply-inv-tf -i PAL027_DENV_A549_raw.zarr/B/2/2/ -t phase_tf.zarr -c ./phase.yaml -o test_output.zarr &

but this causes a metadata race condition. Both processes see that no metadata file exists and each creates a new one from scratch, so whichever job finishes writing the metadata last "wins". For example, if the first process finishes last, iohub info will show

Row names:       ['A']
Column names:    ['1']
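
To make the failure mode concrete, here is a minimal sketch of the last-writer-wins behavior. This is illustrative Python, not recOrder/iohub code, and the metadata structure is simplified:

# Minimal sketch of the failure mode; the plate metadata below is
# simplified for illustration and does not match the real NGFF schema.
import json
from pathlib import Path

def write_plate_metadata(store: Path, row: str, col: str) -> None:
    # Each process builds the metadata from scratch because .zattrs did
    # not exist when the process launched, so it records only its own
    # position...
    meta = {"plate": {"rows": [{"name": row}], "columns": [{"name": col}]}}
    # ...and this unconditional write clobbers the other process's entry.
    (store / ".zattrs").write_text(json.dumps(meta))

# process 1: write_plate_metadata(Path("test_output.zarr"), "A", "1")
# process 2: write_plate_metadata(Path("test_output.zarr"), "B", "2")
# Whichever write lands last determines what `iohub info` reports.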

@ziw-liu and I discussed this, and we're strongly considering changing recorder compute-tf to recorder prepare. Instead of just computing the transfer function, recorder prepare will take a list of positions and a list of configs (one for each channel), compute the transfer functions, and prepare the output zarr store so that the parallelizable recorder apply-inv-tf calls can fill it.

This means that create_empty_zarr (or a near variant) can be called once in advance, instead of in each process, which is what creates the race condition.

ziw-liu commented 9 months ago

A workaround for now is to use a different output path for each process (use $input_fov_name as the output path in the Slurm script), and then aggregate with a small Python script that calls iohub.ngff.Plate.from_positions().
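
For example, assuming each job wrote its position to a per-FOV store under ./out/<row>/<col>/<fov>.zarr (both that layout and the exact Plate.from_positions signature are assumptions here; check the iohub docs/source), the aggregation script could look roughly like:

# Rough aggregation sketch for the workaround above. The per-FOV output
# layout and the exact signature of Plate.from_positions are assumptions.
from pathlib import Path

from iohub import open_ome_zarr
from iohub.ngff import Plate

positions = {}
for fov_store in sorted(Path("./out").glob("*/*/*.zarr")):
    # Recover the "row/col/fov" key from each per-process output path.
    name = "/".join(fov_store.with_suffix("").parts[-3:])
    positions[name] = open_ome_zarr(fov_store, mode="r")

# Assemble a single HCS plate pointing at all reconstructed positions.
plate = Plate.from_positions("aggregated.zarr", positions)  # assumed signature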

edyoshikun commented 8 months ago

In my workflow, I use slurmkit scripts that first call create_empty_hcs_zarr(), compute the transfer function, and then, using the input filepaths, send one position per node to run the apply-inv-tf step. I haven't encountered any issues doing this. The advantage of slurmkit is that it's all Python, and there is no need to write additional Slurm scripts if you have job dependencies.

I wouldn't merge both functions into one. I think compute-tf has value on its own, and I would leave it separate. What you can do, similar to reconstruct (which calls the compute step and then the apply-inv step), is add a prepare function that calls create_empty_hcs_zarr and compute_tf; a sketch follows.
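
A sketch of such a prepare function. The wrapper itself is hypothetical (not an existing CLI command), but the individual calls are the same recOrder 0.4 functions used in the full script below:

# Hypothetical prepare() wrapper; the calls match the script below.
from pathlib import Path

from recOrder.cli.apply_inverse_transfer_function import (
    get_reconstruction_output_metadata,
)
from recOrder.cli.compute_transfer_function import (
    compute_transfer_function_cli,
)
from recOrder.cli.utils import create_empty_hcs_zarr

def prepare(
    input_position_dirpaths: list[Path],
    config_path: Path,
    transfer_function_path: Path,
    output_dirpath: Path,
) -> None:
    """Compute the transfer function and pre-create the output store once,
    so that per-position apply-inv-tf jobs never race on shared metadata."""
    compute_transfer_function_cli(
        input_position_dirpath=input_position_dirpaths[0],
        config_filepath=config_path,
        output_dirpath=transfer_function_path,
    )
    output_metadata = get_reconstruction_output_metadata(
        input_position_dirpaths[0], config_path
    )
    create_empty_hcs_zarr(
        store_path=output_dirpath,
        position_keys=[p.parts[-3:] for p in input_position_dirpaths],
        **output_metadata,
    )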

Some thoughts on the parallelization:

This is the script I use with recOrder 0.4.0:

import datetime
import glob
from pathlib import Path

import click
from natsort import natsorted
from slurmkit import SlurmParams, slurm_function, submit_function

from recOrder.cli.apply_inverse_transfer_function import (
    apply_inverse_transfer_function_single_position,
    get_reconstruction_output_metadata,
)
from recOrder.cli.compute_transfer_function import (
    compute_transfer_function_cli,
)
from recOrder.cli.utils import create_empty_hcs_zarr

# Reconstruction parameters
config_path = "./phase.yml"
transfer_function_path = "./TF_phase3D.zarr"

# I/O parameters
# Glob pattern selecting the input positions (a single FOV here; wildcards work too)
input_position_dirpaths = "/hpc/projects/comp.micro/mantis/2023_08_09_HEK_PCNA_H2B/00-zarr_iohub_010dev4/pcna_timelapse_1/pcna_timelapse_labelfree_1.zarr/0/0/0"
output_dirpath = "./timelapse_reg_1e-5.zarr"
# sbatch and resource parameters
cpus_per_task = 3
mem_per_cpu = "8G"
time = 40  # minutes

# Path handling
input_position_dirpaths = [
    Path(path) for path in natsorted(glob.glob(input_position_dirpaths))
]
output_dirpath = Path(output_dirpath)
slurm_out_path = output_dirpath.parent / "slurm_output/recon-%j.out"

transfer_function_path = Path(transfer_function_path)
config_path = Path(config_path)

## First: compute the transfer function (recorder compute-tf)
compute_transfer_function_cli(
    input_position_dirpath=input_position_dirpaths[0],
    config_filepath=config_path,
    output_dirpath=transfer_function_path,
)

## Second: apply-inv-tf
# Pre-create the empty output store once, before any jobs are submitted,
# so that the per-position jobs never race on the plate metadata.
output_metadata = get_reconstruction_output_metadata(
    input_position_dirpaths[0], config_path
)

create_empty_hcs_zarr(
    store_path=output_dirpath,
    position_keys=[p.parts[-3:] for p in input_position_dirpaths],
    **output_metadata,
)

click.echo(f"in: {input_position_dirpaths}, out: {output_dirpath}")

# prepare slurm parameters
params = SlurmParams(
    partition="preempted",
    cpus_per_task=cpus_per_task,
    mem_per_cpu=mem_per_cpu,
    time=datetime.timedelta(minutes=time),
    output=slurm_out_path,
)

# wrap apply_inverse_transfer_function_single_position() with slurmkit
slurm_reconstruct_single_position = slurm_function(
    apply_inverse_transfer_function_single_position
)
reconstruct_func = slurm_reconstruct_single_position(
    transfer_function_dirpath=transfer_function_path,
    config_filepath=config_path,
    num_processes=cpus_per_task,
)

# generate an array of jobs by passing the input and output paths to the Slurm-wrapped function
recon_jobs = [
    submit_function(
        reconstruct_func,
        slurm_params=params,
        input_position_dirpath=input_position_dirpath,
        output_position_dirpath=output_dirpath
        / Path(*input_position_dirpath.parts[-3:]),
    )
    for input_position_dirpath in input_position_dirpaths
]