cgat-developers / ruffus

CGAT-ruffus is a lightweight python module for running computational pipelines

@split and @subdivide not computing output files correctly #39

Closed: bunbun closed this issue 9 years ago

bunbun commented 9 years ago

Reported by keren@eng.ucsd.edu, Jan 23 2014. Any time the output files have to be determined at run time (as with @split and @subdivide), the files already present when a job starts that match the output glob are assumed to be the job's final output. As a result, the files the job actually creates are not checkpointed correctly after the job completes.

I have attached a minimal example to show the kinds of errors this causes.

The first run creates the following files: start, cluster_0

Nothing matched "cluster_*" before the job ran, so "cluster_0" was never checkpointed after the job completed.

If you run the pipeline a second time, ruffus thinks that "cluster_0" is a leftover file, and the cluster task and everything downstream get re-run unnecessarily. Now "cluster_*" matches "cluster_0", so that file gets checkpointed, but the new "cluster_1" file does not.

So this pipeline is never up-to-date: on the (N+1)-th run, it fails to checkpoint "cluster_N".

    #!/usr/bin/python
    import os
    import sys

    sys.path.insert(0, "/home/lg/src/oss/ruffus")

    from ruffus import pipeline_run
    from ruffus import originate
    from ruffus import split

    import ruffus.cmdline as cmdline
    parser = cmdline.get_argparse(description='WHAT DOES THIS PIPELINE DO?')
    options = parser.parse_args()
    logger, logger_mutex = cmdline.setup_logging(__name__, options.log_file, options.verbose)

    @originate('start')
    def make_start(outfile):
        open(outfile, 'a').close()

    @split(make_start, 'cluster_*')
    def cluster(infile, outfiles, *args):
        # 'outfiles' is whatever matched 'cluster_*' before this job
        # started: empty on the first run, one file short on every
        # later run.
        n_to_produce = len(outfiles) + 1
        # Remove the stale outputs, then produce one more cluster file
        # than last time.
        for f in outfiles:
            os.unlink(f)
        for i in range(n_to_produce):
            f = 'cluster_{}'.format(i)
            open(f, 'a').close()

    #pipeline_run(verbose=10)
    cmdline.run (options)
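
Stripped of ruffus, the failure reduces to when the output glob is expanded. A minimal plain-Python sketch of the two behaviours (these helpers are illustrative, not ruffus internals):

    import glob

    def checkpoint_with_pre_expanded_glob(job, output_glob):
        # What currently happens: the glob is expanded *before* the job
        # runs, so files the job creates are never recorded.
        outputs = glob.glob(output_glob)   # [] on the first run
        job(outputs)                       # the job creates cluster_0 here
        return outputs                     # ... which is missing from 'outputs'

    def checkpoint_with_post_expanded_glob(job, output_glob):
        # What the fix needs: re-expand the glob *after* the job finishes.
        job(glob.glob(output_glob))
        return glob.glob(output_glob)      # includes every file the job made
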
bunbun commented 9 years ago

keren@eng.ucsd.edu wrote:

I have written a first draft of a possible solution.

Basically, if a task produces indeterminate output, it waits to do checksumming until all its jobs are complete, then uses file_names_from_tasks_globs() to re-find all the output files.

However, this fails when the search string contains named formatter() or regexp groups like "{basename[0]}".

Also, it would be better if this could be done on a per-job basis, instead of waiting until the whole task is complete.
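
For illustration, here is why a naive re-glob cannot handle those patterns (a sketch assuming plain glob semantics; the pattern comes from the pipeline below):

    import glob

    # '{basename[0]}_*.reclustered' is not a usable glob as-is: glob treats
    # '[0]' as a character class and the braces as literal characters, so
    # nothing matches. The formatter() substitution (e.g. yielding
    # 'cluster_0_*.reclustered') has to be applied before globbing.
    print(glob.glob('{basename[0]}_*.reclustered'))   # -> []
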

bunbun commented 9 years ago

keren@eng.ucsd.edu wrote:

Also, here is the test pipeline I've been using while trying to fix this bug. It contains @originate, @split, and @subdivide.

It could easily be expanded into a full test case that fails if, after the pipeline has run, the checksum file is incomplete or any task is not up-to-date (a sketch follows the pipeline below).

    import os

    from ruffus import pipeline_run
    from ruffus import originate
    from ruffus import split
    from ruffus import subdivide
    from ruffus import formatter

    import ruffus.cmdline as cmdline
    parser = cmdline.get_argparse(description='WHAT DOES THIS PIPELINE DO?')
    options = parser.parse_args()
    logger, logger_mutex = cmdline.setup_logging (__name__, options.log_file, options.verbose)

    @originate('start_indeterminate_*')
    def make_origin_a(outfiles):
        # 'outfiles' is whatever matched the glob before this job ran;
        # clear it out, then create three fresh files.
        for f in outfiles:
            os.unlink(f)
        for i in 'abc':
            outfile = 'start_indeterminate_{}'.format(i)
            open(outfile, 'a').close()

    @originate('start')
    def make_start(outfile):
        open(outfile, 'a').close()

    @split(make_start, 'cluster_*.raw')
    def cluster(infiles, outfiles):
        # As above: 'outfiles' only holds files that existed before the
        # job started, so produce one more cluster file than last time.
        n_to_produce = len(outfiles) + 1
        for f in outfiles:
            os.unlink(f)
        for i in range(n_to_produce):
            f = 'cluster_{}.raw'.format(i)
            open(f, 'a').close()

    @subdivide(cluster, formatter(), '{basename[0]}_*.reclustered')
    def cluster_clusters(infile, outfiles):
        # Each .raw cluster file is subdivided into three .reclustered
        # files named after it.
        for f in outfiles:
            os.unlink(f)
        for i in range(3):
            f = infile[:-len('.raw')]
            f = '{}_{}.reclustered'.format(f, i)
            open(f, 'a').close()

    #pipeline_run(verbose=1)
    cmdline.run (options)
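
A hedged sketch of what such a test might look like (pipeline_run and pipeline_printout are real ruffus entry points; the exact 'needs update' wording in the printout is an assumption):

    import io

    from ruffus import pipeline_run, pipeline_printout

    def test_split_subdivide_checkpointing():
        # The first run builds everything; once the bug is fixed, every
        # output file should be checkpointed as it is produced.
        pipeline_run(verbose=0)

        # On a second pass nothing should be stale, so the verbose
        # printout of pending work should not flag any job.
        status = io.StringIO()
        pipeline_printout(status, verbose=4)
        assert 'needs update' not in status.getvalue()
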
bunbun commented 9 years ago

The code has been patched in the repository. I am keeping the globs in the output files around and re-expanding them at checksumming time inside pipeline_run(). As you wanted, this happens per job.

There was actually a comment in the code and in the change file for v2.5 stating: "BUG FIX: Output producing wild cards was not saved in the checksum files!!!" The bug fix didn't work, and there was no unit test ...

Very embarrassing.
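
A hedged sketch of the idea, not the actual patch (the helper names are hypothetical): each job keeps its original output globs, and the globs are re-expanded only after the job has finished, just before its checksums are written.

    import glob
    import os

    def run_job_and_checkpoint(job, params, output_globs, history):
        job(*params)
        # Re-expand each glob only *after* this job has finished, so the
        # files the job just created are the ones that get checkpointed.
        for pattern in output_globs:
            for path in glob.glob(pattern):
                history[path] = os.path.getmtime(path)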

bunbun commented 9 years ago

Passed unit test: test/test_split_subdivide_checkpointing.py