nextflow-io / nextflow

A DSL for data-driven computational pipelines
http://nextflow.io
Apache License 2.0

Finalize workflow output definition #5103

Closed bentsherman closed 3 weeks ago

bentsherman commented 4 months ago

Continuation of #4670

Enumerating the proposed changes that we've collected so far:

robsyme commented 4 months ago

Great suggestions Ben. Do you expect that there will be any changes to the observable events in this new channel-focused model?

A common use-case for plugins is to register the outputs with a third-party service (a LIMS, for example). It would be helpful if plugins could hook into a publish event that carries the source, the destination, and the other channel objects (e.g. meta) in a single event, so that the outputs could be appropriately tagged/saved.

If we are now "publishing channels", it would be helpful to have a way to include everything, including non-file data, in that publishing event.

pinin4fjords commented 4 months ago

Great to see this coming together after previous discussions Ben!

Really like the output-dir. It always felt odd to me that work-dir was something native while the workflow builders had to deal with the output directory themselves.

The distinction between 'output' and 'publishing' never sat well with me; it made much more sense to me if everything was an 'output', with things marked temporary/retained as appropriate. Plus, it is a nightmare searching across multiple config files for the publishing logic for a specific set of files (see the current dev branch of nf-core/rnaseq). So I really like all the places where you're removing publishing logic, and it makes a LOT of sense to me that the publishing/retaining part happens only at the outermost layer; I hope you keep that.

Really like having entry workflows as well; having all components be more easily runnable without as much surrounding boilerplate would be awesome.

Would also be interested to hear more on the non-file output aspects, speaking to Rob's point, and how it could assist with daisy-chaining workflows, but overall love the way this is going!

pditommaso commented 4 months ago

Points I'd like to discuss:

bentsherman commented 4 months ago

From @Midnighter in #5130 :

In the last podcast episode, you were debating whether to allow output publishing to be defined at the module/process level.

My clear opinion is that this is a bad idea:

  • It's a mix of concerns. It should not be a process' (function's) concern where its output is stored. You are giving it too much responsibility.
  • Additionally, allowing publishing to only be set at the workflow level ensures the modularity of processes is optimal. The same actually applies to sub-workflows in my mind. I would only consider the highest-level workflow publishing instructions and ignore publishing set in sub-workflows. That way, modules/sub-workflows can easily be used in different pipelines without requiring overrides.
  • In my view, processes should be as close as possible to pure functions, such that you have a reproducible output when given a deterministic environment (container hash), same input (hash of data), and same operations (hash of code/git commit). Changing publishing behavior requires code changes, even though the operation itself is not changing.
  • Flexibility in storage backends: If a module assumes certain output path capabilities that are not supported by my storage solution, I have to redefine all those publishing options.

bentsherman commented 4 months ago

Thanks @Midnighter, I have basically reached the same conclusions. After discussing with Paolo, I think we will encourage publish: to be used only in the entry workflow, but still allow it in sub-workflows as a convenience, and not allow it in processes.
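
For illustration, a minimal sketch of that convention using the syntax from this proposal (the workflow and target names are made up): the sub-workflow stays publishing-free and the entry workflow decides what gets published.

workflow ALIGN_SUB {
  take:
  reads

  main:
  ch_aligned = reads   // placeholder for real logic; no publish: section here

  emit:
  ch_aligned
}

workflow {
  main:
  ch_reads = ALIGN_SUB( Channel.of( [ [id: 1], file('foo.fastq') ] ) )

  publish:
  ch_reads >> 'reads'
}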

FriederikeHanssen commented 3 months ago

Hey! If I can add something from my wishlist:

It would be great to get logging on what dynamic output paths like "foo/${meta.id}" are resolved to: either just for warnings/errors (e.g. publishDir path is null), showing up somewhere in the error logs, or, if it doesn't get too verbose, in the regular Nextflow log.

nvnieuwk commented 2 months ago

Any news on when the dynamic path mapping will be in a Nextflow edge release? I'm currently trying to convert my file publishing to the new output definitions, but I need this feature to be able to replicate the old behavior.

bentsherman commented 2 months ago

@nvnieuwk Still in progress. I have an implementation in the linked PR but it needs to be reviewed and tested. You're welcome to try it out if you want. I doubt it will make the next edge release, maybe 24.09.0-edge.

bentsherman commented 2 months ago

I'd like to get some opinions on some of the nuances of the dynamic path mapping. If I have something like this:

workflow {
  main:
  ch_fastq = Channel.of( [ [id: 1], [ file('foo.fastq') ] ] )

  publish:
  ch_fastq >> 'fastq'
}

output {
  directory 'results'

  'fastq' {
    path { /* ... */ }
  }
}

I see a few different ways to design the path closure:

  1. The path closure defines the entire file path relative to the output directory:

    // val == [ meta, fastqs ]
    path { file, val -> "fastq/${val[0].id}/${file.name}" }

    The final published path would be results/fastq/${meta.id}/${file.name}. Since a channel value can contain multiple files, the path closure is invoked for each individual file, passing both the file and the containing channel value.

  2. The path closure only defines the subdirectory:

    path { val -> "fastq/${val[0].id}" }
    // could also do { meta, fastqs -> "fastq/${meta.id}" }

    The final published path is the same as (1), but now the file name is implied and can't be customized in the path closure. All files in a channel value are published to the same subdirectory.

I like (2) because it's simpler and retains the meaning of the path setting, but it's also not as flexible as (1). If by "dynamic paths" we only mean things like "organize samples into subdirectories by id", then (2) is all we need. But if people also want to be able to change the base name of each file (which is supported by saveAs), then (2) is not flexible enough.

Option (1) is essentially the same as saveAs but with the context explicitly given as a second parameter (the channel value), rather than the task inputs being implicitly available.
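
For comparison, a sketch of the legacy equivalent using the existing publishDir saveAs option, where the task inputs (like meta) are implicitly available to the closure (the process name and file patterns are made up):

process FOO {
  publishDir 'results', saveAs: { filename -> "fastq/${meta.id}/${filename}" }

  input:
  tuple val(meta), path(fastqs)

  output:
  tuple val(meta), path('*.fastq')

  // ...
}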

Let me know what you think. This is the essential question we need to resolve in order to move this feature forward.

nvnieuwk commented 2 months ago

I personally like option 1 because that way I don't have to make sure the files emitted by processes have the correct names; I can set the correct output name here. It would also be a nice way to specify some kind of optional nesting, which option 1 would enable.

nvnieuwk commented 2 months ago

During my testing I also noticed a weird error: when an input file that hasn't been staged is present in a channel referenced in the publish section, the following error occurs:

  ERROR ~ assert path
         |
         null

It started working again once I filtered out these paths.
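
A sketch of that filtering workaround (assuming the meta + files tuple shape used elsewhere in this thread; SOME_PROCESS is hypothetical):

workflow {
  main:
  ch_fastq = SOME_PROCESS.out   // hypothetical upstream channel

  // drop null paths before publishing, to avoid the assert error above
  ch_fastq_clean = ch_fastq.map { meta, files ->
    [ meta, files.findAll { it != null } ]
  }

  publish:
  ch_fastq_clean >> 'fastq'
}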

bentsherman commented 2 months ago

Two more syntax ideas, from @robsyme and @pinin4fjords respectively, continuing the numbering from above:

  3. Closure within a closure. The outer closure is over the channel value and returns an inner closure that corresponds exactly to the saveAs closure:

    path { meta, fastqs ->
     { file -> "${meta.id}/${file.baseName}" }
    }

    A bit more verbose, but basically a copy-and-paste from the old way, so it might be easier for people to migrate to.

  4. Closure that only takes two arguments, a metadata map and a single file:

    path { meta, fastq -> "${meta.id}/${fastq.baseName}" }

    Keeps the closure simple. Requires the user to "prepare" output channels in the workflow logic for publishing by making sure they adhere to this structure, e.g.:

    workflow {
     main:
     ch_fastq = Channel.of( [ [:], [ file('1.fastq'), file('2.fastq') ] ] )
    
     publish:
     ch_fastq.transpose() >> 'fastq'
    }

    So probably just a lot of extra transposes. Fits well with the index file model of file + metadata. The big question is whether this pattern is truly generic enough to cover all use cases or if we are optimizing too much around nf-core.

pinin4fjords commented 2 months ago

Thanks @bentsherman!

I like 4) because, while it forces me to do a transpose from time to time, I know that if I make my channels fit a predefined schema, I'm good. I guess we'd probably just end up doing a lot of:

  publish:
  ch_fastq.transpose() >> 'fastq'

?

(Although, if it's a case of doing that all the time, maybe there could be something implicit somewhere to do that?)

In any case, a predictable structure for publishable channels means that all my output blocks are going to look very similar, which makes them easier to maintain relative to a custom closure every time. Maybe the nf-core meta pragma isn't what we need; I'd be happy with any application of metadata to files.

To take this to the nth degree, if things were predictable enough, maybe there could be some default behaviour so we didn't even need

  'fastq' {
    path { /* ... */ }
  }

... most of the time. There could be a default way in which files were structured in the publish directories based on metadata properties. You'd only need the closure if you wanted to depart from those defaults.
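
For example (purely hypothetical), the default could behave as if you had written:

  'fastq' {
    // hypothetical built-in default: group files by a well-known metadata key
    path { meta, files -> "fastq/${meta.id}" }
  }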

robsyme commented 2 months ago

What if, for people who want an index file, pattern 3 could be extended to include something close to what @pinin4fjords is suggesting? The user could return two elements, where the first element is the metadata and the second is the destination filename:

path { meta, fastqs ->
  { file ->
    def destination = "${meta.id}/${file.baseName}"
    [meta.subMap('id', 'patient'), destination]
  }
}

The index could simply use the first element (if provided) to populate the columns in the csv/tsv/whatever.

This would remove the necessity for the channel transforms in the workflow. If the transforms are always going to be simple unwrapping/transposing, I think this approach would be tidier. If we expect transforms to be more involved, then this proposal would not be suitable.

bentsherman commented 2 months ago

The samplesheet produced by fetchngs contains a row for each sample, where each sample has potentially four files (fastq pair + md5 pair) associated with it.

This example suggests that the publishing is file-centric whereas the index file is sample-centric, so we can't necessarily couple these things as a shortcut. For the path option I need to be able to define the publish mapping for each individual file, but for the index file I don't necessarily want each file on its own line.

In other words, the published channel needs to correspond exactly to the rows of the index file, basically option (3).

I think the appeal of option (4) is to not have so much code in the output block, to make the path closure uniform instead of dependent on the published channel (which is defined in a completely different part of the code), and to make space for more aggressive shortcuts.

Need to see if there is a way to recover those properties with option (3). But I suspect the "shortcut" is to not use dynamic paths at all.

workflow {
  main:
  ch_fastq = Channel.of( [ [:], file('1.fastq'), file('2.fastq') ] )

  publish:
  ch_fastq >> 'fastq'
}

output {
  fastq {
    // default: publish everything to 'fastq/'
    // no path option needed

    // dynamic path (if you want it)
    path { meta, fastq_1, fastq_2 ->
      { file -> "fastq/${meta.id}/${file.baseName}" }
    }

    // should just work
    index {
      path 'samplesheet.csv'
    }
  }
}

I think the main shortcut I was imagining is that the index file should just work with the conventional meta + file(s) structure as shown above, without any extra closure logic. Bit tricky to infer the column names for the files but still doable. Would become much simpler with record types.
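
For instance, a channel value like [ [id: 1], file('1.fastq'), file('2.fastq') ] could yield an index row along these lines (the inferred file column names here are hypothetical, which is exactly the tricky part):

  id,fastq_1,fastq_2
  1,results/fastq/1.fastq,results/fastq/2.fastq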

But the path closure seems unavoidable if you want a dynamic path, because there is no guaranteed 1-to-1 mapping of channel value to file.

bentsherman commented 1 month ago

Another nice thing I realized about the double closure is that it could also support option (2) above, by returning a path instead of an inner closure. That would correspond to publishing each file from the channel into a directory without modifying the base file name:

    path { meta, fastq_1, fastq_2 ->
      "fastq/${meta.id}"
    }

So there can be multiple levels of dynamism depending on how specific you want to be.

pditommaso commented 1 month ago

The nested closure is very confusing; it's hard to understand when to use more than one argument versus a single one. I think the problem could be reduced to a closure with two arguments: one for the context and a second for the file path. The context could by definition be the first one.

bentsherman commented 1 month ago

Keep in mind that the double closure is the most extreme form of dynamic path and the least likely to be used. Doing something like { file, context -> ... } would make it simpler, but it would also make the other, more common usages more complicated and less readable. So I think we should optimize for the simpler, more common usages and let the least common one be ugly:

output {
  // many levels of dynamism
  fastq {
    // default: publish to 'fastq/'
    // nothing

    // publish to a different static path
    path 'samples'

    // publish to subdirectories by sample id
    path { meta, fastq_1, fastq_2 ->
      "fastq/${meta.id}"
    }

    // publish each file to its own path (extreme)
    path { meta, fastq_1, fastq_2 ->
      { file -> "fastq/${meta.id}/${file.baseName}" }
    }
  }
}

bentsherman commented 1 month ago

@robsyme I've been thinking about your point about updating the publish events. Now that I've taken another pass through everything, I see that it would actually be quite simple to do what you suggested.

We could add a new event e.g. onWorkflowPublish() which is triggered whenever a channel value is published. Just as the index file records each channel value as a row in the CSV, the trace observer should be able to listen for these "published values".

We can keep the existing onFilePublish() event as it is, because the two events are complementary:

  • onFilePublish() is invoked once for each published file
  • onWorkflowPublish() is invoked once for each published channel value

So you can listen to either event or both based on what you need.

I was worried at first because I was thinking about trying to attach metadata to each individual file, but I like this separate event much better.
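
For a plugin, a minimal sketch of listening to both events (assuming the proposed hook is added alongside the existing onFilePublish() on the trace observer interface; the class name is made up):

import java.nio.file.Path
import nextflow.trace.TraceObserver

class LimsObserver implements TraceObserver {

    // existing event: invoked once per published file
    @Override
    void onFilePublish(Path destination, Path source) {
        // register the copied file with the external service
    }

    // proposed event: invoked once per published channel value
    void onWorkflowPublish(Object value) {
        // register the whole value (e.g. meta + files) as one record
    }
}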

robsyme commented 1 month ago

You superstar. Thanks Ben!!! That's exactly the result I was hoping for.

robsyme commented 1 month ago

Thanks @bentsherman. Three quick questions:

  1. Would the onWorkflowPublish() event only have access to the non-file values in the channel?
  2. Would the values in the same "entity" or "row" in the channel be available together?
  3. If onWorkflowPublish() does not have access to the file/path objects, would it be possible to associate those files with the metadata (and vice versa)?

bentsherman commented 1 month ago

Yes, it should just receive a single argument corresponding to the channel value:

void onWorkflowPublish(Object value) {
  def (meta, fastqs) = [ value[0], value[1] ]
  // ...
}

bentsherman commented 1 month ago

I guess the tricky part here is that this event hook is receiving every value from every published channel, so you'll have to do some pattern matching if you want to dig deeper into things like metadata

That's actually pretty wild from the plugin's perspective... you're receiving arbitrary values that depend entirely on the pipeline you're running. Of course you could optimize around a few common use cases like "if it's a list, and the first element is a map, etc ..."
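
That is, something along these lines (a sketch of the kind of duck-typing a plugin would be forced into):

void onWorkflowPublish(Object value) {
    if( value instanceof List && value[0] instanceof Map ) {
        // looks like the common [ meta, file(s)... ] convention
        def meta  = (Map) value[0]
        def files = value.drop(1)
        // handle meta + files
    }
    else {
        // unknown shape; fall back to generic handling
    }
}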

We might want to think more critically about this interface

robsyme commented 1 month ago

You make an excellent point. The lack of specificity would be challenging.

Given the interface

workflow {
  main:
  ch_foo = Channel.of( [ [id: 1], file('foo.fastq') ] )

  publish:
  ch_foo >> 'foo'
}

It would be helpful if the 'foo' tag was made available to at least provide some labelling of the channels.

void onWorkflowPublish(String channelTag, Object value) {
  // ...
}

bentsherman commented 1 month ago

I had a similar thought. The target name might be nice to have just for record keeping, but it doesn't reveal anything new about the expected structure of the value

It's not entirely hopeless though. If you look at DumpHelper, it's used by the dump operator to pretty print arbitrary objects as JSON, and it's pretty robust. So you could take a similar approach with this event hook.

A plugin basically has two options:

  1. Document the data structures that you know how to handle (e.g. tuple of meta map and files)
  2. Expose some standard record types with different meanings, which pipeline developers can use at the end of their pipeline (e.g. "if you publish a channel with this record type, this plugin will do X with it")

Both seem kinda sketchy

robsyme commented 1 month ago

The unknowability of the exact structure of the objects in channels doesn't worry me too much. Nextflow (and any plugin) should dutifully serialize the object.

We already impose one important limitation on objects in channels (or at least objects that support -resume): they should be serializable by Kryo. A brave suggestion would be to change the serialization requirement so that objects must be CBOR-able or JSON-able rather than Kryo-serializable; then we could guarantee that objects in channels can be serialized by a plugin and deserialized by downstream, non-Nextflow tools. Kryo's big drawcard is that it is space efficient, but of course we throw away the serialized form after hashing anyway, so I'd argue that space efficiency is moot.
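
As a quick sketch of what that guarantee would buy a plugin, using Groovy's built-in JSON support:

import groovy.json.JsonOutput

void onWorkflowPublish(Object value) {
    // if channel values are guaranteed to be JSON-able, any plugin can
    // hand them to downstream, non-Nextflow tools verbatim
    def json = JsonOutput.toJson(value)
    // e.g. POST it to a LIMS endpoint, append it to an event log, etc.
}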

bentsherman commented 1 month ago

I like the idea of requiring it to be serializable into some kind of format. That would be a good thing to explore with your plugin. I'll try to at least get this workflow event merged in the second preview.

Is CBOR not space efficient compared to Kryo?

robsyme commented 1 month ago

CBOR and Kryo will both produce relatively efficient, compressed forms. JSON serialization will take more space than the other two, but space isn't really a concern because: 1) the serialized forms are not being transmitted over the wire in an environment that requires low latency, and 2) the objects are almost always very small anyway.

Austin-s-h commented 1 month ago

Hi! I don't want to hijack this thread, but I did have a couple questions related to #2844 and #5185 and I'm hoping this is an appropriate place.

With the new output style, I really like the creation of an index file that contains file name/output path information. I see this being helpful in a variety of scenarios. I am specifically interested in generating md5sum hashes of my output files. Could this be included in the index as an additional per-file column, enabled via the path directive? I also saw a draft about enabling ETag-aware files for Azure and AWS. I know that ETags on AWS can vary based on the chunk/part size used during upload, so I am concerned about assuming this value is always a valid hash. From what I saw, it looked like the hash would be calculated first using a standard implementation, but is it validated that this matches the ETag?

Currently, with Nextflow 24.04, my plan to achieve this behavior using publishDir is to implement the checksum as part of my process and add it as a storage tag (publishing to S3). I would then write another process that collects the output files and generates an index similar to what the new output style would produce. This might work but isn't very elegant; is there a better pattern that already exists?

bentsherman commented 1 month ago

A simple solution would be to define a function that computes the md5 of a file:

// Path -> String
def md5(file) {
  // ...
}
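
For illustration, one possible body for that function (a sketch using java.security.MessageDigest; not part of any proposal here):

import java.security.MessageDigest

// Path -> String: hex-encoded MD5 digest of the file contents
def md5(file) {
    def digest = MessageDigest.getInstance('MD5')
    file.withInputStream { stream ->
        def buffer = new byte[8192]
        int n
        while( (n = stream.read(buffer)) > 0 )
            digest.update(buffer, 0, n)
    }
    return digest.digest().encodeHex().toString()
}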

Then you could compute the md5 whenever you want and append it to the channel value:

ch_fastq.map { meta, fastq_1, fastq_2 ->
  [ meta, fastq_1, fastq_2, md5(fastq_1), md5(fastq_2) ]
}

A more efficient solution would be to compute the md5 while you already have the data locally, as it is done here: https://github.com/nf-core/fetchngs/blob/c60d09be3f73376156471d6ed78fcad646736c2f/modules/local/aspera_cli/main.nf#L26-L33

Either way, you have to compute these checksums yourself if you want them to be in the index file, because you are responsible for defining the schema, columns, etc. using the mapper directive.

bentsherman commented 1 month ago

Note, I think we could provide md5() as a standard library function

Midnighter commented 2 weeks ago

Note, I think we could provide md5() as a standard library function

whispers choose https://www.blake2.net/

robsyme commented 2 weeks ago

More loudly choose BLAKE3