broadinstitute / gatk

Official code repository for GATK versions 4 and up
https://software.broadinstitute.org/gatk

Spark and mapr #3936

Open AxVE opened 6 years ago

AxVE commented 6 years ago

Hi, our Spark installation uses a MapR filesystem (maprfs, which is HDFS compatible). The GATK Spark tools do not seem to recognize it. When running the following command:

/home/axverdier/Tools/GATK4/gatk-4.beta.6/gatk-launch CountReadsSpark --programName gatk4-testing --input maprfs://spark-ics/user/axverdier/data/710-PE-G1.bam --output maprfs://spark-ics/user/axverdier/testOutGATK_CountReadsSpark --sparkRunner SPARK --sparkMaster yarn --javaOptions -Dmapr.library.flatclass

I got the following error:

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1436) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1424) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1423) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1651) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1606) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1595) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958) at org.apache.spark.rdd.RDD.count(RDD.scala:1158) at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:455) at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:45) at org.broadinstitute.hellbender.tools.spark.pipelines.CountReadsSpark.runTool(CountReadsSpark.java:38) at org.broadinstitute.hellbender.engine.spark.GATKSparkTool.runPipeline(GATKSparkTool.java:362) at org.broadinstitute.hellbender.engine.spark.SparkCommandLineProgram.doWork(SparkCommandLineProgram.java:38) at org.broadinstitute.hellbender.cmdline.CommandLineProgram.runTool(CommandLineProgram.java:119) at org.broadinstitute.hellbender.cmdline.CommandLineProgram.instanceMainPostParseArgs(CommandLineProgram.java:176) at org.broadinstitute.hellbender.cmdline.CommandLineProgram.instanceMain(CommandLineProgram.java:195) at org.broadinstitute.hellbender.Main.runCommandLineProgram(Main.java:137) at org.broadinstitute.hellbender.Main.mainEntry(Main.java:158) at org.broadinstitute.hellbender.Main.main(Main.java:239) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:733) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:177) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:202) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:116) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.nio.file.ProviderNotFoundException: Provider "maprfs" not found at java.nio.file.FileSystems.newFileSystem(FileSystems.java:341) at org.seqdoop.hadoop_bam.util.NIOFileUtil.asPath(NIOFileUtil.java:40) at 
org.seqdoop.hadoop_bam.BAMRecordReader.initialize(BAMRecordReader.java:143) at org.seqdoop.hadoop_bam.BAMInputFormat.createRecordReader(BAMInputFormat.java:226) at org.seqdoop.hadoop_bam.AnySAMInputFormat.createRecordReader(AnySAMInputFormat.java:190) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.liftedTree1$1(NewHadoopRDD.scala:178) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.(NewHadoopRDD.scala:177) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:134) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88) at org.apache.spark.scheduler.Task.run(Task.scala:100) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:317) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

As a workaround, I have to set paths through hdfs:// instead, which is tricky:

/home/axverdier/Tools/GATK4/gatk-4.beta.6/gatk-launch CountReadsSpark --programName gatk4-testing --input hdfs://spark01:7222/user/axverdier/data/710-PE-G1.bam --output hdfs://spark01:7222/user/axverdier/testOutGATK_CountReadsSpark --sparkRunner SPARK --sparkMaster yarn --javaOptions -Dmapr.library.flatclass

Would it be possible for the GATK Spark tools to support maprfs?

lbergelson commented 6 years ago

This seems like a consequence of the fact that we use java.nio.file.Path for a lot of things in GATK. This requires a custom java.nio.file.spi.FileSystemProvider to be available for each type of path you want to be able to resolve. Spark itself uses org.apache.hadoop.fs.Path for a lot of things. It seems likely that maprfs provides a Hadoop filesystem plugin, which many Spark applications can consume, but it's unlikely that it also provides a java.nio.file.Path implementation.
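One way to see this in action (a hypothetical diagnostic, not part of GATK) is to list the NIO providers that are actually on the classpath and then try to open a filesystem for the maprfs scheme, which is roughly what hadoop-bam does with the input URI:

import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.spi.FileSystemProvider;
import java.util.Collections;

public class ListNioProviders {
    public static void main(String[] args) throws Exception {
        // NIO providers are discovered from META-INF/services/java.nio.file.spi.FileSystemProvider
        // entries on the classpath: "file" and "jar" are always present, "hdfs" only if a plugin
        // such as jsr203-hadoop is on the classpath, and "maprfs" only if someone ships one.
        for (FileSystemProvider p : FileSystemProvider.installedProviders()) {
            System.out.println(p.getScheme() + " -> " + p.getClass().getName());
        }

        // hadoop-bam's NIOFileUtil.asPath ends up calling FileSystems.newFileSystem (see the
        // stack trace above); with no provider registered for the scheme this throws
        // java.nio.file.ProviderNotFoundException: Provider "maprfs" not found
        URI uri = URI.create("maprfs:///spark-ics/user/axverdier/data/710-PE-G1.bam");
        FileSystem fs = FileSystems.newFileSystem(uri, Collections.emptyMap());
        System.out.println(fs);
    }
}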

I don't think we'd be able to implement a provider for maprfs ourselves. We don't have any systems with maprfs and don't have the bandwidth to take it on right now. Implementing a file system provider isn't a terribly complicated project, but it's not a trivial one either. However, there's an implementation for Hadoop at https://github.com/damiencarol/jsr203-hadoop which is sufficient for what GATK does. If maprfs provides a Hadoop file system, it would probably not be too difficult to take that project as a template and modify it to use the maprfs implementation.

I think the only things you'd have to implement for the Spark tools to work are the basic Path operations behind simple calls like Paths.get(), Files.exists(), and Path.resolve() (although that's not a complete list).
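As a rough illustration (a hypothetical smoke test, not GATK code), these are the kinds of NIO calls such a provider would need to support; if a maprfs FileSystemProvider were on the classpath, all of the following should work against a maprfs URI:

import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class MaprfsNioSmokeTest {
    public static void main(String[] args) throws IOException {
        // Pass a URI such as maprfs:///spark-ics/user/axverdier/data/710-PE-G1.bam
        URI uri = URI.create(args[0]);

        // Paths.get(URI) needs an installed provider to resolve the scheme and build a Path.
        Path bam = Paths.get(uri);

        // Files.exists / Files.size exercise the provider's checkAccess / readAttributes.
        System.out.println("exists: " + Files.exists(bam));
        System.out.println("size:   " + Files.size(bam));

        // resolveSibling-style navigation is how companion files (e.g. an index) would be located.
        Path sibling = bam.resolveSibling(bam.getFileName() + ".bai");
        System.out.println("sibling index exists: " + Files.exists(sibling));
    }
}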

If you are interested in writing a plugin like that, you can add it to the gatk class path at runtime. We might also be open to packaging such a plugin with the gatk if there was wide demand for it.

lbergelson commented 6 years ago

I also saw something in this thread:

The URI for maprfs should start with "maprfs:/" or "maprfs:///". If it starts with "maprfs://", it will fail. Just in case.

I don't think that's the problem you're having, but it might be an additional problem.
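That note is really about how URI parsing works: with a double slash, the first path component is taken as the URI authority (a host name) rather than as part of the path. A small illustration, assuming plain java.net.URI parsing:

import java.net.URI;

public class MaprfsUriSlashes {
    public static void main(String[] args) {
        // "maprfs://spark-ics/..." parses "spark-ics" as the authority, not as a directory.
        URI two = URI.create("maprfs://spark-ics/user/axverdier/data/710-PE-G1.bam");
        System.out.println(two.getAuthority() + " | " + two.getPath());
        // prints: spark-ics | /user/axverdier/data/710-PE-G1.bam

        // "maprfs:///spark-ics/..." keeps "spark-ics" in the path, which is what the note suggests MapR expects.
        URI three = URI.create("maprfs:///spark-ics/user/axverdier/data/710-PE-G1.bam");
        System.out.println(three.getAuthority() + " | " + three.getPath());
        // prints a null authority and path /spark-ics/user/axverdier/data/710-PE-G1.bam
    }
}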

AxVE commented 6 years ago

I have developed a tool to write to maprfs using the Hadoop libraries for our own projects, but it's not really tested yet and the reader part is not done. I'd prefer to wait until it is complete and I'm sure it works properly. By the way, it's in Scala. Here is a file to give you an idea; keep in mind it's neither clean nor complete.

I tried with maprfs:/ and maprfs:// but I still get an error, though a different one:

/home/axverdier/Tools/GATK4/gatk-4.beta.6/gatk-launch CountReadsSpark --programName gatk4-testing --input maprfs:///spark-ics/user/axverdier/data/710-PE-G1.bam --output hdfs://spark01:7222/user/axverdier/testOutGATK_CountReadsSpark --javaOptions -Dmapr.library.flatclass --sparkRunner SPARK --sparkMaster yarn


A USER ERROR has occurred: Failed to read bam header from maprfs:///spark-ics/user/axverdier/data/710-PE-G1.bam Caused by:/spark-ics/user/axverdier/data/710-PE-G1.bam


org.broadinstitute.hellbender.exceptions.UserException: Failed to read bam header from maprfs:///spark-ics/user/axverdier/data/710-PE-G1.bam Caused by:/spark-ics/user/axverdier/data/710-PE-G1.bam at org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSource.getHeader(ReadsSparkSource.java:207) at org.broadinstitute.hellbender.engine.spark.GATKSparkTool.initializeReads(GATKSparkTool.java:390) at org.broadinstitute.hellbender.engine.spark.GATKSparkTool.initializeToolInputs(GATKSparkTool.java:370) at org.broadinstitute.hellbender.engine.spark.GATKSparkTool.runPipeline(GATKSparkTool.java:360) at org.broadinstitute.hellbender.engine.spark.SparkCommandLineProgram.doWork(SparkCommandLineProgram.java:38) at org.broadinstitute.hellbender.cmdline.CommandLineProgram.runTool(CommandLineProgram.java:119) at org.broadinstitute.hellbender.cmdline.CommandLineProgram.instanceMainPostParseArgs(CommandLineProgram.java:176) at org.broadinstitute.hellbender.cmdline.CommandLineProgram.instanceMain(CommandLineProgram.java:195) at org.broadinstitute.hellbender.Main.runCommandLineProgram(Main.java:137) at org.broadinstitute.hellbender.Main.mainEntry(Main.java:158) at org.broadinstitute.hellbender.Main.main(Main.java:239) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:733) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:177) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:202) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:116) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.io.FileNotFoundException: /spark-ics/user/axverdier/data/710-PE-G1.bam at com.mapr.fs.MapRClientImpl.open(MapRClientImpl.java:243) at com.mapr.fs.MapRFileSystem.open(MapRFileSystem.java:958) at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:807) at org.seqdoop.hadoop_bam.util.SAMHeaderReader.readSAMHeaderFrom(SAMHeaderReader.java:51) at org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSource.getHeader(ReadsSparkSource.java:205)

cuericlee commented 6 years ago

@lbergelson Louis, I ran into similar trouble when I enabled GATK to access the OSS filesystem on Aliyun. Our OSS hadoop-fs implementation (org.apache.hadoop.fs.{FileSystem, Path}) works perfectly with Spark, but an OSS provider (java.nio.file.spi.FileSystemProvider) is not available today (and is not included in the gatk-4.beta.6 package).

I have looked into the SparkContext implementation and GATK's org.seqdoop.hadoop_bam.AnySAMInputFormat and IOUtils implementations.

Today, org.apache.hadoop.fs.{FileSystem, Path} is very broadly used in the big-data world, and most distributed-storage vendors, including AWS, Google, and Alibaba, already provide implementations of org.apache.hadoop.fs.{FileSystem, Path}. Many Hadoop customers have been working on top of hadoop.fs for years; if GATK on Spark could rely on org.apache.hadoop.fs.{FileSystem, Path}, I guess GATK could reach existing Hadoop-on-cloud customers much faster.

Maybe we could consider migrating the java.nio.file.FileSystem usage to org.apache.hadoop.fs.{FileSystem, Path} in [SparkContextFactory](https://github.com/broadinstitute/gatk/blob/73f2a62bee52518b57a985717770ed3a64d83243/src/main/java/org/broadinstitute/hellbender/engine/spark/SparkContextFactory.java), or otherwise support both NIO and Hadoop via an environment variable. Let me know your thoughts! The spark-shell session below shows the hadoop-fs write working while the NIO read of the BAM fails (a sketch of the hadoop-fs route follows the log):

scala> stringRdd.saveAsTextFile("oss://eric-new/testwrite10")

scala> val stringRdd = sc.parallelize(Seq("Test String"))
stringRdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> stringRdd.saveAsTextFile("oss://eric-new/testwrite11")
18/01/03 17:13:13 INFO NewHadoopRDD: Input split: oss://eric-new/resources/NA12878.chr17_69k_70k.dictFix.bam:1632-3770875903
18/01/03 17:13:13 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.nio.file.ProviderNotFoundException: Provider "oss" not found
    at java.nio.file.FileSystems.newFileSystem(FileSystems.java:341)
    at org.seqdoop.hadoop_bam.util.NIOFileUtil.asPath(NIOFileUtil.java:40)
    at org.seqdoop.hadoop_bam.BAMRecordReader.initialize(BAMRecordReader.java:143)
    at org.seqdoop.hadoop_bam.BAMInputFormat.createRecordReader(BAMInputFormat.java:226)
    at org.seqdoop.hadoop_bam.AnySAMInputFormat.createRecordReader(AnySAMInputFormat.java:190)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.liftedTree1$1(NewHadoopRDD.scala:180)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:179)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:134)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
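For comparison, here is a minimal sketch (assuming only standard Hadoop APIs, not GATK code) of the hadoop-fs route suggested above: any scheme with a registered org.apache.hadoop.fs.FileSystem implementation (hdfs, maprfs, oss, s3a, ...) resolves through the Hadoop Configuration, with no java.nio provider involved:

import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HadoopFsSmokeTest {
    public static void main(String[] args) throws Exception {
        // Hypothetical input; any URI whose scheme has a registered
        // org.apache.hadoop.fs.FileSystem implementation works here.
        URI uri = URI.create("oss://eric-new/resources/NA12878.chr17_69k_70k.dictFix.bam");

        // The Configuration picks up core-site.xml, where vendors register their fs.<scheme>.impl.
        Configuration conf = new Configuration();

        // FileSystem.get resolves the scheme through Hadoop, not through java.nio providers.
        FileSystem fs = FileSystem.get(uri, conf);
        try (InputStream in = fs.open(new Path(uri))) {
            byte[] header = new byte[4];
            int n = in.read(header);
            System.out.println("read " + n + " bytes from " + uri);
        }
    }
}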
jaideepjoshi commented 6 years ago

Hey AxVE: I am running into the same issues. Were you able to get these tools to work with maprfs? Thanks

AxVE commented 6 years ago

Hi @jaideepjoshi, I was able to get it to work with maprfs, but by using its hdfs:// path (maprfs is compatible with it). The weird thing is that a path like maprfs:///username/file becomes hdfs://spark01:7222/user/username/file. For example: ./gatkRun.sh CountVariantsSpark -V hdfs://spark01:7222/user/axverdier/data/phalstedii/PLHAL710.710-20180213-1113.vcf

gatkRun.sh is just a script to run GATK on Spark with some parameters. As I haven't run a job recently (yes, I'm lazy ^^), it's possible it's not compatible with the current GATK version (for example, --spark-submit-command was not).

#! /bin/bash

# Run a GATK Spark tool on the LIPM Spark cluster.
# The command is: ./gatkRun.sh <gatk spark tool name> <tool arguments>

# === gatk launcher and spark settings ===
launcher="/usr/local/bioinfo/bin/gatk"
sparkNumExecutors=250
sparkMemExecutors="5g"

# At least, the tool must be defined
if [[ $# -eq 0 ]]; then
        echo "ERROR: you must specify the gatk tool"
        exit 1;
fi

# === Get args ===
# Get the tool, then remove it from arguments (with shift) so $@ only contains its parameters
tool=$1 # Get the tool name
name="GATK_$tool"
shift
toolParams=$@

# === Running ===
sparkParams="--name $name --num-executors $sparkNumExecutors --executor-memory $sparkMemExecutors --deploy-mode cluster"

gatkSparkParams="--program-name $name --spark-runner SPARK --spark-master yarn" # gatkspark parameters related to spark.

cmd="$launcher $tool $toolParams $gatkSparkParams -- $sparkParams"
echo -e "\n$cmd\n"
$cmd
lbergelson commented 6 years ago

As an update here: we're currently planning an upgrade to the library that we use to read BAMs into Spark. As part of that upgrade we're going to try to fix the issue that requires two separate filesystem plugins for some things to work. That should let people with a Hadoop filesystem plugin use GATK without a matching NIO plugin. There's no definite timeline, but hopefully within the next quarter.

jaideepjoshi commented 6 years ago

Thanks @AxVE. Works.