nipreps / niworkflows

Common workflows for MRI (anatomical, functional, diffusion, etc)
https://www.nipreps.org/niworkflows
Apache License 2.0

Handle file read/write race in parallel computing #856

Open psychelzh opened 7 months ago

psychelzh commented 7 months ago

What happened?

Originally posted in https://github.com/PennLINC/xcp_d/issues/1064#issuecomment-1966634300

When different nodes try to access the same file simultaneously (especially when pipelines run in parallel), the file can become inaccessible. This is what happened in the XCP-D use case.

What command did you use?

For XCP-D, AFAIK, the failing node uses the `DerivativesDataSink` class.

What version of the software are you running?

XCP-D 0.6.1

How are you running this software?

Singularity

Is your data BIDS valid?

Yes

Are you reusing any previously computed results?

No

Please copy and paste any relevant log output.

Node: xcpd_wf.single_subject_TJNU007N_wf.cifti_postprocess_0_wf.qc_report_wf.ds_qc_metadata
Working directory: /seastor/CAMP/tmp/xcpd_wf/single_subject_TJNU007N_wf/cifti_postprocess_0_wf/qc_report_wf/ds_qc_metadata

Node inputs:

acquisition = <undefined>
atlas = <undefined>
base_directory = /seastor/CAMP/derivatives/xcpd_no_gsr
ceagent = <undefined>
check_hdr = True
chunk = <undefined>
cohort = <undefined>
compress = <undefined>
data_dtype = <undefined>
datatype = <undefined>
den = <undefined>
desc = linc
direction = <undefined>
dismiss_entities = ['suffix', 'task', 'measure', 'mode', 'roi', 'atlas', 'tracksys', 'ceagent', 'modality', 'space', 'flip', 'fmap', 'staining', 'tracer', 'chunk', 'mt', 'den', 'hemi', 'inv', 'recording', 'res', 'cohort', 'sample', 'reconstruction', 'desc', 'session', 'part', 'echo', 'acquisition', 'from', 'datatype', 'direction', 'model', 'extension', 'scans', 'proc', 'subject', 'label', 'subset', 'to', 'run']
echo = <undefined>
extension = .json
flip = <undefined>
fmap = <undefined>
from = <undefined>
hemi = <undefined>
in_file = ['/seastor/CAMP/tmp/xcpd_wf/single_subject_TJNU007N_wf/cifti_postprocess_0_wf/qc_report_wf/qc_report/filtered_denoisedqc_bold.json']
inv = <undefined>
label = <undefined>
measure = <undefined>
meta_dict = <undefined>
modality = <undefined>
mode = <undefined>
model = <undefined>
mt = <undefined>
part = <undefined>
proc = <undefined>
reconstruction = <undefined>
recording = <undefined>
res = <undefined>
roi = <undefined>
run = <undefined>
sample = <undefined>
scans = <undefined>
session = <undefined>
source_file = ['/seastor/CAMP/derivatives/fmriprep/sub-TJNU007N/ses-1/func/sub-TJNU007N_ses-1_task-am_dir-PA_run-1_space-fsLR_den-91k_bold.dtseries.nii']
space = <undefined>
staining = <undefined>
subject = <undefined>
subset = <undefined>
suffix = qc
task = <undefined>
to = <undefined>
tracer = <undefined>
tracksys = <undefined>

Traceback (most recent call last):
  File "/usr/local/miniconda/lib/python3.10/site-packages/nipype/pipeline/plugins/multiproc.py", line 344, in _send_procs_to_workers
    self.procs[jobid].run(updatehash=updatehash)
  File "/usr/local/miniconda/lib/python3.10/site-packages/nipype/pipeline/engine/nodes.py", line 527, in run
    result = self._run_interface(execute=True)
  File "/usr/local/miniconda/lib/python3.10/site-packages/nipype/pipeline/engine/nodes.py", line 645, in _run_interface
    return self._run_command(execute)
  File "/usr/local/miniconda/lib/python3.10/site-packages/nipype/pipeline/engine/nodes.py", line 771, in _run_command
    raise NodeExecutionError(msg)
nipype.pipeline.engine.nodes.NodeExecutionError: Exception raised while executing Node ds_qc_metadata.

Traceback:
    Traceback (most recent call last):
      File "/usr/local/miniconda/lib/python3.10/site-packages/nipype/interfaces/base/core.py", line 397, in run
        runtime = self._run_interface(runtime)
      File "/usr/local/miniconda/lib/python3.10/site-packages/niworkflows/interfaces/bids.py", line 732, in _run_interface
        _copy_any(orig_file, str(out_file))
      File "/usr/local/miniconda/lib/python3.10/site-packages/niworkflows/utils/misc.py", line 288, in _copy_any
        copyfile(src, dst, copy=True, use_hardlink=True)
      File "/usr/local/miniconda/lib/python3.10/site-packages/nipype/utils/filemanip.py", line 447, in copyfile
        copyfile(
      File "/usr/local/miniconda/lib/python3.10/site-packages/nipype/utils/filemanip.py", line 386, in copyfile
        elif posixpath.samefile(newfile, originalfile):
      File "/usr/local/miniconda/lib/python3.10/genericpath.py", line 100, in samefile
        s1 = os.stat(f1)
    FileNotFoundError: [Errno 2] No such file or directory: '/seastor/CAMP/derivatives/xcpd_no_gsr/xcp_d/desc-linc_qc.json'
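The traceback ends in `samefile`, which calls `os.stat` on the destination `desc-linc_qc.json`. One plausible reading (an assumption; the thread does not confirm it) is a check-then-act race: the destination existed when the existence check ran, but another worker removed it before `os.stat`. The sketch below reproduces that pattern deterministically using only the standard library. `copy_unless_same` and its `_between` hook are hypothetical stand-ins that mimic the pattern, not nipype's actual `copyfile`.

```python
import os
import shutil
import tempfile


def copy_unless_same(src, dst, _between=None):
    """Hypothetical copy helper mirroring a check-then-stat pattern.

    `_between` is a test hook that stands in for another worker acting
    between the existence check and the stat call.
    """
    if os.path.lexists(dst):
        if _between is not None:
            _between()  # simulate a concurrent worker removing dst
        # samefile() calls os.stat() on both paths and raises if dst is gone
        if os.path.samefile(dst, src):
            return dst
        os.unlink(dst)
    shutil.copyfile(src, dst)
    return dst


reproduced = False
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "qc_src.json")
    dst = os.path.join(tmp, "desc-linc_qc.json")
    with open(src, "w") as f:
        f.write("{}")
    shutil.copyfile(src, dst)  # an earlier worker already wrote dst
    try:
        # The other "worker" deletes dst right after our lexists() check
        copy_unless_same(src, dst, _between=lambda: os.unlink(dst))
    except FileNotFoundError as exc:
        reproduced = exc.filename == dst

print("race reproduced:", reproduced)
```

Because the existence check and the stat are separate system calls, checking more carefully beforehand cannot close the window; the fix has to come from how the file is written, or from not sharing the destination at all.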

Additional information / screenshots

No response

tsalo commented 7 months ago

I wonder whether adding an overwrite option to `DerivativesDataSink` would be useful.
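If such an option existed, one way to ensure readers never see a missing or half-written destination is to copy into a temporary file in the same directory and then swap it in with `os.replace`, which is atomic on POSIX when both paths are on the same filesystem. This is a minimal sketch under that assumption: `atomic_overwrite` is a hypothetical helper, not part of `DerivativesDataSink` or nipype, and atomicity on networked filesystems is not guaranteed.

```python
import os
import shutil
import tempfile


def atomic_overwrite(src, dst):
    """Replace dst with a copy of src so readers never see dst missing.

    Hypothetical helper: the temporary file is created next to dst so
    that os.replace() is a same-filesystem rename.
    """
    dst_dir = os.path.dirname(os.path.abspath(dst))
    fd, tmp_path = tempfile.mkstemp(dir=dst_dir, prefix=".tmp-", suffix=".part")
    try:
        with os.fdopen(fd, "wb") as tmp_f, open(src, "rb") as src_f:
            shutil.copyfileobj(src_f, tmp_f)
            tmp_f.flush()
            os.fsync(tmp_f.fileno())  # make the data durable before publishing
        shutil.copymode(src, tmp_path)  # mkstemp creates files as 0o600
        os.replace(tmp_path, dst)  # atomic swap on POSIX; last writer wins
    except BaseException:
        # Remove the temporary file if anything failed before the swap
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
    return dst


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "new_qc.json")
    dst = os.path.join(tmp, "desc-linc_qc.json")
    with open(dst, "w") as f:
        f.write('{"version": "old"}')
    with open(src, "w") as f:
        f.write('{"version": "new"}')
    atomic_overwrite(src, dst)
    with open(dst) as f:
        content = f.read()
    leftovers = [n for n in os.listdir(tmp) if n.endswith(".part")]

print(content)
```

Note the "last writer wins" semantics: concurrent callers never observe a missing file, but whichever job finishes last determines the final contents.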

effigies commented 7 months ago

Generally it seems like a bad idea to aim two parallel jobs at the same output directory. Especially when working on networked filesystems, synchronization is uncertain at best.

Happy to consider a patch, but my experience is that these are massive time sinks that are better handled by using separate output directories and merging in a single process, post-run.
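The separate-directories approach could be finished with a merge step that runs in a single process after all jobs complete. In the minimal sketch below, each parallel job is assumed to have written to its own directory (the layout and the `merge_job_outputs` name are hypothetical). Files that appear in several jobs are copied once if they are byte-identical, and conflicting contents raise an error instead of being silently overwritten.

```python
import filecmp
import os
import shutil
import tempfile


def merge_job_outputs(job_dirs, dest):
    """Hypothetical merge of per-job output trees into dest, in one process.

    Returns the relative paths that were copied into dest.
    """
    copied = []
    for job_dir in job_dirs:
        for root, _dirs, files in os.walk(job_dir):
            for name in sorted(files):
                src = os.path.join(root, name)
                rel = os.path.relpath(src, job_dir)
                target = os.path.join(dest, rel)
                if os.path.exists(target):
                    # Same relative path from another job: must be identical
                    if not filecmp.cmp(src, target, shallow=False):
                        raise ValueError(f"conflicting outputs for {rel}")
                    continue
                os.makedirs(os.path.dirname(target), exist_ok=True)
                shutil.copy2(src, target)
                copied.append(rel)
    return copied


with tempfile.TemporaryDirectory() as tmp:
    out = os.path.join(tmp, "derivatives")
    job_dirs = [os.path.join(tmp, "job_a"), os.path.join(tmp, "job_b")]
    for job in job_dirs:
        os.makedirs(job)
        # Both jobs produce the same shared file
        with open(os.path.join(job, "desc-linc_qc.json"), "w") as f:
            f.write("{}")
    copied = merge_job_outputs(job_dirs, out)

    # A third job with different contents for the same path is a conflict
    job_c = os.path.join(tmp, "job_c")
    os.makedirs(job_c)
    with open(os.path.join(job_c, "desc-linc_qc.json"), "w") as f:
        f.write('{"differs": true}')
    try:
        merge_job_outputs([job_c], out)
        conflict_detected = False
    except ValueError:
        conflict_detected = True

print(copied, conflict_detected)
```

Because only one process touches `dest`, no locking is needed, and duplicated outputs are verified rather than assumed identical.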

tsalo commented 7 months ago

I might not be thinking about this the right way, but in my case (XCP-D), we warp atlases to the standard space and resolution used across runs (and potentially across subjects), so we only want one copy of the warped and resampled atlas in the derivatives. I do this in the single-subject workflow because that's where collect_data is called, and I need the BOLD runs selected by collect_data to identify the space and resolution to warp the atlas to. Since we expect the space and resolution to be consistent within and across subjects, the files are written out to the same location.

Does that approach make sense to you?
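For a shared output like the warped atlas, where every job is expected to produce identical content at the same path, one option is "first writer wins": each worker writes a private temporary file and then tries to publish it with `os.link`, which fails with `FileExistsError` if the destination already exists, so a complete file appears at once or not at all. This is a sketch under stated assumptions: `publish_once` is a hypothetical helper, the filesystem must support hard links, and the guarantee is weaker on networked filesystems, which is the concern raised earlier in the thread.

```python
import filecmp
import os
import shutil
import tempfile


def publish_once(src, dst):
    """Hypothetical helper: publish a copy of src at dst only if dst is absent.

    Returns True if this call created dst, False if another writer already
    had. Raises ValueError if the existing dst has different contents.
    """
    dst_dir = os.path.dirname(os.path.abspath(dst))
    fd, tmp_path = tempfile.mkstemp(dir=dst_dir, prefix=".tmp-")
    os.close(fd)
    try:
        shutil.copy(src, tmp_path)  # full contents + permission bits, privately
        try:
            os.link(tmp_path, dst)  # atomic: fails if dst already exists
            return True
        except FileExistsError:
            # Someone else published first; confirm they wrote the same thing
            if not filecmp.cmp(src, dst, shallow=False):
                raise ValueError(f"{dst} exists with different contents")
            return False
    finally:
        os.unlink(tmp_path)  # dst, if linked, keeps its own directory entry


with tempfile.TemporaryDirectory() as tmp:
    atlas = os.path.join(tmp, "warped_atlas.nii")
    with open(atlas, "wb") as f:
        f.write(b"atlas-bytes")
    shared = os.path.join(tmp, "atlas_shared.nii")
    first = publish_once(atlas, shared)   # creates the shared file
    second = publish_once(atlas, shared)  # identical copy already present
    leftovers = [n for n in os.listdir(tmp) if n.startswith(".tmp-")]

print(first, second)
```

Unlike an overwrite, this never replaces a file another job may be reading, and a mismatch between jobs surfaces as an error instead of a silent last-writer-wins result.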