cgat-developers / cgat-core

Core functionality of the CGAT code
MIT License

Time stamps not updating on rerun tasks #75

Closed IanSudbery closed 5 years ago

IanSudbery commented 5 years ago

Moving this issue over from https://github.com/cgat-developers/ruffus/issues/105 as apparently this is a cgat-core issue. I apologise; I thought all the workflow logic was handled by ruffus.

IanSudbery commented 5 years ago

Original text of the issue: I'm having a rather odd problem. I've only just noticed it, but I guess it could have been happening a while. It appears that ruffus is not detecting changes in timestamps for files that have changed while it is running.

Take the simple example pipeline below:

from cgatcore import pipeline as P
from ruffus import transform, formatter
import sys
from time import sleep


@transform("*.input.file",
           formatter(),
           "{basename[0]}.output1.file")
def step1(infile, outfile):
    # create an empty output file; only the timestamps matter here
    P.touch_file(outfile)


@transform(step1,
           formatter(),
           "{basename[0]}.output2.file")
def step2(infile, outfile):
    # sleep so that successive outputs get clearly separated timestamps
    sleep(10)
    P.touch_file(outfile)


@transform(step2,
           formatter(),
           "{basename[0]}.output3.file")
def step3(infile, outfile):
    sleep(10)
    P.touch_file(outfile)


def main(argv=None):
    if argv is None:
        argv = sys.argv
    P.main(argv)


if __name__ == "__main__":
    sys.exit(main())

If I run this with two files (1.input.file and 2.input.file), the pipeline runs through fine and the required output files are created.

$ ll
total 104
-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:21 1.input.file
-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:22 1.input.output1.file
-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:22 1.input.output1.output2.file
-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:22 1.input.output1.output2.output3.file
-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:05 2.input.file
-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:22 2.input.output1.file
-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:22 2.input.output1.output2.file
-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:23 2.input.output1.output2.output3.file

If I now touch the first input file, the pipeline detects that 1.input.output1.file is out of date and updates it. But it fails to update 1.input.output1.output2.file, reporting that it is already up to date. Relevant part of the log:

2019-02-07 14:26:03,120 INFO Task enters queue = 'step1' 
# 2019-02-07 14:26:03,122 INFO {"task": "'step1'", "task_status": "update", "task_total": 2, "task_completed": 1, "task_completed_percent": 50.0}
# 2019-02-07 14:26:03,121 INFO     Job  = [1.input.file -> 1.input.output1.file]  \
#                                Input files: \
#                                 * 07 Feb 2019 14:25:58.00: 1.input.file \
#                                Output files: \
#                                 * 07 Feb 2019 14:22:20.00: 1.input.output1.file \
#                               
# 2019-02-07 14:26:03,223 INFO {"task": "'step1'", "task_status": "update", "task_total": 2, "task_completed": 1, "task_completed_percent": 50.0}
# 2019-02-07 14:26:03,223 INFO     Job  = [2.input.file -> 2.input.output1.file] unnecessary: already Up to date \
#                               \
#                                Input files: \
#                                   07 Feb 2019 14:05:20.00: 2.input.file \
#                                Output files: \
#                                   07 Feb 2019 14:22:20.00: 2.input.output1.file \
#                              
# 2019-02-07 14:26:03,327 INFO {"task": "'step1'", "task_status": "update", "task_total": 2, "task_completed": 2, "task_completed_percent": 100.0}
# 2019-02-07 14:26:03,327 INFO     Job  = [1.input.file -> 1.input.output1.file] completed
# 2019-02-07 14:26:03,329 INFO {"task": "'step1'", "task_status": "completed", "task_total": 2, "task_completed": 2, "task_completed_percent": 100.0}
# 2019-02-07 14:26:03,329 INFO Completed Task = 'step1' 
# 2019-02-07 14:26:03,329 INFO Task enters queue = 'step2' 
# 2019-02-07 14:26:03,330 INFO {"task": "'step2'", "task_status": "update", "task_total": 2, "task_completed": 2, "task_completed_percent": 100.0}
# 2019-02-07 14:26:03,330 INFO     Job  = [1.input.output1.file -> 1.input.output1.output2.file] unnecessary: already Up to date \
#                               \
#                                Input files: \
#                                   07 Feb 2019 14:22:20.00: 1.input.output1.file \
#                                Output files: \
#                                   07 Feb 2019 14:22:31.00: 1.input.output1.output2.file \
#                              
# 2019-02-07 14:26:03,331 INFO {"task": "'step2'", "task_status": "update", "task_total": 2, "task_completed": 2, "task_completed_percent": 100.0}
# 2019-02-07 14:26:03,331 INFO     Job  = [2.input.output1.file -> 2.input.output1.output2.file] unnecessary: already Up to date \
#                               \
#                                Input files: \
#                                   07 Feb 2019 14:22:20.00: 2.input.output1.file \
#                                Output files: \
#                                   07 Feb 2019 14:22:41.00: 2.input.output1.output2.file \
#                              
# 2019-02-07 14:26:03,332 INFO {"task": "'step2'", "task_status": "uptodate", "task_total": 2, "task_completed": 2, "task_completed_percent": 100.0}
# 2019-02-07 14:26:03,332 INFO Uptodate Task = 'step2'
# 2019-02-07 14:26:03,333 INFO Task enters queue = 'step3' 
# 2019-02-07 14:26:03,334 INFO {"task": "'step3'", "task_status": "update", "task_total": 2, "task_completed": 2, "task_completed_percent": 100.0}
# 2019-02-07 14:26:03,334 INFO     Job  = [1.input.output1.output2.file -> 1.input.output1.output2.output3.file] unnecessary: already Up to date \
#                               \
#                                Input files: \
#                                   07 Feb 2019 14:22:31.00: 1.input.output1.output2.file \
#                                Output files: \
#                                   07 Feb 2019 14:22:51.00: 1.input.output1.output2.output3.file \
#                              
# 2019-02-07 14:26:03,335 INFO {"task": "'step3'", "task_status": "update", "task_total": 2, "task_completed": 2, "task_completed_percent": 100.0}
# 2019-02-07 14:26:03,335 INFO     Job  = [2.input.output1.output2.file -> 2.input.output1.output2.output3.file] unnecessary: already Up to date \
#                               \
#                                Input files: \
#                                   07 Feb 2019 14:22:41.00: 2.input.output1.output2.file \
#                                Output files: \
#                                   07 Feb 2019 14:23:01.00: 2.input.output1.output2.output3.file \
#                              
# 2019-02-07 14:26:03,336 INFO {"task": "'step3'", "task_status": "uptodate", "task_total": 2, "task_completed": 2, "task_completed_percent": 100.0}
# 2019-02-07 14:26:03,335 INFO Uptodate Task = 'step3'
# 2019-02-07 14:26:03,349 INFO job finished in 0 seconds at Thu Feb  7 14:26:03 2019 --  1.12  0.63  0.01  0.02 -- a92c63a6-a4df-4ece-944b-162bdfab2af4

If we look now, 1.input.output1.file has the correct timestamp, but ruffus appears to have failed to pick it up.

-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:25 1.input.file
-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:26 1.input.output1.file
-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:22 1.input.output1.output2.file
-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:22 1.input.output1.output2.output3.file
-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:05 2.input.file
-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:22 2.input.output1.file
-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:22 2.input.output1.output2.file
-rw-rw-r-- 1 UUUU GG      0 Feb  7 14:23 2.input.output1.output2.output3.file
IanSudbery commented 5 years ago

@AndreasHeger's reply: This is a cgatcore issue/feature. In cgatcore, all the os.stat and similar calls are cached to prevent stat'ing the same file multiple times. The reason is that stat'ing is a slow operation on networked file systems (for example, our Isilon). As a downside, yes, timestamp changes happening after a cgatcore pipeline starts will not be noticed.
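To illustrate the kind of caching being described, here is a minimal sketch with hypothetical names (cached_stat, cached_exists); the actual cgatcore implementation differs:

import os
from functools import lru_cache

@lru_cache(maxsize=None)
def cached_stat(path):
    # the first call hits the (possibly slow, networked) file system;
    # every later call for the same path returns the memoised result
    return os.stat(path)

@lru_cache(maxsize=None)
def cached_exists(path):
    return os.path.exists(path)

The trade-off is exactly the one described: an mtime recorded before a job runs is never refreshed, so a file touched mid-run keeps its stale cached timestamp for the rest of the run.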

IanSudbery commented 5 years ago

When did this happen!?! Doesn't this defeat the entire point of dependency checking? Isn't one of the raisons d'être of workflow management that when an input file is updated, its downstream dependencies are updated too? I'm pretty sure we state this is the case in the manuscript, don't we?

Surely, in an 8-hour pipeline, stat'ing a few files doesn't take that long?

Acribbs commented 5 years ago

This change seems to have happened a year ago during the merge with the genomics code. I hadn't noticed it myself. I agree with Ian that this is quite a useful feature, particularly for pipelines with lots of tasks. However, if it's going to slow tasks down considerably then I suspect this is open for more discussion. This will depend largely on how slow the os.stat operation is.
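A quick way to gauge that on a given file system (illustrative; assumes one of the example files above exists in the working directory):

import os
import timeit

# time 10,000 stat calls on one file; on a local disk this usually totals
# a few milliseconds, on a networked file system it can be much slower
print(timeit.timeit(lambda: os.stat("1.input.file"), number=10000))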

In the last version of the manuscript we don't refer to this feature specifically, though.

If I remember correctly, we were getting lots of latency issues on the cluster, with tasks starting before the previous one had finished; I don't know if the changes were implemented in response to those.

Which pipeline runs for 8 hours?

IanSudbery commented 5 years ago

Many (most?) of our pipelines run in that sort of ballpark. We have an allele-specific expression pipeline that takes about 24 hrs, a somatic variation pipeline that runs for 3 days or so, and the last runs of our novel intron finding and quantification pipeline ran for over a week (it was running against 1500 BAM files). Hell, even a big mapping job can easily run for more than 8 hours when the cluster's not empty.

There are several weird things here: imagine a pipeline A->B->C->D, where A is updated. It would make a kind of sense if the update were just totally ignored. But it's not: if A is updated, B will be marked as out of date and re-run, but C and D won't be. That's pretty non-obvious behavior.

Secondly, if I do show, B, C and D will be listed as "Tasks to be run", but when I do make, C and D are not run (even if the summary at the start of the pipeline says they will be). If we want to go ahead with this, tasks should be marked as out-of-date at the start, based on whether their dependencies were updated, and run without re-checking the timestamps.
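As a sketch of what that could look like (hypothetical names, not the ruffus/cgatcore API): do the timestamp checks once, up front, then propagate staleness through the dependency graph and run everything in the resulting set without re-checking.

from collections import deque

def stale_tasks(graph, is_stale):
    # graph maps each task to its downstream tasks; is_stale does the
    # timestamp check exactly once per task, at the start of the run
    out_of_date = {task for task in graph if is_stale(task)}
    queue = deque(out_of_date)
    while queue:
        task = queue.popleft()
        for downstream in graph.get(task, ()):
            if downstream not in out_of_date:
                # a stale upstream task makes every downstream task
                # stale, whatever its current timestamps say
                out_of_date.add(downstream)
                queue.append(downstream)
    return out_of_date

graph = {"A": ["B"], "B": ["C"], "C": ["D"], "D": []}
print(stale_tasks(graph, lambda task: task == "A"))
# -> {'A', 'B', 'C', 'D'}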

Why are we monkey-patching ruffus from cgatcore when ruffus is under our control these days? And in the end, why even bother taking timestamps (and caching them) if we are not then going to properly update out-of-date tasks? Surely just checking whether or not a file exists would be enough in that case.

In the manuscript we say "only data sets that are missing or out-of-date are re-run." Out-of-date tasks are not currently being re-run, even though this is a standard feature of every workflow management system I've come across, right back to Make.
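For reference, the Make-style check being described amounts to something like this (illustrative Python, not Make or ruffus internals):

import os

def needs_rebuild(target, prerequisites):
    # classic Make rule: rebuild if the target is missing or any
    # prerequisite has a later modification time than the target
    if not os.path.exists(target):
        return True
    target_mtime = os.path.getmtime(target)
    return any(os.path.getmtime(p) > target_mtime for p in prerequisites)

Because each re-run touches its target, the fresh mtime is what pushes the next level of the graph out of date; caching the stats breaks exactly that cascade.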

Sorry, I didn't want to come off as ranting. I do think this needs some cleaning up, whichever way we decide to go.

Acribbs commented 5 years ago

Wow, that's quite some intense computational work you're doing.

Ah yes, you are correct, and that's still in the current manuscript that I sent to David last week. I'm currently working in San Diego until Monday, but when I return I will nag David to send the manuscript around again.

I did suggest to David that we have regular Zoom meetings every 3 months or so with all developers to discuss general issues with code development, but that was a while ago now. However, I think when I get back to the UK I will arrange something. This issue should be discussed, because I completely agree with you. However, I think it would be useful to hear from Andreas the specifics of why the caching was implemented.

I agree that this sounds like it should be a ruffus fix and not a cgatcore one.

AndreasHeger commented 5 years ago

Hi, @IanSudbery and @Acribbs

I am not 100% sure I understand what you mean by "while the pipeline is running". However, I assume you touch the file while the pipeline is NOT running.

I think I was wrong to talk about the caching. Ruffus calls os.stat and os.path.exists many times, and multiple times for the same file. The caching just means that the first call is remembered and additional calls are avoided by looking up the cached result instead. Caching reduces the time it takes to get a pipeline going if you have many files to check and your file system is slow. However, the cache is not persistent between runs, so it should not affect the issue here.

I can replicate it and will investigate.
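For context, even a non-persistent, per-run cache can serve stale results within a single invocation, which matches what the log above shows. A minimal illustration (cached_stat is a hypothetical memoised wrapper, not cgatcore code):

import os
import time
from functools import lru_cache

@lru_cache(maxsize=None)
def cached_stat(path):
    return os.stat(path)

open("demo.file", "w").close()
before = cached_stat("demo.file").st_mtime

time.sleep(1)
os.utime("demo.file")  # stands in for a task re-running and touching its output

# the file really did change on disk ...
assert os.stat("demo.file").st_mtime > before
# ... but the cache still reports the pre-touch mtime
assert cached_stat("demo.file").st_mtime == before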

AndreasHeger commented 5 years ago

... looks like a cgatcore issue though; a ruffus-only pipeline works:

from ruffus import transform, formatter, pipeline_run
from time import sleep


@transform("*.input.file",
           formatter(),
           "{basename[0]}.output1.file")
def step1(infile, outfile):
    open(outfile, "w").close()


@transform(step1,
           formatter(),
           "{basename[0]}.output2.file")
def step2(infile, outfile):
    sleep(10)
    open(outfile, "w").close()


@transform(step2,
           formatter(),
           "{basename[0]}.output3.file")
def step3(infile, outfile):
    sleep(10)
    open(outfile, "w").close()


if __name__ == "__main__":
    pipeline_run()

AndreasHeger commented 5 years ago

Ok, it is the caching - I will turn it off in cgatcore - closing this issue, thanks!