grailbio / reflow

A language and runtime for distributed, incremental data processing in the cloud
Apache License 2.0
964 stars 52 forks source link

How to properly write tests? #89

Closed olgabot closed 5 years ago

olgabot commented 5 years ago

Hello, I have a file called star_htseq.rf that has a Main function that I can run from the command line. What is the proper way to test this workflow? Previously I had been writing a bunch of makefiles but it seems like there should be a way to do this within Reflow itself. Thank you! Warmest, Olga

// Inputs for one test sample: the paired-end read files, the sample name,
// the S3 output prefix and the reference genome to align against.
val read1 = "s3://czbiohub-maca/remux_data/170907_A00111_0051_BH2HWLDMXX/rawdata/N8-MAA000612-3_9_M-1-1_S36/N8-MAA000612-3_9_M-1-1_S36_R1_001.fastq.gz"
val read2 = "s3://czbiohub-maca/remux_data/170907_A00111_0051_BH2HWLDMXX/rawdata/N8-MAA000612-3_9_M-1-1_S36/N8-MAA000612-3_9_M-1-1_S36_R2_001.fastq.gz"
val name = "N8-MAA000612-3_9_M-1-1_S36"
val output = "s3://olgabot-maca/star_htseq_test/"
val genome = "mm10-plus"

// Instantiate the workflow module, passing the values above as its parameters.
val star_htseq = make("./star_htseq.rf", read1, read2, name, output, genome)

// Main is the instantiated workflow's Main.
val Main = star_htseq.Main

The star_htseq.rf file in question:

```golang
param (
    // S3 path to read1 of the fastq/fasta file. If multiple files,
    // can be pipe-separated e.g. sample1_01.fastq|sample1_02.fastq
    read1 string

    // S3 path to read2 of the fastq/fasta file. If multiple files,
    // can be pipe-separated e.g. sample1_01.fastq|sample1_02.fastq
    read2 string

    // Identifier of the sample
    name string

    // Full s3 folder location to copy alignment output and htseq
    // results. Each sample is output to a folder.
    output string

    // Either "hg38-plus" or "mm10-plus".
    // Must be a .tgz file containing a folder with a single .fa
    // and .gtf file in s3://czbiohub-reference
    genome = "hg38-plus"
)

val files = make("$/files")
val dirs = make("$/dirs")

// Local utility file
val util = make("./util.rf")

// Container images used by the exec steps below.
ubuntu := "ubuntu:16.04"
star := "quay.io/biocontainers/star:2.6.0c--0"
samtools := "biocontainers/samtools"
htseq := "quay.io/biocontainers/htseq:0.9.1--py36_0"

// DecompressReference unpacks the reference tarball and returns the
// .fa and .gtf files found one directory level below its root.
func DecompressReference(reference file) =
    exec(image := ubuntu, mem := 32*GiB) (fasta file, gtf file) {"
        mkdir /tmp/reference
        tar -C /tmp/reference -xzvf {{reference}}
        pwd
        ls -lha /tmp/reference/
        cp /tmp/reference/*/*fa {{fasta}}
        cp /tmp/reference/*/*gtf {{gtf}}
    "}

// starIndex builds a STAR genome index directory from the reference
// fasta and gtf.
// NOTE(review): STAR is asked for 12 threads here but, unlike starAlign,
// the exec declares no cpu requirement; confirm whether cpu := 12 is wanted.
func starIndex(fasta, gtf file) (out dir) =
    exec(image := star, mem := 32*GiB) (out dir) {"
        STAR \
            --runThreadN 12 \
            --runMode genomeGenerate \
            --genomeDir {{out}} \
            --genomeFastaFiles {{fasta}} \
            --sjdbGTFfile {{gtf}}
    "}

// starAlign aligns the paired reads against the index in genomeDir and
// returns the directory STAR wrote its output into.
func starAlign(read1, read2 [file], genomeDir dir) =
    exec(image := star, mem := 12*GiB, cpu := 12) (out dir) {"
        cd {{out}}
        STAR --outFilterType BySJout \
            --outFilterMultimapNmax 20 \
            --alignSJoverhangMin 8 \
            --alignSJDBoverhangMin 1 \
            --outFilterMismatchNmax 999 \
            --outFilterMismatchNoverLmax 0.04 \
            --alignIntronMin 20 \
            --alignIntronMax 1000000 \
            --alignMatesGapMax 1000000 \
            --outSAMstrandField intronMotif \
            --outSAMtype BAM Unsorted \
            --outSAMattributes NH HI NM MD \
            --outReadsUnmapped Fastx \
            --readFilesCommand zcat \
            --runThreadN 12 \
            --genomeDir {{genomeDir}} \
            --readFilesIn {{read1}} {{read2}}
    "}

// samSort sorts an alignment file with samtools sort.
// The exec's memory request and the -m value passed to samtools are both
// derived from len(aligned) (presumably the file size in bytes; confirm).
func samSort(aligned file) = {
    mem := 10 * len(aligned)
    command_memory := 9 * len(aligned)
    exec (image := samtools, mem) (sorted file) {"
        samtools sort -m {{command_memory}} -o {{sorted}} {{aligned}}
    "}
}

// samIndex creates a BAI index (-b) for a sorted alignment file.
func samIndex(sorted file) =
    exec(image := samtools, mem := GiB) (index file) {"
        samtools index -b {{sorted}} {{index}}
    "}

// samSortByName re-sorts an alignment file by read name (-n).
// Memory is sized from len(sorted) in the same way as samSort.
func samSortByName(sorted file) = {
    mem := 10 * len(sorted)
    command_memory := 9 * len(sorted)
    exec(image := samtools, mem) (byname file) {"
        samtools sort -m {{command_memory}} -n -o {{byname}} {{sorted}}
    "}
}

// htseqCount runs htseq-count on a name-sorted BAM against the gtf and
// captures its standard output as the counts file.
func htseqCount(byName, gtf file) =
    exec(image := htseq, mem := GiB) (counts file) {"
        htseq-count -r name -s no -f bam -m intersection-nonempty {{byName}} {{gtf}} > {{counts}}
    "}

// Main runs the whole pipeline: decompress the reference, build the STAR
// index, align, sort, count, and copy the alignment and count directories
// to the output location.
val Main = {
    output := util.MaybeAddTrailingSlash(output)

    r1 := util.SplitByPipe(read1)
    r2 := util.SplitByPipe(read2)

    reference := file("s3://czbiohub-reference/" + genome + ".tgz")
    val (fasta, gtf) = DecompressReference(reference)

    val index = starIndex(fasta, gtf)
    val alignment_dir = starAlign(r1, r2, index)
    val (aligned, _) = dirs.Pick(alignment_dir, "*Aligned.out.bam")

    val sorted = samSort(aligned)
    // Named bamIndex so it no longer shadows the STAR genome index above.
    // NOTE(review): nothing below references bamIndex, so the BAM index is
    // not part of the copied output; confirm whether it should be.
    val bamIndex = samIndex(sorted)
    val byName = samSortByName(sorted)

    val counts = htseqCount(byName, gtf)
    count_dir := dirs.Make(["htseq-count.txt": counts])

    output_dirs := [alignment_dir, count_dir]
    [dirs.Copy(d, output) | d <- output_dirs]
}
```
mariusae commented 5 years ago

We usually test modules directly within Reflow as you suggest. Reflow comes with a test package:

$ reflow doc $/test
Declarations

val Assert func(tests [bool]) unit
    Assert fails if any passed (boolean) value is false.

val All func(tests [bool]) bool
    All returns true if every passed (boolean) value is true.

This allows you to write tests (each returning a boolean value that indicates success) and then piece them together, e.g.:

...

// TestPhixCounts verifies that the filter conditions are working: the
// PhiX counts computed from the test BAM and its index must equal a file
// produced by echoing "1\n".
val TestPhixCounts = {
        val bam = file("./../testdata/sambamba/phix.bam")
        val bai = file("./../testdata/sambamba/phix.bam.bai")
        val expected = fileutils.Echo("1\n")
        sambamba.PhixCounts(bam, bai) == expected
}

// Main runs the module's tests: each listed value is a boolean test
// result, and test.Assert fails if any of them is false.
val Main = test.Assert([
        TestIndex,
        TestMerge,
        TestMergeAndIndex,
        TestSortedMergeAndIndex,
        TestMarkdup,
        TestFilterDuplicates,
        TestSort,
        TestNameSort,
        TestMpileup,
        TestPhixCounts,
        TestReadExtraction,
        TestExtractDiscordantReads,
        TestRemoveDiscordantReads,
])

I am planning on adding a proper test runner, so that you can run `reflow test foo.rf` and it will check each exported identifier with the prefix `Test`...

olgabot commented 5 years ago

Thank you! That is very helpful. Can you let me know how to import the fileutils module?

mariusae commented 5 years ago

Ah, fileutils is an internal module we have. Here it is:

// Module with generic file utilities

// Defaults passed to every exec in this module.
// image is the container image the commands run in.
val image = "ubuntu:16.04"
// cpu is the CPU requirement given to each exec.
val cpu = 1
// mem is the memory requirement given to each exec.
val mem = GiB

// Gzip gzips a file
//
// The compressed stream is written to standard output (gzip -c) and
// redirected into the result file.
//
// @param f File to be gzipped
// @return gzipped Gzipped version of the input file f
func Gzip(f file) =
    exec(image, cpu, mem) (gzipped file) {"
        gzip -c {{f}} > {{gzipped}}
    "}

// Gunzip decompresses a gzipped file
//
// The decompressed stream is written to standard output (gunzip -c) and
// redirected into the result file.
//
// @param gz File to be unzipped
// @return f Unzipped file
func Gunzip(gz file) =
    exec(image, cpu, mem) (f file) {"
        gunzip -c {{gz}} > {{f}}
    "}

// Echo creates a file with a given string
//
// The string is placed inside the double-quoted format argument of
// printf, so printf handles any backslash escapes and % directives in it.
// NOTE(review): a literal % or double quote in towrite would presumably
// alter or break the command; confirm before passing arbitrary text.
//
// @param towrite String to write to file
// @return output Newly created file
func Echo(towrite string) =
    exec(image, cpu, mem) (output file) {"
        printf "{{towrite}}" > {{output}}
    "}

// EmptyFile is a file of size 0.
// It is obtained by referencing /dev/null as a file.
val EmptyFile = file("/dev/null")

// Concat joins text files into one using cat.
//
// @param files Files to concatenate
// @return The sole input file itself when exactly one file is given;
//         otherwise a new file holding the output of cat over all inputs
func Concat(files [file]) =
    if (len(files) == 1) {
        // A single input needs no exec: hand it back unchanged.
        val [only] = files
        only
    } else {
        exec(image, cpu, mem) (output file) {"
            cat {{files}} > {{output}}
        "}
    }

// Header extracts the first line of the file into a new file
//
// @param f File whose first line is extracted
// @return result File containing only the first line of f
func Header(f file) =
    exec(image, cpu, mem) (result file) {"
        head -n1 {{f}} > {{result}}
    "}

// HeaderLess extracts all but the first line of the file into a new file
//
// @param f File whose first line is dropped
// @return result File containing f from its second line onward
func HeaderLess(f file) =
    exec(image, cpu, mem) (result file) {"
        tail -n+2 {{f}} > {{result}}
    "}
olgabot commented 5 years ago

Got it, thank you!

mariusae commented 5 years ago

Coming soon: reflow test

screen shot 2018-11-30 at 4 35 27 pm