bigdatagenomics / adam

ADAM is a genomics analysis platform with specialized file formats built using Apache Avro, Apache Spark, and Apache Parquet. Apache 2 licensed.

Trying to get Spark pipeline working with slightly out of date code. #1313

Closed TomNash closed 7 years ago

TomNash commented 7 years ago

I'm going off the code posted here: https://github.com/allenday/spark-genome-alignment-demo/blob/master/bin/bowtie_pipe_single.scala

I've made some changes successfully to accommodate changes in the ADAM code, but I'm getting hung up on trying to use the SAMRecordConverter. Currently the code is using the line below but this fails, saying the type is not found.

val samRecordConverter = new SAMRecordConverter

I've got no Scala experience, but from what I understand the private[adam] access modifier on SAMRecordConverter is pretty restrictive. Is there a way I can access and use it?

heuermh commented 7 years ago

We have a new Pipe API (#1114) to support this use case. With it, you wouldn't need any of the SAM file parsing or conversion code at all; rather, something like

val reads = ac.loadAlignments(sys.env("DEMO")+"/build/data/reads.fq")

implicit val tFormatter = SAMInFormatter
implicit val uFormatter = new AnySAMOutFormatter

val pipedRdd: AlignmentRecordRDD = reads.pipe("bowtie ...")

Bowtie/bowtie2 is something worth having a proper Pipe API wrapper for, let me take a closer look. I don't remember if it can accept SAM as piped input, for example.

TomNash commented 7 years ago

Sorry to bother again, just wanted to ask on a more concrete scale this time and make sure I understand as this is all new to me.

So the example starts with taking some FASTQ file and the goal is to align it with bowtie and write to SAM format. The way this code was doing it was using some perhaps overly complicated text parsing/formatting within Spark using some methods provided, then saving the file to SAM format.

What you're suggesting is to read in the FASTQ file (what is the reason for using loadAlignments as opposed to loadFastq here?), use the Pipes API to run the bowtie alignment (input here would be the appropriate bowtie command as if it were run over CLI?), then save that output to SAM format (using .saveAsSam(path, ...) I assume?)

So something like this:

val reads = ac.loadAlignments(sys.env("DEMO")+"/build/data/reads.fq")

implicit val tFormatter = SAMInFormatter
implicit val uFormatter = new AnySAMOutFormatter

val pipedRdd: AlignmentRecordRDD = reads.pipe("bowtie <align fastq to genome>")

pipedRdd.saveAsSam(<path to file>, asSingleFile = false, isSorted = false)

This would eliminate a lot of the original code, especially all of the mapping and filtering.

heuermh commented 7 years ago

Yes, that looks about right. Any of the FASTQ-related load methods would work; they all return AlignmentRecordRDD.

TomNash commented 7 years ago

First, thank you for the response. I'm able to run the code using the skeleton above.

Can you comment on the Pipe API or point me in the direction of some documentation? If we were to run reads.pipe("bowtie -S /path/to/genome /path/to/input.fastq"), does it run in any sort of distributed manner?

fnothaft commented 7 years ago

Hi @TomNash! Glad to hear that you were able to get the skeleton working! The pipe API has some inline docs, but they are limited. I've opened #1368 to address this more fully.

In your example, that's pretty close to how it'd work! The pipe API will run that command in a distributed fashion, using ADAM/Spark to parallelize the work across all of the machines and to format the data that is going into the pipe. There are a few small differences from your example:

You can see an example of a similar command in our unit tests. In your case, I think your command would look like:

import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.fragment.InterleavedFASTQInFormatter
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, AnySAMOutFormatter }

implicit val tFormatter = InterleavedFASTQInFormatter
implicit val uFormatter = new AnySAMOutFormatter

val pipedRdd: AlignmentRecordRDD = ardd.pipe("bowtie -S $0",
  files = Seq("/path/to/genome"))

This would assume that bowtie is installed (and on the PATH) on every node in the cluster.
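
A quick way to check that assumption on a given machine is sketched below. This is only an illustration: require_on_path is a hypothetical helper name, not part of ADAM or bowtie, and it checks only the machine it runs on, so it would have to be run on each worker node separately.

```shell
# Hypothetical helper: succeed if the named tool is on the PATH of this machine.
require_on_path() {
  if command -v "$1" >/dev/null 2>&1; then
    echo "found: $1"
  else
    echo "missing: $1" >&2
    return 1
  fi
}

# Example: check for the aligner that the piped command will invoke.
require_on_path bowtie || echo "install bowtie on this node before using pipe"
```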

Let me know if this helps, or if you've got any questions!

heuermh commented 7 years ago

As mentioned elsewhere, we would like to build up a repository of commonly used bioinformatics tools wrapped in the Pipe API, a proposal for which is here

https://github.com/heuermh/cannoli

The example above is not quite right. First, the scala compiler isn't able to deduce types from the implicits, so the pipe invocation needs some help. Second, the bowtie command requires - as an argument to read from stdin. Finally, including the indices as separate files won't work, because the bowtie <ebwt> argument specifies the base name instead of individual index files.
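
To illustrate the second and third points, here is a small shell sketch that turns the path of one bowtie index file into the index base name and builds a command that reads from stdin. The helper names are made up for this example, and it assumes a small index whose files end in .1.ebwt through .4.ebwt, .rev.1.ebwt, or .rev.2.ebwt.

```shell
# Hypothetical helper: strip the bowtie index suffix to recover the base name.
index_basename() {
  printf '%s\n' "$1" | sed -e 's/\.rev\.[12]\.ebwt$//' -e 's/\.[1-4]\.ebwt$//'
}

# Hypothetical helper: build a bowtie command that writes SAM (-S) and
# reads its input from stdin (the trailing "-").
bowtie_stdin_command() {
  printf 'bowtie -S %s -\n' "$(index_basename "$1")"
}

# Either an index file or the base name itself gives the same command.
bowtie_stdin_command ref.rev.1.ebwt
bowtie_stdin_command ref
```

Both calls print the same command line, since a path that is already a base name passes through unchanged.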

This seems to work for me https://github.com/heuermh/cannoli/blob/master/src/main/scala/org/bdgenomics/cannoli/Bowtie.scala#L73

$ brew install homebrew/science/bowtie
$ bowtie-build ref.fa ref

$ brew install homebrew/science/adam
$ git clone https://github.com/heuermh/cannoli
$ cd cannoli
$ ./scripts/move_to_scala_2.11.sh
$ ./scripts/move_to_spark_2.sh
$ mvn install

$ ADAM_MAIN=org.bdgenomics.cannoli.Cannoli \
  adam-submit \
  --jars target/cannoli-spark2_2.11-0.1-SNAPSHOT.jar \
  -- \
  bowtie -single -bowtie_index ref reads.ifq bowtie.sam

Using ADAM_MAIN=org.bdgenomics.cannoli.Cannoli
Using SPARK_SUBMIT=/usr/local/bin/spark-submit
# reads processed: 6
# reads with at least one reported alignment: 0 (0.00%)
# reads that failed to align: 6 (100.00%)
No alignments

$ head bowtie.sam 
@HD VN:1.5  SO:unsorted
H06HDADXX130110:2:2116:3345:91806   4   *   0   0   * ...

fnothaft commented 7 years ago

Nice! That's awesome, @heuermh! I will take more of a gander over the code later this week.

TomNash commented 7 years ago

Tried using the format above from @heuermh , getting the following output:

Using ADAM_MAIN=org.bdgenomics.cannoli.Cannoli
Using SPARK_SUBMIT=/home/anderson-lab/software/spark/bin/spark-submit
Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.bdgenomics.cannoli.Bowtie.<init>(Bowtie.scala:68)
        at org.bdgenomics.cannoli.Bowtie$.apply(Bowtie.scala:38)
        at org.bdgenomics.cannoli.Bowtie$.apply(Bowtie.scala:33)
        at org.bdgenomics.adam.cli.ADAMMain.apply(ADAMMain.scala:126)
        at org.bdgenomics.cannoli.Cannoli$.main(Cannoli.scala:29)
        at org.bdgenomics.cannoli.Cannoli.main(Cannoli.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Some Googling has led me to believe this has to do with Maven dependency version issues, but I'm not sure where that problem would arise here.

fnothaft commented 7 years ago

Hi @TomNash! What versions of Java and Spark are you running?

TomNash commented 7 years ago

Java:

java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)

Scala:

Scala code runner version 2.11.4 -- Copyright 2002-2013, LAMP/EPFL

heuermh commented 7 years ago

@TomNash that is what I see when running a build for Spark 1.x and Scala 2.10 on Spark 2.x with Scala 2.11. Did you use the move_to_spark_2.sh and move_to_scala_2.11.sh scripts when building cannoli as described above?
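
One way to compare the two sides of such a mismatch is sketched below. It is a rough check, not an official tool: the function names are hypothetical, and it assumes the jar name carries a Scala suffix such as _2.11 and that the version banner contains a line with "Using Scala version".

```shell
# Hypothetical helper: read a version banner on stdin and print the
# Scala binary version it reports (for example 2.11).
scala_binary_version() {
  sed -n 's/.*Using Scala version \([0-9][0-9]*\.[0-9][0-9]*\).*/\1/p' | head -n 1
}

# Hypothetical helper: print the Scala suffix embedded in a jar file name.
jar_scala_suffix() {
  basename "$1" | sed -n 's/.*_\([0-9][0-9]*\.[0-9][0-9]*\)-.*/\1/p'
}

# Compare a jar name (first argument) against banner text (second argument).
check_scala_match() {
  jar_version=$(jar_scala_suffix "$1")
  spark_version=$(printf '%s\n' "$2" | scala_binary_version)
  if [ -n "$jar_version" ] && [ "$jar_version" = "$spark_version" ]; then
    echo "ok: both use Scala $jar_version"
  else
    echo "mismatch: jar=$jar_version spark=$spark_version"
    return 1
  fi
}
```

It could be called as check_scala_match target/cannoli-spark2_2.11-0.1-SNAPSHOT.jar "$(spark-submit --version 2>&1)", where 2>&1 captures the banner whether it is written to stdout or stderr. It only compares the Scala versions of the jar and of Spark; it says nothing about which versions the installed ADAM was built for.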

TomNash commented 7 years ago

Ran both scripts. Here is full output from install. I've tried a mvn clean install as well, same result.

anderson-lab@genomics-vm:~/software$ rm -rf cannoli/
anderson-lab@genomics-vm:~/software$ git clone https://github.com/heuermh/cannoli                                                                                                                        
Cloning into 'cannoli'...
remote: Counting objects: 101, done.
remote: Compressing objects: 100% (34/34), done.
remote: Total 101 (delta 30), reused 97 (delta 26), pack-reused 0
Receiving objects: 100% (101/101), 25.64 KiB | 0 bytes/s, done.
Resolving deltas: 100% (30/30), done.
Checking connectivity... done.
anderson-lab@genomics-vm:~/software$ cd cannoli
anderson-lab@genomics-vm:~/software/cannoli$ ./scripts/move_to_scala_2.11.sh 
anderson-lab@genomics-vm:~/software/cannoli$ ./scripts/move_to_spark_2.sh 
anderson-lab@genomics-vm:~/software/cannoli$ mvn install -q                                                                                                                                              
Discovery starting.
Discovery completed in 88 milliseconds.
Run starting. Expected test count is: 0
DiscoverySuite:
Run completed in 166 milliseconds.
Total number of tests run: 0
Suites: completed 1, aborted 0
Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
No tests were executed.
model contains 20 documentable templates
anderson-lab@genomics-vm:~/software/cannoli$ ADAM_MAIN=org.bdgenomics.cannoli.Cannoli \
adam-submit --jars target/cannoli-spark2_2.11-0.1-SNAPSHOT.jar -- bowtie -single \
-bowtie_index /home/anderson-lab/spark-genome-alignment-demo/build/data/NC_008253 \
/home/anderson-lab/spark-genome-alignment-demo/build/data/reads.fq bowtie.sam
Using ADAM_MAIN=org.bdgenomics.cannoli.Cannoli
Using SPARK_SUBMIT=/home/anderson-lab/software/spark/bin/spark-submit
Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.bdgenomics.cannoli.Bowtie.<init>(Bowtie.scala:68)
        at org.bdgenomics.cannoli.Bowtie$.apply(Bowtie.scala:38)
        at org.bdgenomics.cannoli.Bowtie$.apply(Bowtie.scala:33)
        at org.bdgenomics.adam.cli.ADAMMain.apply(ADAMMain.scala:126)
        at org.bdgenomics.cannoli.Cannoli$.main(Cannoli.scala:29)
        at org.bdgenomics.cannoli.Cannoli.main(Cannoli.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
anderson-lab@genomics-vm:~/software/cannoli$

heuermh commented 7 years ago

Yeah ... --jars target/cannoli-spark2_2.11-0.1-SNAPSHOT.jar looks right. What does spark-submit --version read?

TomNash commented 7 years ago

anderson-lab@genomics-vm:~/software/cannoli$ $SPARK_HOME/bin/spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_121
Branch 
Compiled by user jenkins on 2016-12-16T02:04:48Z
Revision 
Url 
Type --help for more information.

heuermh commented 7 years ago

Hmm, I'm sorry, I don't see what the problem could be.

I just tried on a new Mac

cannoli master!
$ history
  149  git clone https://github.com/heuermh/cannoli
  150  cd cannoli/
  151  ./scripts/move_to_scala_2.11.sh 
  152  ./scripts/move_to_spark_2.sh 
  153  mvn install
  154  cp ../adam/adam-core/src/test/resources/chr20.250k.fa.gz .
  155  gunzip chr20.250k.fa.gz 
  156  bowtie-build chr20.250k.fa chr20.250k

cannoli master!
$ brew install adam
==> Installing adam from homebrew/science
==> Downloading https://homebrew.bintray.com/bottles-science/adam-0.21.0.el_capi
######################################################################## 100.0%
==> Pouring adam-0.21.0.el_capitan.bottle.tar.gz
🍺  /usr/local/Cellar/adam/0.21.0: 363 files, 51.3M

cannoli master!
$ adam-submit --version
Using ADAM_MAIN=org.bdgenomics.adam.cli.ADAMMain
Using SPARK_SUBMIT=/usr/local/bin/spark-submit

       e         888~-_          e             e    e
      d8b        888   \        d8b           d8b  d8b
     /Y88b       888    |      /Y88b         d888bdY88b
    /  Y88b      888    |     /  Y88b       / Y88Y Y888b
   /____Y88b     888   /     /____Y88b     /   YY   Y888b
  /      Y88b    888_-~     /      Y88b   /          Y888b

ADAM version: 0.21.0
Built for: Scala 2.11.8 and Hadoop 2.7.3

cannoli master!
$ spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_111
Branch 
Compiled by user jenkins on 2016-12-16T02:04:48Z
Revision 
Url 
Type --help for more information.

cannoli master!
$ ADAM_MAIN=org.bdgenomics.cannoli.Cannoli \
> adam-submit \
> --jars target/cannoli-spark2_2.11-0.1-SNAPSHOT.jar \
> -- \
> bowtie -single -bowtie_index chr20.250k ../adam/adam-core/src/test/resources/interleaved_fastq_sample1.ifq bowtie.sam
Using ADAM_MAIN=org.bdgenomics.cannoli.Cannoli
Using SPARK_SUBMIT=/usr/local/bin/spark-submit
2017-01-30 12:45:31 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
# reads processed: 6
# reads with at least one reported alignment: 0 (0.00%)
# reads that failed to align: 6 (100.00%)
No alignments

TomNash commented 7 years ago

So the above example works after installing the binary ADAM release.

Does cannoli work only with interleaved FASTQ?

chowbina commented 7 years ago

Hello,

Thanks for the great tool. I get the following error while running bowtie using ADAM 0.22 and Spark 2.1

$ ADAM_MAIN=org.bdgenomics.cannoli.Cannoli adam-submit --jars target/cannoli-spark2_2.11-0.1-SNAPSHOT.jar -- bowtie -single -bowtie_index chr20.250k interleaved_fastq_sample1.ifq bowtie.sam

Using ADAM_MAIN=org.bdgenomics.cannoli.Cannoli
Using SPARK_SUBMIT=/Users/sudhir-chowbina/Documents/Softwares/spark//bin/spark-submit
2017-04-05 11:59:39 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Error: reads file does not look like a FASTQ file
Command: bowtie-align --wrapper basic-0 -S chr20.250k -
2017-04-05 11:59:48 WARN BlockManager:66 - Putting block rdd_2_0 failed due to an exception
2017-04-05 11:59:48 WARN BlockManager:66 - Block rdd_2_0 could not be removed as it was not found on disk or in memory
2017-04-05 11:59:48 ERROR Executor:91 - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: Piped command List(bowtie, -S, chr20.250k, -) exited with error code 1.
        at org.bdgenomics.adam.rdd.OutFormatterRunner.hasNext(OutFormatter.scala:37)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2017-04-05 11:59:48 WARN TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.RuntimeException: Piped command List(bowtie, -S, chr20.250k, -) exited with error code 1.
        at org.bdgenomics.adam.rdd.OutFormatterR

fnothaft commented 7 years ago

Ah, interesting! Thanks for posting the error @chowbina. @heuermh have you seen that before?

heuermh commented 7 years ago

I've created a new issue in the cannoli repository https://github.com/heuermh/cannoli/issues/18 to track.

fnothaft commented 7 years ago

Closing as this has moved downstream to https://github.com/bigdatagenomics/cannoli/issues/18.