nextflow-io / nextflow

A DSL for data-driven computational pipelines
http://nextflow.io
Apache License 2.0

storeDir is not working #1359

Open lucacozzuto opened 4 years ago

lucacozzuto commented 4 years ago

Bug report

Using storeDir makes the pipeline crash, complaining that the required output file is no longer there.

Expected behavior and actual behavior

In the past, storeDir copied the files to another folder; now it moves them, and this breaks the workflow execution.

Caused by: Missing output file(s) STARgenome expected by process buildIndex (Cowc_gDNA_mtDNA.fasta)

Command executed:

mkdir STARgenome
if [ `echo Cowc_gDNA_mtDNA.fasta | grep ".gz"` ]; then
    zcat Cowc_gDNA_mtDNA.fasta > `basename Cowc_gDNA_mtDNA.fasta .gz`
    STAR --runMode genomeGenerate --genomeDir STARgenome --runThreadN 8 --genomeFastaFiles `basename Cowc_gDNA_mtDNA.fasta .gz` --sjdbGTFfile Cowc_long.annot.exon.gtf --sjdbOverhang 49 --outFileNamePrefix STARgenome
    rm `basename Cowc_gDNA_mtDNA.fasta .gz`
else
    STAR --runMode genomeGenerate --genomeDir STARgenome --runThreadN 8 --genomeFastaFiles Cowc_gDNA_mtDNA.fasta --sjdbGTFfile Cowc_long.annot.exon.gtf --sjdbOverhang 49 --outFileNamePrefix STARgenome
fi

Command exit status: 0

Command output:

Nov 05 10:29:40 ..... started STAR run
Nov 05 10:29:40 ... starting to generate Genome files
Nov 05 10:29:40 ... starting to sort Suffix Array. This may take a long time...
Nov 05 10:29:41 ... sorting Suffix Array chunks and saving them to disk...
Nov 05 10:29:45 ... loading chunks from disk, packing SA...
Nov 05 10:29:46 ... finished generating suffix array
Nov 05 10:29:46 ... generating Suffix Array index
Nov 05 10:29:52 ... completed Suffix Array index
Nov 05 10:29:52 ..... processing annotations GTF
Nov 05 10:29:52 ..... inserting junctions into the genome indices
Nov 05 10:30:03 ... writing Genome to disk ...
Nov 05 10:30:03 ... writing Suffix Array to disk ...
Nov 05 10:30:03 ... writing SAindex to disk
Nov 05 10:30:09 ..... finished successfully

lucacozzuto commented 4 years ago

It was a problem with our storage... I'm wondering if this kind of asynchronous copy can cause problems for pipelines.

lucacozzuto commented 2 years ago

I'll add a couple of suggestions: it looks like this problem is related to the fact that big files take time to copy to the final directory indicated by storeDir. So you might want to add a sleep either after the execution of the command or in an "afterScript" definition. Another solution is to add a retry with an incremental sleep in the nextflow.config file; see the sketch below.
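
A minimal sketch of that nextflow.config suggestion, assuming the failing process is the buildIndex process from the error above (the selector, retry count, and sleep duration are illustrative and would need tuning to the actual pipeline):

process {
  withName: 'buildIndex' {
    // retry instead of failing on the first "Missing output file(s)" error
    errorStrategy = 'retry'
    maxRetries    = 3
    // give the shared filesystem time to settle after the command finishes
    afterScript   = 'sleep 60'
  }
}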

lucacozzuto commented 4 months ago

From time to time I still see this problem with storeDir... I don't know how many people see it. Maybe @JoseEspinosa also experiences it.

lucacozzuto commented 4 months ago

It would be nice to have an option that allows some waiting time between copying the file to the location indicated by storeDir and checking that it exists.

bentsherman commented 4 months ago

Are you trying to use the storeDir output in a downstream task within the same pipeline? I think storeDir is designed only to work across pipeline runs

@pditommaso it looks like the task outputs are actually moved into the storeDir, which would prevent downstream tasks from also using the output. Maybe instead we should keep a link in the task directory pointing to the storeDir so that it can still be used as a regular output.
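
For reference, a minimal sketch of the storeDir pattern under discussion, based on the process and output names in the original error (the storeDir path and the condensed script are illustrative):

process buildIndex {
  // outputs are moved here when the task completes; later runs skip the
  // task if the stored copy already exists
  storeDir '/shared/indexes'
  input:
    path genome
  output:
    path 'STARgenome'
  script:
    """
    mkdir STARgenome
    STAR --runMode genomeGenerate --genomeDir STARgenome --genomeFastaFiles $genome
    """
}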

lucacozzuto commented 4 months ago

Hi @bentsherman, I'm trying to run the pipeline just once and it fails, likely because the output file is checked in its final location while it is still being copied.

lucacozzuto commented 4 months ago

I think the presence of a soft link can be a patch (but I'm not so sure it will work in AWS Batch).

bentsherman commented 4 months ago

Recently I was reading about software-defined assets in Dagster, which look like this:

from dagster import asset
from pandas import DataFrame

@asset
def logins(website_events: DataFrame) -> DataFrame:
    return website_events[website_events["type"] == "login"]

This flips the script: instead of saying "this is a task, here are its outputs", it says "this is a data asset, here is how to compute it". This is a very intuitive way to think about the pipeline, as it focuses on the thing we care about most: the results.

I wonder if we could do something like this in Nextflow. Ideally keeping the same syntax for processes, but making the workflow definition more output-centric.

I think we need to combine storeDir and publishDir into a single concept and make it more prominent in the workflow definition. I should be able to say "this asset should exist in (storeDir/publishDir), here's how to re-compute it if needed (process definition)":

Using rnaseq-nf as an example:

process INDEX {
  tag "$transcriptome.simpleName"
  conda 'salmon=1.10.2'
  input:
    path transcriptome
  output:
    path 'index'
  script:
    """
    salmon index --threads $task.cpus -t $transcriptome -i index
    """
}

workflow {
  index = asset('/data/index') {
    INDEX(params.transcriptome)
  }
}

The main idea is to make the store/publish directory external to the process invocation. The workflow should request an asset from some local path and specify how to re-compute it; Nextflow can then figure out whether the requested asset is up to date and decide whether to re-compute it.

Sorry for the rant @lucacozzuto, I know you're just trying to fix the bug right in front of you; I just got inspired to think about the long-term solution. If you could provide a minimal test case, we should be able to get to the bottom of it.

lucacozzuto commented 4 months ago

Don't worry, I love this way of thinking in broader terms... I just think that the output-centric approach can be a bit reductive... A nice thing about Nextflow is being able to forget about the naming of inputs/outputs... So I would think twice about changing this aspect of the language. About storeDir, I'll work on a test case.

bentsherman commented 4 months ago

I agree with your point about naming things. Dagster seems to work well with the output-centric model because it's all just variable names and in-memory dataframes, so you don't have to worry about organizing files in a filesystem. Nextflow also saves you from having to name things all the time: outputs live in the hidden work directory, and you only specify which ones to publish and where. An output-centric model in Nextflow should continue in that pattern.

Maybe we can incorporate storeDir into the upcoming workflow publish definition #4784:

workflow {
  main:
  index = INDEX(params.transcriptome)

  publish:
  index >> '/data/index'
}

One thing we have added with this feature is an option to not re-publish a file if the publish destination is up to date, similar to the cache directive. I guess the natural next step would be: if all outputs published by a process are already up to date in the publish destination, don't re-execute the task.

lucacozzuto commented 4 months ago

Agree. However, I cannot always reproduce the error, so it likely fails when the (big) file has not been copied completely. storeDir is useful when you have a big task (like indexing) that you don't want to re-execute each time, so the same file can be reused across different pipeline runs... but we really need some "waiting time" between the copy into the "store" folder and the check for the existence of the output file.

bentsherman commented 4 months ago

Looking at the code more closely, the way it works is:

1. the storeDir outputs are moved out of the task work directory into the storeDir by the job script itself (the unstaging step appends the mv commands to the job);
2. after the job completes, Nextflow checks the storeDir, rather than the work directory, for the expected output files.

Because the mv command is part of the job, I would expect the storeDir to be up to date once the job is finished

lucacozzuto commented 4 months ago

The problem happens on an HPC... so I think it is a problem with some asynchronous process.

bentsherman commented 4 months ago

The procedure I described is not asynchronous. Nextflow does not check the storeDir until after the job and file transfer have completed.

I think a way to test your theory would be to copy a large file to the storeDir and then try to access the file from the storeDir

mbeckste commented 2 months ago

We noticed exactly the same problem in one of the epi2me ONT workflows today, and I agree with @lucacozzuto that it is a problem with asynchrony in an HPC environment. In our scenario the Nextflow head job and the job using storeDir were scheduled on two different nodes of our cluster, and the work dir (NXF_WORK) as well as the storeDir are on a shared filesystem with some latency. The job with the storeDir process finished successfully (RC=0), i.e. nxf_unstage() in .command.run and subsequently nxf_fs_move() moved all the files to the storeDir. Then Nextflow (on a different node) checked the output and, due to slow NFS, did not find the file(s), finally resulting in a Missing output file(s) error.

bentsherman commented 1 month ago

I remember that the grid executors have this hack to deal with NFS delays when checking the exitcode file for a job: https://github.com/nextflow-io/nextflow/blob/12b027ee7e70d65bdee912856478894af4602170/modules/nextflow/src/main/groovy/nextflow/executor/GridTaskHandler.groovy#L321-L330

Apparently if you list a directory it will force-update the NFS metadata. Might be able to do the same thing for storeDir, if someone would like to give it a try: https://github.com/nextflow-io/nextflow/blob/12b027ee7e70d65bdee912856478894af4602170/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy#L850-L858

I think it would be enough to list the storeDir before calling collectOutputs()
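
A rough sketch of what that might look like (hypothetical code, not taken from TaskProcessor; the helper name and the exact call site before collectOutputs() are assumptions):

import java.nio.file.Files
import java.nio.file.Path

// Hypothetical helper: list the storeDir so the NFS client refreshes its
// cached directory metadata before the expected output files are checked.
protected void refreshStoreDir(Path storeDir) {
    if( storeDir && Files.isDirectory(storeDir) ) {
        Files.newDirectoryStream(storeDir).withCloseable { entries ->
            // iterating the entries is enough to force the metadata refresh
            entries.each { }
        }
    }
}

The task processor would call this just before collectOutputs() for tasks that declare a storeDir.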