grailbio/reflow

A language and runtime for distributed, incremental data processing in the cloud
Apache License 2.0

Better to have multiple smaller Docker images or one larger Docker image? #138

Open • niemasd opened 3 years ago

niemasd commented 3 years ago

I am creating a workflow that has many steps that use different tools, and I'm creating minimal Docker images for each of the tools. I was wondering which of the following approaches would be better for Reflow's scalability/performance/etc.:

  • Have each step of the pipeline use a different small Docker image that contains only the tools used for that specific step
  • Have all steps of the pipeline use a single larger Docker image that contains all of the tools used in the entire pipeline

Any guidance would be greatly appreciated (and ideally any information as to why exactly one may be better than the other, so I can get a better understanding of how Reflow works)

EDIT: And note that, for a single sample, each individual step of the workflow is actually quite fast (e.g. minutes of runtime). The scalability issue we're facing is that we have thousands of samples being run in parallel (so we have a lot of small pipeline step executions)

swami-m commented 3 years ago

> I am creating a workflow that has many steps that use different tools, and I'm creating minimal Docker images for each of the tools. I was wondering which of the following approaches would be better for Reflow's scalability/performance/etc.:
>
>   • Have each step of the pipeline use a different small Docker image that contains only the tools used for that specific step
>   • Have all steps of the pipeline use a single larger Docker image that contains all of the tools used in the entire pipeline

I don't think this is much of a big deal unless the Docker images become really large once you include all the tools. Even then, it shouldn't matter much, since Docker images are pulled once on each EC2 instance that's brought up.

> Any guidance would be greatly appreciated (and ideally any information as to why exactly one may be better than the other, so I can get a better understanding of how Reflow works)

The more interesting point to consider is how you organize your execs. Typically, you don't want to do too much in an exec (like embedding your entire pipeline in a single exec), but you don't want to do too little either. It would make sense to organize your execs so that they do a single logical thing.

If you have an exec that produces a large amount of output and another exec that processes it, it can be worthwhile to merge them into the same exec. Likewise, if there are multiple execs processing the same input, it might make sense to do that in one step. For example, let's say you are doing ML and generating a bunch of features for each input, and computing each feature requires a pass over the entire input: instead of computing each feature in its own exec, it would make sense to compute all of them together in one exec (or combine as many of them as makes sense), as in the sketch below.
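A toy Reflow sketch of that combination (the image name, input path, and compute_a/compute_b commands are hypothetical, not from this thread):

// Rather than one exec per feature (each of which would pull and scan the
// whole input), compute all features over the input in a single exec and
// return them together as a dir.
val input = file("s3://example-bucket/input.dat") // hypothetical input

val features = exec(image := "example/tools:latest", mem := 1*GiB, cpu := 1) (out dir) {"
    compute_a "{{input}}" > "{{out}}/featA.txt"
    compute_b "{{input}}" > "{{out}}/featB.txt"
"}

val Main = features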

> EDIT: And note that, for a single sample, each individual step of the workflow is actually quite fast (e.g. minutes of runtime). The scalability issue we're facing is that we have thousands of samples being run in parallel (so we have a lot of small pipeline step executions)

We could perhaps provide better guidance if you share a simplified version of what your pipeline looks like (e.g., a sample reflow program with potentially sensitive steps stripped out).

niemasd commented 3 years ago

> The more interesting point to consider is how you organize your execs. Typically, you don't want to do too much in an exec (like embedding your entire pipeline in a single exec), but you don't want to do too little either. It would make sense to organize your execs so that they do a single logical thing.

This is super helpful; thanks!! This is a viral amplicon sequencing analysis pipeline, so the files are quite small per sample (we just have a ton of samples we want to process simultaneously). Here's an example of how the pipeline used to look for one sample:

// Run ID: n1.r01.s1
// Created using ViReflow 1.0.8
// ViReflow command: /home/niema/ViReflow/ViReflow.py -rf https://raw.githubusercontent.com/niemasd/ViReflow/main/demo/NC_045512.2.fas -rg https://raw.githubusercontent.com/niemasd/ViReflow/main/demo/NC_045512.2.gff3 -p https://raw.githubusercontent.com/niemasd/ViReflow/main/demo/sarscov2_v2_primers_swift.bed -d s3://niema-test/n1/r01/ -mt 1 -id n1.r01.s1 -o r01/n1.r01.s1.rf s3://niema-test/n1/r01/n1.r01.s1_R1.fastq s3://niema-test/n1/r01/n1.r01.s1_R2.fastq
@requires(disk := 3*GiB)
val Main = {
    files := make("$/files")

    // Use the following FASTQ s3 path(s)
    fq1 := file("s3://niema-test/n1/r01/n1.r01.s1_R1.fastq")
    fq2 := file("s3://niema-test/n1/r01/n1.r01.s1_R2.fastq")

    // Use reference FASTA file: https://raw.githubusercontent.com/niemasd/ViReflow/main/demo/NC_045512.2.fas
    ref_fas := exec(image := "niemasd/bash:5.1.0", mem := 50*MiB, cpu := 1) (out file) {"
        wget -O "{{out}}" "https://raw.githubusercontent.com/niemasd/ViReflow/main/demo/NC_045512.2.fas" 1>&2
    "}
    cp_ref_fas := files.Copy(ref_fas, "s3://niema-test/n1/r01/n1.r01.s1.reference.fas")

    // Use reference GFF3 file: https://raw.githubusercontent.com/niemasd/ViReflow/main/demo/NC_045512.2.gff3
    ref_gff := exec(image := "niemasd/bash:5.1.0", mem := 50*MiB, cpu := 1) (out file) {"
        wget -O "{{out}}" "https://raw.githubusercontent.com/niemasd/ViReflow/main/demo/NC_045512.2.gff3" 1>&2
    "}
    cp_ref_gff := files.Copy(ref_gff, "s3://niema-test/n1/r01/n1.r01.s1.reference.gff")

    // Use primer BED file: https://raw.githubusercontent.com/niemasd/ViReflow/main/demo/sarscov2_v2_primers_swift.bed
    primer_bed := exec(image := "niemasd/bash:5.1.0", mem := 50*MiB, cpu := 1) (out file) {"
        wget -O "{{out}}" "https://raw.githubusercontent.com/niemasd/ViReflow/main/demo/sarscov2_v2_primers_swift.bed" 1>&2
    "}
    cp_primer_bed := files.Copy(primer_bed, "s3://niema-test/n1/r01/n1.r01.s1.primers.bed")

    // Map reads
    untrimmed_bam := exec(image := "niemasd/minimap2_samtools:2.20_1.12", mem := 128*MiB, cpu := 1) (out file) {"
        minimap2 -t 1 -a -x sr "{{ref_fas}}" "{{fq1}}" "{{fq2}}" | samtools view -bS - > "{{out}}"
    "}
    cp_untrimmed_bam := files.Copy(untrimmed_bam, "s3://niema-test/n1/r01/n1.r01.s1.untrimmed.bam")

    // Sort mapped reads
    sorted_untrimmed_bam := exec(image := "niemasd/samtools:1.12", mem := 256*MiB, cpu := 1) (out file) {"
        samtools sort --threads 1 -O bam -o "{{out}}" "{{untrimmed_bam}}" 1>&2
    "}
    cp_sorted_untrimmed_bam := files.Copy(sorted_untrimmed_bam, "s3://niema-test/n1/r01/n1.r01.s1.untrimmed.sorted.bam")

    // Trim reads using iVar
    trimmed_bam := exec(image := "niemasd/ivar:1.3.1", mem := 50*MiB, cpu := 1) (out file) {"
        ivar trim -x 5 -e -i "{{sorted_untrimmed_bam}}" -b "{{primer_bed}}" -p trimmed 1>&2 && mv trimmed.bam "{{out}}"
    "}

    // Sort trimmed BAM
    sorted_trimmed_bam := exec(image := "niemasd/samtools:1.12", mem := 256*MiB, cpu := 1) (out file) {"
        samtools sort --threads 1 -O bam -o "{{out}}" "{{trimmed_bam}}" 1>&2
    "}
    cp_sorted_trimmed_bam := files.Copy(sorted_trimmed_bam, "s3://niema-test/n1/r01/n1.r01.s1.sorted.trimmed.bam")

    // Generate pile-up from sorted trimmed BAM
    pileup := exec(image := "niemasd/samtools:1.12", mem := 50*MiB, cpu := 1) (out file) {"
        samtools mpileup -A -aa -d 0 -Q 0 --reference "{{ref_fas}}" "{{sorted_trimmed_bam}}" > "{{out}}"
    "}
    cp_pileup := files.Copy(pileup, "s3://niema-test/n1/r01/n1.r01.s1.pileup.txt")

    // Call variants
    variants := exec(image := "niemasd/lofreq:2.1.5", mem := 128*MiB, cpu := 1) (out file) {"
        lofreq call -f "{{ref_fas}}" --call-indels "{{sorted_trimmed_bam}}" > "{{out}}"
    "}
    cp_variants := files.Copy(variants, "s3://niema-test/n1/r01/n1.r01.s1.variants.vcf")

    // Call depth from trimmed BAM
    depth := exec(image := "niemasd/samtools:1.12", mem := 50*MiB, cpu := 1) (out file) {"
        samtools depth -d 0 -Q 0 -q 0 -aa "{{sorted_trimmed_bam}}" > "{{out}}"
    "}
    cp_depth := files.Copy(depth, "s3://niema-test/n1/r01/n1.r01.s1.depth.txt")

    // Find low-depth regions
    low_depth := exec(image := "niemasd/low_depth_regions:1.0.0", mem := 20*MiB, cpu := 1) (out file) {"
        low_depth_regions "{{depth}}" "{{out}}" 10 1>&2
    "}
    cp_low_depth := files.Copy(low_depth, "s3://niema-test/n1/r01/n1.r01.s1.lowdepth.tsv")

    // Generate consensus from variants
    consensus := exec(image := "niemasd/bcftools:1.12_1.0", mem := 20*MiB, cpu := 1) (out file) {"
        alt_vars.py -i "{{variants}}" -o tmp.vcf -v lofreq
        bgzip tmp.vcf
        bcftools index tmp.vcf.gz
        cat "{{ref_fas}}" | bcftools consensus -m "{{low_depth}}" tmp.vcf.gz > "{{out}}"
    "}
    cp_consensus := files.Copy(consensus, "s3://niema-test/n1/r01/n1.r01.s1.consensus.fas")

    // Finish workflow
    (cp_untrimmed_bam, cp_sorted_untrimmed_bam, cp_sorted_trimmed_bam, cp_pileup, cp_variants, cp_depth, cp_low_depth, cp_consensus, cp_ref_fas, cp_ref_gff, cp_primer_bed)
}

This took ~8 minutes to run on 1, 10, and 100 simultaneous samples, but when I hit 1K samples, it got bogged down and took about 1.5 hours. I've now merged everything into a single exec, tar.gz'ing the output files before copying, and it runs about twice as fast on a single sample (~4.5 minutes):

// Run ID: n1.r01.s1
// Created using ViReflow 1.0.9
// ViReflow command: /home/niema/ViReflow/ViReflow.py -rf https://raw.githubusercontent.com/niemasd/ViReflow/main/demo/NC_045512.2.fas -rg https://raw.githubusercontent.com/niemasd/ViReflow/main/demo/NC_045512.2.gff3 -p https://raw.githubusercontent.com/niemasd/ViReflow/main/demo/sarscov2_v2_primers_swift.bed -d s3://niema-test/n1/r01/ -id n1.r01.s1 -o n1/r01/n1.r01.s1.rf s3://niema-test/n1/r01/n1.r01.s1_R1.fastq s3://niema-test/n1/r01/n1.r01.s1_R2.fastq
@requires(disk := 3*GiB)
val Main = {
    files := make("$/files")

    // Using the following S3 input files
    fq1 := file("s3://niema-test/n1/r01/n1.r01.s1_R1.fastq")
    fq2 := file("s3://niema-test/n1/r01/n1.r01.s1_R2.fastq")

    // Main ViReflow pipeline execution
    out_file := exec(image := "niemasd/vireflow:latest", mem := 1*GiB, cpu := 1) (out file) {"
        # Copy input files locally
        mkdir "n1.r01.s1_output"
        wget -O "n1.r01.s1_output/n1.r01.s1.reference.fas" "https://raw.githubusercontent.com/niemasd/ViReflow/main/demo/NC_045512.2.fas"
        wget -O "n1.r01.s1_output/n1.r01.s1.reference.gff" "https://raw.githubusercontent.com/niemasd/ViReflow/main/demo/NC_045512.2.gff3"
        wget -O "n1.r01.s1_output/n1.r01.s1.primers.bed" "https://raw.githubusercontent.com/niemasd/ViReflow/main/demo/sarscov2_v2_primers_swift.bed"
        cp "{{fq1}}" "n1.r01.s1_output/n1.r01.s1.fq1.fastq"
        cp "{{fq2}}" "n1.r01.s1_output/n1.r01.s1.fq2.fastq"

        # Map reads using minimap2
        minimap2 -t 1 -a -x sr "n1.r01.s1_output/n1.r01.s1.reference.fas" "n1.r01.s1_output/n1.r01.s1.fq1.fastq" "n1.r01.s1_output/n1.r01.s1.fq2.fastq" | samtools view -bS - > "n1.r01.s1_output/n1.r01.s1.untrimmed.bam"

        # Sort mapped reads
        samtools sort --threads 1 -O bam -o "n1.r01.s1_output/n1.r01.s1.untrimmed.sorted.bam" "n1.r01.s1_output/n1.r01.s1.untrimmed.bam" 1>&2

        # Trim mapped reads using ivar
        ivar trim -x 5 -e -i "n1.r01.s1_output/n1.r01.s1.untrimmed.sorted.bam" -b "n1.r01.s1_output/n1.r01.s1.primers.bed" -p "n1.r01.s1_output/n1.r01.s1.trimmed" 1>&2

        # Sort trimmed mapped reads
        samtools sort --threads 1 -O bam -o "n1.r01.s1_output/n1.r01.s1.trimmed.sorted.bam" "n1.r01.s1_output/n1.r01.s1.trimmed.bam" 1>&2

        # Generate pile-up from sorted trimmed BAM
        samtools mpileup -A -aa -d 0 -Q 0 --reference "n1.r01.s1_output/n1.r01.s1.reference.fas" "n1.r01.s1_output/n1.r01.s1.trimmed.sorted.bam" > "n1.r01.s1_output/n1.r01.s1.pileup.txt"

        # Call variants using lofreq
        lofreq call -f "n1.r01.s1_output/n1.r01.s1.reference.fas" --call-indels "n1.r01.s1_output/n1.r01.s1.trimmed.sorted.bam" > "n1.r01.s1_output/n1.r01.s1.variants.vcf"

        # Call depth from trimmed BAM
        samtools depth -d 0 -Q 0 -q 0 -aa "n1.r01.s1_output/n1.r01.s1.trimmed.sorted.bam" > "n1.r01.s1_output/n1.r01.s1.depth.txt"

        # Find low-depth regions
        low_depth_regions "n1.r01.s1_output/n1.r01.s1.depth.txt" "n1.r01.s1_output/n1.r01.s1.low_depth.tsv" 10 1>&2

        # Generate consensus sequence
        alt_vars.py -i "n1.r01.s1_output/n1.r01.s1.variants.vcf" -o tmp.vcf -v lofreq
        bgzip tmp.vcf
        bcftools index tmp.vcf.gz
        cat "n1.r01.s1_output/n1.r01.s1.reference.fas" | bcftools consensus -m "n1.r01.s1_output/n1.r01.s1.low_depth.tsv" tmp.vcf.gz > "n1.r01.s1_output/n1.r01.s1.consensus.fas"

        # Remove redundant output files before compressing
        rm */*trimmed.bam

        # Compress output files
        tar cvf - "n1.r01.s1_output" | pigz -1 -p 1 > "{{out}}"
    "}
    cp_out_file := files.Copy(out_file, "s3://niema-test/n1/r01/n1.r01.s1.tar.gz")
    (cp_out_file)
}

I'm going to try running larger numbers of samples to see how things scale

swami-m commented 3 years ago

A random list of suggestions (note: uncompiled code, so you might have to fix syntax errors, if any):

- Rather than running a separate program per sample, keep a list of samples and map a per-sample func over it:

val samples = [("n1", "r01"), ("n2", "r02"), ...]
val Main = [process(basepath, n, r) | (n, r) <- samples]
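For concreteness, a hedged sketch of what such a process func might look like (the func body, the basepath/ID layout, and the output location are assumptions inferred from the programs above, not spelled out in this comment):

// Hypothetical per-sample func; signature taken from the comprehension above.
// The ID layout ("<n>.<r>.s1") is inferred from the demo naming and is an assumption.
func process(basepath string, n string, r string) = {
    files := make("$/files")
    id := n + "." + r + ".s1"
    prefix := basepath + "/" + n + "/" + r + "/"
    fq1 := file(prefix + id + "_R1.fastq")
    fq2 := file(prefix + id + "_R2.fastq")
    out := exec(image := "niemasd/vireflow:latest", mem := 1*GiB, cpu := 1) (out file) {"
        mkdir "{{id}}_output"
        cp "{{fq1}}" "{{id}}_output/{{id}}.fq1.fastq"
        cp "{{fq2}}" "{{id}}_output/{{id}}.fq2.fastq"
        # ... the pipeline steps from the single-exec program above go here ...
        tar cvf - "{{id}}_output" | pigz -1 -p 1 > "{{out}}"
    "}
    files.Copy(out, prefix + "output/" + id + ".tar.gz")
}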

The above will produce a directory "output" with all the output files under each sample's base path.
Most importantly, if you have caching enabled, you will benefit from incremental processing.

If you feel that you want to recompute everything anyway (say you made a mistake and want to regenerate for old samples as well), you can run reflow with `-cache=write`.
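For example, something like the following (placing the flag before the subcommand is my assumption and may vary by reflow version):

reflow -cache=write run n1.r01.s1.rf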

- Within your exec, avoid writing to intermediate files whenever possible. I see that you already do that in one step, but the following:

# Sort trimmed mapped reads
samtools sort --threads 1 -O bam -o "n1.r01.s1_output/n1.r01.s1.trimmed.sorted.bam" "n1.r01.s1_output/n1.r01.s1.trimmed.bam" 1>&2

# Generate pile-up from sorted trimmed BAM
samtools mpileup -A -aa -d 0 -Q 0 --reference "n1.r01.s1_output/n1.r01.s1.reference.fas" "n1.r01.s1_output/n1.r01.s1.trimmed.sorted.bam" > "n1.r01.s1_output/n1.r01.s1.pileup.txt"

can be written as:

# Sort trimmed mapped reads and generate the pile-up in one pipeline
samtools sort --threads 1 -O bam -o "-" "n1.r01.s1_output/n1.r01.s1.trimmed.bam" | samtools mpileup -A -aa -d 0 -Q 0 --reference "n1.r01.s1_output/n1.r01.s1.reference.fas" "-" > "n1.r01.s1_output/n1.r01.s1.pileup.txt"


This avoids having to delete the intermediate "trimmed.bam" files.

swami-m commented 3 years ago

Also, a general comment: one of the biggest advantages of using reflow is its caching layer and the ability to do incremental processing. You expose what you want cached by having execs return the respective files or dirs (provided they aren't too large), as in the toy example below.
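A toy illustration (hypothetical image and command, not from this thread): an exec can declare multiple outputs, each returned as its own value.

// Hypothetical: both outputs are stored in the cache, so later runs can
// reuse them without recomputing this exec.
val (bam, log) = exec(image := "example/tools:latest", mem := 1*GiB, cpu := 1) (bam file, log file) {"
    run_tool > "{{bam}}" 2> "{{log}}"
"}

val Main = (bam, log)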

If you haven't already done so, you can enable caching (see the Reflow README, under "Next, we'll set up a cache.").
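For reference, the quickstart's cache setup looks roughly like this (the bucket and table names are the README's example names; substitute your own):

$ reflow setup-s3-repository reflow-quickstart-cache
$ reflow setup-dynamodb-assoc reflow-quickstart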

niemasd commented 3 years ago

Thank you, this was incredibly helpful! I do indeed have caching enabled, but I think the primary cause of the slowdown with separate execs is that the files themselves are incredibly tiny, so per-exec overhead outweighs the actual compute: these are viral amplicon sequencing datasets, so even on the largest samples, I had ~500 MB in 2 FASTQ files, ~200 MB in two BAM files (trimmed and untrimmed), and then a bunch of tiny (e.g., < 100 KB) files, including the reference genome, which is only 30 KB.

Regarding avoiding writing files, I am considering adding optional flags to avoid writing intermediate files in the future, but some users (including us) want to hold onto these intermediate files for debugging/investigative purposes. Thanks for the tip, though! Once we have things stabilized, I'll start looking for places where I can pipe to avoid writing to disk

EDIT: Ah, so the deletion at the end is for the unsorted trimmed BAMs, but the issue is that the tool I'm using (ivar trim) writes to disk and doesn't (seem to) have an option for writing to stdout instead (to then pipe into samtools sort). I could get around this using named pipes, though, so perhaps I'll consider doing that in the future 😄
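A hedged sketch of that named-pipe workaround (file names simplified; it assumes ivar trim writes its <prefix>.bam output sequentially, since a FIFO cannot be seeked):

# Stream ivar trim's output BAM through a FIFO into samtools sort, so the
# unsorted trimmed BAM never lands on disk.
mkfifo trimmed.bam                 # "trimmed.bam" is now a FIFO, not a regular file
ivar trim -x 5 -e -i untrimmed.sorted.bam -b primers.bed -p trimmed 1>&2 &
samtools sort --threads 1 -O bam -o trimmed.sorted.bam trimmed.bam
wait                               # reap the background ivar process
rm trimmed.bam                     # remove the FIFO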