nextflow-io / nextflow

A DSL for data-driven computational pipelines
http://nextflow.io
Apache License 2.0

Feature request - custom caching strategy #5308

Open robsyme opened 1 month ago

robsyme commented 1 month ago

New feature

One of Nextflow's most useful features is the ability to send metadata through the process DAG in parallel with the file data. Not every process needs all of the metadata, but any change to the metadata results in a new task hash, and potentially in unnecessary recomputation of tasks.

I propose giving users a way to tell Nextflow which parts of the metadata are relevant to a process.

Usage scenario

Let's say we have a samplesheet samples.csv

name,university
Paolo,Sapienza
Ben,Clemson

We ingest it in a simple workflow:

workflow {
    Channel.fromPath("samples.csv")
    | splitCsv(header:true)
    | GetEducation
}

process GetEducation {
    input: val(meta)
    output: tuple val(meta), path("education.txt")
    script: "echo '${meta.name} going to ${meta.university}' > education.txt"
}

If we expand the samplesheet to include employer metadata:

name,university,employer
Paolo,Sapienza,Seqera
Ben,Clemson,Seqera

Resumed runs will then be unable to reuse the previously computed tasks, because the meta input to GetEducation has changed, even though we know the new field makes no difference to the task's result.
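In plain Groovy terms (Nextflow scripts are Groovy), the extended row is simply a different value from the basic one, so anything computed from the whole map changes, while a subMap of the two fields GetEducation actually reads stays equal. This is a minimal sketch of value equality only; it is not Nextflow's task-hashing code.

```groovy
// Rows as splitCsv(header: true) would emit them for the two samplesheets
def basic    = [name: 'Paolo', university: 'Sapienza']
def extended = [name: 'Paolo', university: 'Sapienza', employer: 'Seqera']

// The whole map changed, so any hash over the whole map changes too
assert basic != extended

// The fields GetEducation reads are identical in both runs
def relevant = extended.subMap('name', 'university')
assert relevant == basic
println "relevant fields unchanged: ${relevant}"
```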

Suggested implementation

I propose a new option for the cache directive, whereby the user supplies a closure that returns a value to be used as the input to the task hash:

process GetEducation {
    cache { meta -> meta.subMap('name', 'university') }

    input: val(meta)
    output: tuple val(meta), path("education.txt")
    script: "echo '${meta.name} going to ${meta.university}' > education.txt"
}

This would expand the opportunities to rely on the Nextflow task cache while a workflow is being developed interactively and the extent of the required metadata is not known at the start of development.
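One way to picture the proposal: the hash input becomes whatever the closure returns instead of the whole meta map. The sketch below assumes a hypothetical cacheKey helper that MD5-hashes the string form of the selected value; Nextflow's real task hash is computed differently, so treat this only as an illustration of the idea.

```groovy
import java.security.MessageDigest

// Hypothetical helper, not part of Nextflow: hash only what a user-supplied
// selector closure returns, instead of the whole input value.
String cacheKey(Map meta, Closure selector) {
    def selected = selector.call(meta)          // e.g. meta.subMap('name', 'university')
    def md5 = MessageDigest.getInstance('MD5')
    md5.update(selected.toString().getBytes('UTF-8'))
    return md5.digest().encodeHex().toString()
}

def selector = { meta -> meta.subMap('name', 'university') }

def before = cacheKey([name: 'Paolo', university: 'Sapienza'], selector)
def after  = cacheKey([name: 'Paolo', university: 'Sapienza', employer: 'Seqera'], selector)

// The new employer column does not affect the key
assert before == after
println before
```

Because subMap returns its keys in the order they are requested, the string form of the selected map is stable between runs here; a real implementation would want an encoding that does not depend on toString().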

The syntax might also include tuples and files (or bags of files):

process ApplyLearning {
    cache { meta, edu -> [meta.employer, edu] }

    input: tuple val(meta), path("education.txt")
    script: "cat education.txt > bio.txt && echo 'Now working at $meta.employer' >> bio.txt"
}
robsyme commented 1 month ago

Here is a self-contained example:

workflow {
    input = [
        [  id: 'test1', patient: 'patient_1' ],
        [  id: 'test2', patient: 'patient_3' ]
    ]

    Channel.fromList(input)
    | map { patient -> [patient.id, patient] }
    | set { patients }

    patients
    | map { id, patient -> id }
    | createFile
    | join(patients)
    | map { id, initialFile, patient -> [patient, initialFile] }
    | addPatientInfo
    | view
}

process createFile {
    input:
    val id

    output:
    tuple val(id), path("${id}.txt")

    script:
    """
    touch ${id}.txt
    echo "ID: ${id}" >> ${id}.txt
    """
}

process addPatientInfo {
    publishDir 'publish', mode: 'copy'

    input:
    tuple val(meta), path(initial_file)

    output:
    path "${meta.id}_with_patient.txt"

    script:
    """
    cp ${initial_file} ${meta.id}_with_patient.txt
    echo "Patient: ${meta.patient}" >> ${meta.id}_with_patient.txt
    """
}

If we were able to specify in the createFile process which pieces of metadata are actually relevant, this could be considerably shorter, without losing cache reuse when the patient value changes for one or more samples:

workflow {
    input = [
        [  id: 'test1', patient: 'patient_1' ],
        [  id: 'test2', patient: 'patient_3' ]
    ]

    Channel.fromList(input)
    | createFile
    | addPatientInfo
    | view
}

process createFile {
    cache { meta.id }
    input:
    val(meta)

    output:
    tuple val(meta), path("${meta.id}.txt")

    script:
    """
    touch ${meta.id}.txt
    echo "ID: ${meta.id}" >> ${meta.id}.txt
    """
}

process addPatientInfo {
    publishDir 'publish', mode: 'copy'

    input:
    tuple val(meta), path(initial_file)

    output:
    path "${meta.id}_with_patient.txt"

    script:
    """
    cp ${initial_file} ${meta.id}_with_patient.txt
    echo "Patient: ${meta.patient}" >> ${meta.id}_with_patient.txt
    """
}
bentsherman commented 1 month ago

The cache closure wouldn't need any parameters; it could just use the task context like any other dynamic directive. We would also want to log a warning making it clear that you're basically doing a "development" run, as this option shouldn't be used in production.
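A parameterless closure can still see the task's inputs, the same way free names in any Groovy closure resolve through its delegate. This is a minimal sketch of that mechanism, assuming a hypothetical evalCacheClosure helper and a plain map standing in for the task context; it is not a claim about how Nextflow evaluates dynamic directives internally.

```groovy
// Hypothetical helper (not Nextflow's implementation): evaluate a
// parameterless cache closure against the task's input variables.
def evalCacheClosure(Closure directive, Map taskContext) {
    Closure c = (Closure) directive.clone()   // leave the original closure untouched
    c.delegate = taskContext                  // free names such as `meta` resolve here
    c.resolveStrategy = Closure.DELEGATE_FIRST
    return c.call()
}

// The closure names `meta` without declaring it as a parameter
def cacheDirective = { meta.subMap('name', 'university') }

def ctx = [meta: [name: 'Ben', university: 'Clemson', employer: 'Seqera']]
def key = evalCacheClosure(cacheDirective, ctx)

assert key == [name: 'Ben', university: 'Clemson']
println key
```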

Here's a quandary -- I think the user would need to specify this custom strategy from the beginning. If I do a run, then add some metadata columns but forget to specify this closure, I'm out of luck.

I guess you could make it work if you specify all inputs:

process ApplyLearning {
    cache { [meta.employer, edu] }

    input:
    tuple val(meta), path(edu, stageAs: "education.txt")

    // ...
}

I think you could tack that on to the second run and be able to resume.

bentsherman commented 1 month ago

Thinking about this for 30 more seconds, I would argue that if the intent is to declare that a process depends only on meta.employer and not the entire meta map, then you should just provide meta.employer as an input:

process ApplyLearning {
    input:
    val(employer)
    path("education.txt")

    // ...
}

I think the deeper issue is that processes are invoked with channels instead of individual values, which makes it difficult to control how inputs are passed into the process. It's much easier to just provide the channel you already have, even if it carries more inputs than you actually need, than to add more channel boilerplate.

But I think that solving that problem is probably the best way to address this one too.

workflow {
    input = [
        [  id: 'test1', patient: 'patient_1' ],
        [  id: 'test2', patient: 'patient_3' ]
    ]

    Channel.fromList(input)
    | map { meta ->
      def initial_file = createFile(meta.id)
      return [ meta, addPatientInfo(meta, initial_file) ]
    }
    | view
}

process createFile {
    input:
    val(id)

    output:
    path("${id}.txt")

    script:
    """
    touch ${id}.txt
    echo "ID: ${id}" >> ${id}.txt
    """
}

process addPatientInfo {
    publishDir 'publish', mode: 'copy'

    input:
    val(meta)
    path(initial_file)

    output:
    path "${meta.id}_with_patient.txt"

    script:
    """
    cp ${initial_file} ${meta.id}_with_patient.txt
    echo "Patient: ${meta.patient}" >> ${meta.id}_with_patient.txt
    """
}
pditommaso commented 1 month ago

What am I missing?

» nextflow run t.nf  -resume -ansi-log false
N E X T F L O W  ~  version 24.08.0-edge
Launching `t.nf` [jolly_majorana] DSL2 - revision: 6f25f6915a
[fa/d8a5fc] Cached process > GetEducation (1)
[33/9737d0] Cached process > GetEducation (2)
robsyme commented 1 month ago

Paolo - maybe a clearer example would be:

params.input = "samples.basic.csv"

workflow {
    Channel.fromPath(params.input)
    | splitCsv(header:true)
    | GetEducation
    | ApplyLearning
}

process GetEducation {
    input: val(meta)
    output: tuple val(meta), path("education.txt")
    script: "echo '$meta.name going to $meta.university' > education.txt"
}

process ApplyLearning {
    input: tuple val(meta), path("education.txt")
    script: "cat education.txt > bio.txt && echo 'Now working at $meta.employer' >> bio.txt"
}

And we have two samplesheets, a samples.basic.csv:

name,university
Paolo,Sapienza
Ben,Clemson

... and a samples.extended.csv

name,university,employer
Paolo,Sapienza,Seqera
Ben,Clemson,Seqera

If I run with the basic samplesheet first, all four tasks run:

$ nextflow run . --input samples.basic.csv
 N E X T F L O W   ~  version 24.04.4
[15/6978b9] process > GetEducation (1)  [100%] 2 of 2 ✔
[fa/5be9e8] process > ApplyLearning (2) [100%] 2 of 2 ✔

If I then launch with the extended samplesheet, I have to recompute the GetEducation tasks, even though they are exactly the same tasks with exactly the same results:

$ nextflow run main.nf --input samples.extended.csv -resume 
 N E X T F L O W   ~  version 24.04.4
[11/64ea70] process > GetEducation (1)  [100%] 2 of 2 ✔
[19/6f8b45] process > ApplyLearning (2) [100%] 2 of 2 ✔

In the second (samples.extended.csv) run, the meta input variable is indeed different, but I have no way of telling Nextflow which parts of the meta variable are actually important for the purposes of task caching. It would be very useful if I could say to Nextflow that it's only meta.name and meta.university that are used by the GetEducation task. Other changes to this variable (like additional fields in the Map in this case) are inconsequential for the purposes of task caching.

wangyang1749 commented 1 month ago

I think it's important to be able to control the cache, for example to manually keep time-consuming tasks cached.

Suppose my script is as follows.

process saySecond {
  scratch true
  stageInMode "copy"
  container "master:5000/stress:latest"

  cache 'lenient'
  input: 
    path db
  output:
    path("db2.json")
  script:
    """
    cat $db > db2.json
    """
}
workflow {
  ch_input = Channel.fromPath(["/data/workspace/1/nf-hello/db/a.txt","/data/workspace/1/nf-hello/db/b.txt"])
  saySecond(ch_input)
}

Running with the -dump-hashes json option shows the task hash:

[saySecond (1)] cache hash: a621c03e986060730fa7c05f43a0d754; mode: LENIENT; entries: [
    {
        "hash": "52fc10538560a06bf4eefb5029a3a408",
        "type": "java.util.UUID",
        "value": "68e313b7-a96b-482f-aa5a-5d3c2971779c"
    },
    {
        "hash": "4f60e6294a34dfbe2dd400257e58d05e",
        "type": "java.lang.String",
        "value": "saySecond"
    },
    {
        "hash": "115eed1375a4e25775dfa635bdaf0eee",
        "type": "java.lang.String",
        "value": "    \"\"\"\n    cat $db > db2.json\n    \"\"\"\n"
    },
    {
        "hash": "112f589ee6fa1b07d3f510e3885ea446",
        "type": "java.lang.String",
        "value": "master:5000/stress:latest"
    },
    {
        "hash": "0d39a5ff3a5c828a386e57fe6d0f07cd",
        "type": "java.lang.String",
        "value": "db"
    },
    {
        "hash": "9598b73b3492f1a8034f97fd39eff09f",
        "type": "nextflow.util.ArrayBag",
        "value": "[FileHolder(sourceObj:/data/workspace/1/nf-hello/db/a.txt, storePath:/data/workspace/1/nf-hello/db/a.txt, stageName:a.txt)]"
    },
    {
        "hash": "4f9d4b0d22865056c37fb6d9c2a04a67",
        "type": "java.lang.String",
        "value": "$"
    },
    {
        "hash": "16fe7483905cce7a85670e43e4678877",
        "type": "java.lang.Boolean",
        "value": "true"
    }
]

If nothing changes, the cache hash is the same the next time you run it. But if an insignificant change is accidentally made to the script, such as adding some spaces, the hash changes:

cat $db            > db2.json
[saySecond (1)] cache hash: bec5c53e5888f9afc06d945fe5fb7d56; mode: LENIENT; entries: [
    {
        "hash": "52fc10538560a06bf4eefb5029a3a408",
        "type": "java.util.UUID",
        "value": "68e313b7-a96b-482f-aa5a-5d3c2971779c"
    },
    {
        "hash": "4f60e6294a34dfbe2dd400257e58d05e",
        "type": "java.lang.String",
        "value": "saySecond"
    },
    {
        "hash": "029c7f28b3785d1c47bdc397d45af9c8",
        "type": "java.lang.String",
        "value": "    \"\"\"\n    cat $db       > db2.json\n    \"\"\"\n"
    },
    {
        "hash": "112f589ee6fa1b07d3f510e3885ea446",
        "type": "java.lang.String",
        "value": "master:5000/stress:latest"
    },
    {
        "hash": "0d39a5ff3a5c828a386e57fe6d0f07cd",
        "type": "java.lang.String",
        "value": "db"
    },
    {
        "hash": "9598b73b3492f1a8034f97fd39eff09f",
        "type": "nextflow.util.ArrayBag",
        "value": "[FileHolder(sourceObj:/data/workspace/1/nf-hello/db/a.txt, storePath:/data/workspace/1/nf-hello/db/a.txt, stageName:a.txt)]"
    },
    {
        "hash": "4f9d4b0d22865056c37fb6d9c2a04a67",
        "type": "java.lang.String",
        "value": "$"
    },
    {
        "hash": "16fe7483905cce7a85670e43e4678877",
        "type": "java.lang.Boolean",
        "value": "true"
    }
]

Would it be possible to have a command that recomputes the hash from the current inputs and script and replaces the hash stored in the cache database with it, so that a time-consuming task stays cached?

Alternatively, a field could be added to the cache database that forces a task to be treated as cached, to be set once it is confirmed that the result has already been generated.
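The two dumps differ only in the script entry, because the script text is hashed exactly as written. The sketch below contrasts that with a hypothetical key computed over whitespace-normalized text; normalized is an illustrative helper, not something Nextflow provides, and MD5 simply stands in for whatever hash a real implementation would use.

```groovy
import java.security.MessageDigest

String md5(String text) {
    MessageDigest.getInstance('MD5').digest(text.getBytes('UTF-8')).encodeHex().toString()
}

// Hypothetical normalization: trim and collapse runs of whitespace.
// Nextflow does not do this; the dumps above show the raw script text being hashed.
String normalized(String script) {
    script.trim().replaceAll(/\s+/, ' ')
}

def original = '    cat $db > db2.json\n'
def edited   = '    cat $db       > db2.json\n'

assert md5(original) != md5(edited)                          // raw text: key changes
assert md5(normalized(original)) == md5(normalized(edited))  // normalized: key is stable
```

Collapsing whitespace is not safe in general, since whitespace inside quoted shell strings can be significant.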

robsyme commented 1 month ago

Community request for a similar feature here: https://community.seqera.io/t/process-input-that-is-not-cached-and-does-not-affect-task-hash/1209

SPPearce commented 2 weeks ago

This would be extremely useful for the in-house pipeline I'm currently building; I have ended up using a bunch of subMaps to select only the parts of the meta map each process requires, so that changing the initial metadata doesn't invalidate the entire cache.

bentsherman commented 2 weeks ago

Isn't it the same amount of work to provide the subMap as an input vs through a custom cache directive?

pditommaso commented 2 weeks ago

+1 for subMap. We could revisit a custom cache once we have custom types, which might include some declaration about caching.

robsyme commented 2 weeks ago

Ben - the issue is re-joining the full metadata after the process. Contrast:


workflow {
    people = Channel.of(
        [name:"Rob", employer:"Seqera", loves:"Nextflow, and making Ben's life harder"],
        [name:"Ben", employer:"Seqera", loves:"Nextflow"],
        [name:"Paolo", employer:"Seqera", loves:"Nextflow"],
        [name:"Simon", employer:"NeoGenomics", loves:"Nextflow"]
    )

    people
    | map { person -> person.subMap('name', 'employer') }
    | DoSomething
    | join(people.map { person -> [person.subMap('name', 'employer'), person] })
    | map { _submap, txtFile, person -> [person, txtFile] }
    | view
}

process DoSomething {
    input: val(person)
    output: tuple val(person), path("${person.name}.txt")
    script:
    """
    echo "Hello ${person.name} from ${person.employer}" > ${person.name}.txt
    """
}

with

workflow {
    people = Channel.of(
        [name:"Rob", employer:"Seqera", loves:"Nextflow, and making Ben's life harder"],
        [name:"Ben", employer:"Seqera", loves:"Nextflow"],
        [name:"Paolo", employer:"Seqera", loves:"Nextflow"],
        [name:"Simon", employer:"NeoGenomics", loves:"Nextflow"]
    )
    | DoSomething
    | view
}

process DoSomething {
    cache { person.subMap('name', 'employer') }
    input: val(person)
    output: tuple val(person), path("${person.name}.txt")
    script:
    """
    echo "Hello ${person.name} from ${person.employer}" > ${person.name}.txt
    """
}

That said, the longer I think about it, I actually prefer the idea that a task's cache inputs should describe everything that you need to reconstruct the outputs.
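The re-join in the first version can be modelled without channels: results are keyed by the subMap that was passed to DoSomething, and the full record is found again by looking up that same key. This is a plain-Groovy sketch with lists standing in for channels and hypothetical variable names; it is not how the join operator is implemented.

```groovy
def people = [
    [name: 'Rob',   employer: 'Seqera',      loves: 'Nextflow'],
    [name: 'Simon', employer: 'NeoGenomics', loves: 'Nextflow'],
]
def keyOf = { Map person -> person.subMap('name', 'employer') }

// What DoSomething would emit: [person submap, "<name>.txt"]
def results = people.collect { p -> [keyOf(p), "${p.name}.txt".toString()] }

// Index the full records by their key, then re-attach them to each result
def byKey = people.collectEntries { p -> [(keyOf(p)): p] }
def rejoined = results.collect { key, txtFile -> [byKey[key], txtFile] }

assert rejoined[1] == [people[1], 'Simon.txt']
```

The lookup works because two maps with the same entries are equal and hash the same; if two people shared the same name and employer, their keys would collide and byKey would keep only the last record.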

SPPearce commented 2 weeks ago

Exactly, Rob. And it is easy to get the joins wrong, because you need to make sure you have exactly the right fields to join on (which is another bugbear of mine: it would be really nice to have a join that matches [name:"Ben", employer:"Seqera", loves:"Nextflow"] with [name:"Ben", employer:"Seqera"]). Even better would be if the cache automatically detected which fields of person are actually used, but I realise that is a harder task.
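The join described here amounts to a subset test: a partial record matches a full record when every key/value pair in the partial one also appears in the full one. matchesSubset below is a hypothetical helper sketching that predicate; it is not an existing Nextflow operator.

```groovy
// Hypothetical predicate: does `full` contain every entry of `partial`?
boolean matchesSubset(Map full, Map partial) {
    full.subMap(partial.keySet()) == partial
}

def full    = [name: 'Ben', employer: 'Seqera', loves: 'Nextflow']
def partial = [name: 'Ben', employer: 'Seqera']

assert matchesSubset(full, partial)
assert !matchesSubset(full, [name: 'Ben', employer: 'NeoGenomics'])

// Pair each full record with the first partial record it matches
def fullRecords    = [full, [name: 'Simon', employer: 'NeoGenomics', loves: 'Nextflow']]
def partialRecords = [[name: 'Simon', employer: 'NeoGenomics'], partial]
def joined = fullRecords.collect { f -> [f, partialRecords.find { p -> matchesSubset(f, p) }] }

assert joined[0][1] == partial
```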