nipype / pydra

Pydra Dataflow Engine
https://nipype.github.io/pydra/

convert to pydra #291

Open zswgzx opened 4 years ago

zswgzx commented 4 years ago

What are you trying to accomplish?

Convert the script below to a pydra task/workflow and set up the pipeline.

What have you tried?

I still have trouble with nipype.interfaces.dcm2nii.Dcm2niix, so any suggestions are welcome.

python script

```Python
#!/home/shengwei/anaconda3/bin/python

import sys
from logging import INFO, basicConfig, error, log
# from grp import getgrnam
from nipype.interfaces.dcm2nii import Dcm2niix
from os import chown, mkdir, path, scandir, walk
# from pwd import getpwnam
from re import IGNORECASE, search
from shutil import rmtree
from tarfile import is_tarfile, open
from zipfile import BadZipFile, ZipFile, is_zipfile

_STAGING_DIR = 'staging'
_CONVERT_BASE_DIR = 'uc'


class MriWrapper(object):
    """This class obtains meta data (i.e. date, visit, projid, protocol) of scan from its path"""

    def __init__(self, path):
        self.path = path
        scankey_search = search(r'\d{6}_\d{2}_\d{8}', path)
        if scankey_search is None:
            sys.exit('Could not find scan key from ' + path)
        else:
            scan_key = scankey_search.group(0).split('_')
            self.date = scan_key[0]
            self.visit = scan_key[1]
            self.projid = scan_key[2]

        if search('bannockburn', path):
            self.protocol = ('bannockburn',)
        elif search('mg', path):
            self.protocol = ('mg',)
        elif search('uc', path):
            self.protocol = ('uc',)
        else:
            sys.exit('No such active or valid protocol exists')


def find_dicom(base_path):
    """find dicom files"""
    for entry in scandir(base_path):
        if not entry.name.startswith('.') and entry.is_file():
            # str.endswith() takes no flags argument, so lower-case the name instead
            if entry.name.lower().endswith(('.zip', '.tar.gz')):
                if not search(r'nii|nifti|nifti|par|P\d{5,6}.zip', entry.name, IGNORECASE):
                    try:
                        if is_zipfile(entry.path):
                            if 'dicom' in entry.name or 'DICOM' in entry.name:
                                return entry
                            else:
                                sys.exit('Zip file name does not contain "dicom"')
                        elif is_tarfile(entry.path):
                            if check_dicom_tar(open(entry.path, 'r|gz')):
                                return entry
                    except RuntimeError:
                        sys.exit('DICOM not found or invalid in ' + base_path)


# TODO: speed up by iterating over files manually and returning when dicom is found (vs using getnames())
def check_dicom_tar(tar_file):
    for entry in tar_file.getnames():
        if search('dicom', entry, IGNORECASE) is not None:
            return True
    return False


def get_dcm2niix_path():
    """find the path for dcm2niix input folder, assuming it is the ONLY deepest path"""
    stage_entries = [entry for entry in walk(_STAGING_DIR)]
    return stage_entries[-1][0]


def convert_dicom_dir(dicom_dir):
    """convert directory with dicom files"""
    print('Checking ' + dicom_dir)
    basicConfig(filename='convert.log', filemode='w', level=INFO)
    dicom = find_dicom(dicom_dir)
    if not dicom:
        error('No DICOM in ' + dicom_dir)
    else:
        log(INFO, 'Converting ' + dicom.path)
        wrapper = MriWrapper(dicom.path)
        output_dir = path.join(path.dirname(dicom.path), wrapper.projid + '_' + wrapper.visit + '_nii')
        if not path.exists(output_dir):
            mkdir(output_dir)
        convert_dicom_file(dicom_dir, dicom, output_dir)
        print('Clearing staging folders')
        rmtree(_STAGING_DIR)
        mkdir(_STAGING_DIR)
        # userId = getpwnam('mriadmin').pw_uid
        # groupId = getgrnam('mri').gr_gid
        # for root, dirs, files in walk(output_dir):
        #     for subdir in dirs:
        #         chown(path.join(root, subdir), userId, groupId)
        #     for file in files:
        #         chown(path.join(root, file), userId, groupId)


def convert_dicom_file(dicom_dir, dicom_file, output_dir, merge_files=False):
    """convert dicom compressed zip/tar file"""
    # unzip dicom to staging directory
    if dicom_file.name.lower().endswith(".zip"):
        print("Unzipping " + path.join(dicom_dir, dicom_file.name))
        try:
            ZipFile(dicom_file.path).extractall(_STAGING_DIR)
        except BadZipFile:
            sys.exit('Bad zipfile! Skipping')
    elif dicom_file.name.lower().endswith(".tar.gz"):
        print("Untarring " + dicom_file.name)
        open(dicom_file.path, "r:gz").extractall(_STAGING_DIR)
    else:
        sys.exit('DICOM entry not recognized. Skipping ' + dicom_file.name)

    try:
        converter = Dcm2niix()
        converter.inputs.source_dir = get_dcm2niix_path()
        converter.inputs.output_dir = output_dir
        converter.inputs.ignore_deriv = True
        converter.inputs.merge_imgs = merge_files
        converter.inputs.single_file = True
        converter.inputs.out_filename = "%d"
        converter.inputs.bids_format = True
        converter.inputs.anon_bids = True
        print(converter.cmdline)
        # system('dcm2niix -i y -f %d -v n -z y -o ' + dst_dir + ' ' + src_dir)
    except RuntimeError:
        rmtree(output_dir)
        sys.exit('Error converting, removed ' + output_dir)


if __name__ == '__main__':
    scans = [folder[0] for folder in walk(_CONVERT_BASE_DIR)][1:]
    for scan in scans:
        if 'nii' not in scan:
            convert_dicom_dir(scan)
```

zswgzx commented 4 years ago

some context:

output from the script is

```
Unzipping uc/200228_12_85553356/dicomdata.zip
dcm2niix -ba y -b y -z y -x n -t n -i y -m n -f %d -o uc/200228_12_85553356/85553356_12_nii -s y -v n staging/dicomdata/DICOM
Clearing staging folders
Checking uc/200228_12_28099281
Unzipping uc/200228_12_28099281/dicomdata.zip
dcm2niix -ba y -b y -z y -x n -t n -i y -m n -f %d -o uc/200228_12_28099281/28099281_12_nii -s y -v n staging/dicomdata/DICOM/00000002
Clearing staging folders
```

and folder structure:

```
uc
├── 200228_12_28099281
│   ├── 28099281_12_nii
│   └── dicomdata.zip
└── 200228_12_85553356
    ├── 85553356_12_nii
    └── dicomdata.zip
```

but the `uc/*/*_nii/` folders are empty.

djarecka commented 4 years ago

Do you have an issue when trying to convert to pydra, or are you not able to run it with nipype?

zswgzx commented 4 years ago

from the pydra perspective, I'd like to convert it to a pydra task/workflow to get the hang of it

for the script specifically, only nipype.interfaces.dcm2nii was used, but the result is unexpected. so as a side question, any suggestions from nipype gurus would be appreciated.

djarecka commented 4 years ago

Not sure what you mean by the results being unexpected.

But for the pydra converter, can you take a look at this file and see if you can use it for the nipype interfaces? Or perhaps you can start from a Function Task and convert the python function first.

djarecka commented 4 years ago

in general it would be good to think about what you want your workflow to look like: specify the tasks

zswgzx commented 4 years ago

my expectation was that those two folders would not be empty but would contain *.{nii.gz,json} files
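Going only by the script as posted, one likely cause is that `convert_dicom_file` prints `converter.cmdline` but never calls `converter.run()`, so dcm2niix is never executed and nothing is written to the `*_nii` folders. Once a `run()` call is in place, a small check like the sketch below could confirm that outputs were actually produced. `list_converted` and `check_converted` are hypothetical helper names, not part of nipype or pydra.

```python
from pathlib import Path


def list_converted(output_dir):
    """Return the NIfTI and JSON sidecar files found directly in output_dir."""
    out = Path(output_dir)
    # dcm2niix with -z y and -b y writes compressed *.nii.gz images plus *.json sidecars
    return sorted(p for p in out.iterdir() if p.name.endswith((".nii.gz", ".json")))


def check_converted(output_dir):
    """Raise if the conversion left the output folder without any results."""
    files = list_converted(output_dir)
    if not files:
        raise RuntimeError(f"No converted files found in {output_dir}")
    return files
```

Calling `check_converted(output_dir)` right after the conversion step turns a silently empty folder into an explicit error.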

thinking about how to 'pydra' the code and will try to add as function task

djarecka commented 4 years ago

you should think about workflow in general - you have multiple steps in this code. Ideally every single step should be a Task that has specific input and output, and these inputs/outputs could be connected together to build a Workflow.
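As a rough illustration of that decomposition, the posted script could first be split into small functions that each take explicit inputs and return explicit outputs, before any pydra decorator is applied. The names `parse_scan_key` and `output_dir_for` are hypothetical; the scan-key pattern and the `<projid>_<visit>_nii` folder naming are taken from the script above.

```python
import re
from pathlib import PurePosixPath

# date (6 digits), visit (2 digits), projid (8 digits), as in the original script
SCAN_KEY = re.compile(r"(\d{6})_(\d{2})_(\d{8})")


def parse_scan_key(scan_path):
    """Step 1: path -> metadata dict (date, visit, projid)."""
    match = SCAN_KEY.search(scan_path)
    if match is None:
        raise ValueError(f"Could not find scan key in {scan_path}")
    date, visit, projid = match.groups()
    return {"date": date, "visit": visit, "projid": projid}


def output_dir_for(archive_path, meta):
    """Step 2: archive path + metadata -> output folder next to the archive."""
    parent = PurePosixPath(archive_path).parent
    return str(parent / f"{meta['projid']}_{meta['visit']}_nii")


meta = parse_scan_key("uc/200228_12_85553356/dicomdata.zip")
print(output_dir_for("uc/200228_12_85553356/dicomdata.zip", meta))
# uc/200228_12_85553356/85553356_12_nii
```

Because the output of one function is the input of the next, functions shaped like this map naturally onto tasks whose inputs and outputs are connected in a workflow.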

Once you have this we can think about converting this to pydra

zswgzx commented 4 years ago

would this work?

```Python
import typing as ty

import pydra


@pydra.mark.task
@pydra.mark.annotate({"return": {"output_image": ty.Any}})
def convert_dicom_compressed(src_folder, dst_folder, **kwargs):
    """ Apply dcm2niix to convert DICOM zip/tar files

    :param str src_folder: source folder name
    :param str dst_folder: output folder name
    :param dict kwargs: other available dcm2niix option(s), e.g. -b, -m, etc. """

    stage_folder = decompress(src_folder)
    output_image = dcm2niix(stage_folder, dst_folder)
    return output_image
```

effigies commented 4 years ago

This looks like it's missing some things, but overall looks reasonable.

What are decompress and dcm2niix?

zswgzx commented 4 years ago

just additional tasks that I haven't instantiated. for dcm2niix, it would be a nipype interface

effigies commented 4 years ago

Some additional comments:

You can annotate the function directly.

```Python
@pydra.mark.task
def convert_dicom_compressed(
        src_folder : Directory,
        dst_folder : str,
        ...) -> {"output_image": File}:
```

I'm not sure that we handle **kwargs well. I can't really remember and don't see an obvious indicator of it in the code.

But I might go about it differently, anyway, and make a workflow:

```Python
@pydra.mark.task
def decompress(src_folder: Directory) -> Directory:
    dst_dir = ...
    # unzip into dst_dir
    return dst_dir

wf = pydra.Workflow(name='decompress_and_convert_wf', input_spec=['src_folder'])
wf.add(decompress(src_folder=wf.lzin.src_folder, name="decompress"))
# the task name is what makes wf.dcm2niix available below
wf.add(Nipype1Task(Dcm2niix(), name='dcm2niix', source_dir=wf.decompress.lzout.out))
wf.set_output([("out", wf.dcm2niix.lzout.converted_files)])

with pydra.Submitter() as sub:
    sub(wf)
```

I haven't tested this, but does the logic make sense? (cc @djarecka for suggestions on improvement)
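For the elided body of `decompress`, one possible stdlib-only version is sketched below as a plain function, without the pydra decorator, so it can be tried on its own. The name `decompress_archive`, the `staging_dir` parameter, and the choice of a fresh temporary folder per archive are assumptions for illustration and are not stated anywhere in this thread.

```python
import tarfile
import tempfile
import zipfile
from pathlib import Path


def decompress_archive(archive, staging_dir=None):
    """Extract a .zip or .tar.gz archive and return the folder it was unpacked into."""
    archive = str(archive)
    # Assumption: use a fresh temporary folder unless the caller provides one
    dst_dir = Path(staging_dir) if staging_dir else Path(tempfile.mkdtemp(prefix="staging_"))
    dst_dir.mkdir(parents=True, exist_ok=True)

    if zipfile.is_zipfile(archive):
        with zipfile.ZipFile(archive) as zf:
            zf.extractall(dst_dir)
    elif tarfile.is_tarfile(archive):
        # "r:*" lets tarfile detect the compression (gz, bz2, ...) itself
        with tarfile.open(archive, "r:*") as tf:
            tf.extractall(dst_dir)
    else:
        raise ValueError(f"Not a zip or tar archive: {archive}")
    return str(dst_dir)
```

Returning the destination folder, rather than writing to a shared global `staging` directory, keeps the function's output explicit, which is what a downstream conversion task would need as its input.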

djarecka commented 4 years ago

kwargs should work for Function Tasks
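To illustrate the `**kwargs` idea from the docstring earlier in the thread (extra dcm2niix options such as `-b` or `-m`), here is a minimal sketch of turning keyword arguments into command-line flags. `build_dcm2niix_cmd` is a hypothetical helper, and it assumes every option is a single flag followed by one value, as in the command lines printed above. It only builds the argument list and does not run dcm2niix.

```python
def build_dcm2niix_cmd(src_folder, dst_folder, **kwargs):
    """Build a dcm2niix argument list; kwargs such as b='y' become '-b y'."""
    cmd = ["dcm2niix"]
    for flag, value in kwargs.items():
        cmd += [f"-{flag}", str(value)]
    # dcm2niix expects the input folder as the final argument
    cmd += ["-o", str(dst_folder), str(src_folder)]
    return cmd


print(" ".join(build_dcm2niix_cmd("staging/dicomdata/DICOM", "out_nii", b="y", z="y", f="%d")))
# dcm2niix -b y -z y -f %d -o out_nii staging/dicomdata/DICOM
```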

zswgzx commented 4 years ago

@effigies makes perfect sense! I'll probably get back to this next week. I'll keep this issue open, or I can close it if that's more appropriate.

@djarecka if I can contribute to this repo or help with any basic improvement(s), feel free to let me know or assign to me.

djarecka commented 4 years ago

@zswgzx - we can keep this issue open during OHBM.

Feel free to check our issues, but also, if you can convert your script to a pydra Workflow and create a notebook for pydra-tutorial, that would be a really great contribution! We need more examples! Let me know if you're interested in doing this!

zswgzx commented 4 years ago

absolutely!

As @effigies pointed out, there is a way to annotate (new to me at least) that is NOT covered in the tutorial, or I might have missed it somewhere. Let me know if the tutorial should be updated to include it.