bigdatagenomics/adam

ADAM is a genomics analysis platform with specialized file formats built using Apache Avro, Apache Spark, and Apache Parquet. Apache 2 licensed.

converting fasta to adam eats a huge amount of time and memory #1891

Closed antonkulaga closed 5 years ago

antonkulaga commented 6 years ago

I wonder why I need a lot of RAM and more than an hour with 8 cores just to convert an 800 MB FASTA file to ADAM format. The crazy memory and CPU consumption of ADAM's FastaConverter makes ADAM unusable in many use cases. What is also annoying is that when it runs short of memory it often keeps going for ages instead of just crashing.

P.S. It looks like it also eats a lot of spark-driver RAM. However, I am not sure how I should scale the RAM limits with the FASTA file size. For instance, what RAM parameters are optimal for a 1 GB FASTA file?

fnothaft commented 6 years ago

Hi @antonkulaga! Is this a standard reference genome that you could share? By any chance, is it gzipped? Our FASTA converter is not particularly zippy, but the runtimes you are describing seem extremely slow.

antonkulaga commented 6 years ago

@fnothaft yes, super-slow and often-freezing conversions are a big pain for me and the reason why I do not use ADAM as often as I would like to. I have no idea what is wrong there :(

Here is the latest example of a frozen conversion, with another file: I converted this 500 MB non-gzipped FASTA file http://agingkills.westeurope.cloudapp.azure.com/pipelines/indexes/BOWHEAD_WHALE/Alaska/Trinity.fasta to ADAM format. I uploaded it to HDFS and ran the following command:

 ./convert.sh alaska/Trinity.fasta alaska/transcripts.adam 8G 4

where convert.sh is just:

#!/bin/bash
docker run --network=bigdata_default -p 4040:4040  quay.io/comp-bio-aging/adam:latest /opt/adam/bin/adam-submit \
--master spark://spark-master:7077 --executor-memory $3 --total-executor-cores $4 --driver-memory 5G -- \
fasta2adam hdfs://namenode/pipelines/BOWHEAD_WHALE/$1 hdfs://namenode/pipelines/BOWHEAD_WHALE/$2

My bigdata containers are at https://github.com/antonkulaga/bigdata-docker/tree/master/containers, including the ADAM one: https://github.com/antonkulaga/bigdata-docker/blob/master/containers/adam/Dockerfile. The Spark configuration was https://github.com/antonkulaga/bigdata-docker/blob/master/conf/spark/master.conf. I ran it on a two-node cluster with 30 GB RAM per node.

I converted the FASTA file that was already uploaded to HDFS. As the file was a bit more than 500 MB, I gave ADAM only 8 GB of RAM for the job. The job had been running for 2.4 hours when I had to stop it. The log is here: mistery_upload

antonkulaga@web:/pipelines/indexes/BOWHEAD_WHALE$ ./convert.sh alaska/Trinity.fasta alaska/transcripts.adam 8G 4
Using ADAM_MAIN=org.bdgenomics.adam.cli.ADAMMain
Using spark-submit=/spark//bin/spark-submit
log4j:WARN No appenders could be found for logger (org.bdgenomics.adam.cli.ADAMMain).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/01/26 14:34:20 INFO SparkContext: Running Spark version 2.2.1
18/01/26 14:34:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/01/26 14:34:20 INFO SparkContext: Submitted application: fasta2adam
18/01/26 14:34:21 INFO SecurityManager: Changing view acls to: root
18/01/26 14:34:21 INFO SecurityManager: Changing modify acls to: root
18/01/26 14:34:21 INFO SecurityManager: Changing view acls groups to: 
18/01/26 14:34:21 INFO SecurityManager: Changing modify acls groups to: 
18/01/26 14:34:21 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
18/01/26 14:34:21 INFO Utils: Successfully started service 'sparkDriver' on port 33663.
18/01/26 14:34:21 INFO SparkEnv: Registering MapOutputTracker
18/01/26 14:34:22 INFO SparkEnv: Registering BlockManagerMaster
18/01/26 14:34:22 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/01/26 14:34:22 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/01/26 14:34:22 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-3d33de94-67e6-4e87-899f-34be10ea2876
18/01/26 14:34:22 INFO MemoryStore: MemoryStore started with capacity 2.5 GB
18/01/26 14:34:22 INFO SparkEnv: Registering OutputCommitCoordinator
18/01/26 14:34:23 INFO Utils: Successfully started service 'SparkUI' on port 4040.
18/01/26 14:34:23 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.0.2.15:4040
18/01/26 14:34:23 INFO SparkContext: Added JAR file:/opt/adam/adam-assembly/target/adam-assembly-spark2_2.11-0.24.0-SNAPSHOT.jar at spark://10.0.2.15:33663/jars/adam-assembly-spark2_2.11-0.24.0-SNAPSHOT.jar with timestamp 1516977263580
18/01/26 14:34:24 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://spark-master:7077...
18/01/26 14:34:24 INFO TransportClientFactory: Successfully created connection to spark-master/10.0.2.12:7077 after 27 ms (0 ms spent in bootstraps)
18/01/26 14:34:24 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20180126143424-0000
18/01/26 14:34:24 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20180126143424-0000/0 on worker-20180126143033-10.0.2.3-34251 (10.0.2.3:34251) with 2 cores
18/01/26 14:34:24 INFO StandaloneSchedulerBackend: Granted executor ID app-20180126143424-0000/0 on hostPort 10.0.2.3:34251 with 2 cores, 8.0 GB RAM
18/01/26 14:34:24 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20180126143424-0000/1 on worker-20180126142941-10.0.2.14-43481 (10.0.2.14:43481) with 2 cores
18/01/26 14:34:24 INFO StandaloneSchedulerBackend: Granted executor ID app-20180126143424-0000/1 on hostPort 10.0.2.14:43481 with 2 cores, 8.0 GB RAM
18/01/26 14:34:24 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33911.
18/01/26 14:34:24 INFO NettyBlockTransferService: Server created on 10.0.2.15:33911
18/01/26 14:34:24 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/01/26 14:34:24 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.0.2.15, 33911, None)
18/01/26 14:34:24 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20180126143424-0000/1 is now RUNNING
18/01/26 14:34:24 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20180126143424-0000/0 is now RUNNING
18/01/26 14:34:25 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.2.15:33911 with 2.5 GB RAM, BlockManagerId(driver, 10.0.2.15, 33911, None)
18/01/26 14:34:25 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.0.2.15, 33911, None)
18/01/26 14:34:25 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.0.2.15, 33911, None)
18/01/26 14:34:25 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
18/01/26 14:34:25 INFO Fasta2ADAM: Loading FASTA data from disk.
18/01/26 14:34:27 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.0.2.3:55762) with ID 0
18/01/26 14:34:27 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.0.2.14:47880) with ID 1
18/01/26 14:34:27 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.2.14:44433 with 4.1 GB RAM, BlockManagerId(1, 10.0.2.14, 44433, None)
18/01/26 14:34:27 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.2.3:33419 with 4.1 GB RAM, BlockManagerId(0, 10.0.2.3, 33419, None)
18/01/26 14:34:36 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 275.8 KB, free 2.5 GB)
18/01/26 14:34:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.0 KB, free 2.5 GB)
18/01/26 14:34:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.0.2.15:33911 (size: 23.0 KB, free: 2.5 GB)
18/01/26 14:34:41 INFO SparkContext: Created broadcast 0 from newAPIHadoopFile at ADAMContext.scala:2293
18/01/26 14:34:43 INFO FileInputFormat: Total input paths to process : 1
18/01/26 14:34:44 INFO SparkContext: Starting job: collect at FastaConverter.scala:182
18/01/26 14:34:44 INFO DAGScheduler: Got job 0 (collect at FastaConverter.scala:182) with 4 output partitions
18/01/26 14:34:44 INFO DAGScheduler: Final stage: ResultStage 0 (collect at FastaConverter.scala:182)
18/01/26 14:34:44 INFO DAGScheduler: Parents of final stage: List()
18/01/26 14:34:44 INFO DAGScheduler: Missing parents: List()
18/01/26 14:34:44 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[4] at filter at FastaConverter.scala:181), which has no missing parents
18/01/26 14:34:44 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.2 KB, free 2.5 GB)
18/01/26 14:34:44 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1834.0 B, free 2.5 GB)
18/01/26 14:34:44 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.0.2.15:33911 (size: 1834.0 B, free: 2.5 GB)
18/01/26 14:34:44 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
18/01/26 14:34:44 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 0 (MapPartitionsRDD[4] at filter at FastaConverter.scala:181) (first 15 tasks are for partitions Vector(0, 1, 2, 3))
18/01/26 14:34:44 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
18/01/26 14:34:44 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.0.2.3, executor 0, partition 0, ANY, 4925 bytes)
18/01/26 14:34:44 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.0.2.14, executor 1, partition 1, ANY, 4925 bytes)
18/01/26 14:34:44 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 10.0.2.3, executor 0, partition 2, ANY, 4925 bytes)
18/01/26 14:34:44 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 10.0.2.14, executor 1, partition 3, ANY, 4925 bytes)
18/01/26 14:34:53 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.0.2.14:44433 (size: 1834.0 B, free: 4.1 GB)
18/01/26 14:34:53 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.0.2.3:33419 (size: 1834.0 B, free: 4.1 GB)
18/01/26 14:34:54 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.0.2.3:33419 (size: 23.0 KB, free: 4.1 GB)
18/01/26 14:34:54 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.0.2.14:44433 (size: 23.0 KB, free: 4.1 GB)
18/01/26 14:35:04 INFO BlockManagerInfo: Added taskresult_3 in memory on 10.0.2.14:44433 (size: 9.9 MB, free: 4.1 GB)
18/01/26 14:35:04 INFO TransportClientFactory: Successfully created connection to /10.0.2.14:44433 after 3 ms (0 ms spent in bootstraps)
18/01/26 14:35:04 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 19869 ms on 10.0.2.14 (executor 1) (1/4)
18/01/26 14:35:04 INFO BlockManagerInfo: Removed taskresult_3 on 10.0.2.14:44433 in memory (size: 9.9 MB, free: 4.1 GB)
18/01/26 14:35:17 INFO BlockManagerInfo: Added taskresult_2 in memory on 10.0.2.3:33419 (size: 17.7 MB, free: 4.1 GB)
18/01/26 14:35:17 INFO TransportClientFactory: Successfully created connection to /10.0.2.3:33419 after 1 ms (0 ms spent in bootstraps)
18/01/26 14:35:17 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 32440 ms on 10.0.2.3 (executor 0) (2/4)
18/01/26 14:35:17 INFO BlockManagerInfo: Removed taskresult_2 on 10.0.2.3:33419 in memory (size: 17.7 MB, free: 4.1 GB)
18/01/26 14:35:17 INFO BlockManagerInfo: Added taskresult_0 in memory on 10.0.2.3:33419 (size: 5.8 MB, free: 4.1 GB)
18/01/26 14:35:17 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 32621 ms on 10.0.2.3 (executor 0) (3/4)
18/01/26 14:35:17 INFO BlockManagerInfo: Removed taskresult_0 on 10.0.2.3:33419 in memory (size: 5.8 MB, free: 4.1 GB)
18/01/26 14:35:18 INFO BlockManagerInfo: Added taskresult_1 in memory on 10.0.2.14:44433 (size: 8.2 MB, free: 4.1 GB)
18/01/26 14:35:18 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 33869 ms on 10.0.2.14 (executor 1) (4/4)
18/01/26 14:35:18 INFO DAGScheduler: ResultStage 0 (collect at FastaConverter.scala:182) finished in 33.929 s
18/01/26 14:35:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
18/01/26 14:35:18 INFO DAGScheduler: Job 0 finished: collect at FastaConverter.scala:182, took 34.444610 s
18/01/26 14:35:18 INFO BlockManagerInfo: Removed taskresult_1 on 10.0.2.14:44433 in memory (size: 8.2 MB, free: 4.1 GB)
18/01/26 14:35:19 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.0.2.3:33419 in memory (size: 1834.0 B, free: 4.1 GB)
18/01/26 14:35:19 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.0.2.14:44433 in memory (size: 1834.0 B, free: 4.1 GB)
18/01/26 14:35:19 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.0.2.15:33911 in memory (size: 1834.0 B, free: 2.5 GB)
18/01/26 14:35:24 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 277.8 MB, free 2.2 GB)
18/01/26 14:35:27 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.0 MB, free 2.2 GB)
18/01/26 14:35:27 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.0.2.15:33911 (size: 4.0 MB, free: 2.5 GB)
18/01/26 14:35:27 INFO MemoryStore: Block broadcast_2_piece1 stored as bytes in memory (estimated size 4.0 MB, free 2.2 GB)
18/01/26 14:35:27 INFO BlockManagerInfo: Added broadcast_2_piece1 in memory on 10.0.2.15:33911 (size: 4.0 MB, free: 2.5 GB)
18/01/26 14:35:27 INFO MemoryStore: Block broadcast_2_piece2 stored as bytes in memory (estimated size 4.0 MB, free 2.2 GB)
18/01/26 14:35:27 INFO BlockManagerInfo: Added broadcast_2_piece2 in memory on 10.0.2.15:33911 (size: 4.0 MB, free: 2.5 GB)
18/01/26 14:35:27 INFO MemoryStore: Block broadcast_2_piece3 stored as bytes in memory (estimated size 4.0 MB, free 2.2 GB)
18/01/26 14:35:27 INFO BlockManagerInfo: Added broadcast_2_piece3 in memory on 10.0.2.15:33911 (size: 4.0 MB, free: 2.5 GB)
18/01/26 14:35:27 INFO MemoryStore: Block broadcast_2_piece4 stored as bytes in memory (estimated size 4.0 MB, free 2.2 GB)
18/01/26 14:35:27 INFO BlockManagerInfo: Added broadcast_2_piece4 in memory on 10.0.2.15:33911 (size: 4.0 MB, free: 2.5 GB)
18/01/26 14:35:27 INFO MemoryStore: Block broadcast_2_piece5 stored as bytes in memory (estimated size 4.0 MB, free 2.2 GB)
18/01/26 14:35:27 INFO BlockManagerInfo: Added broadcast_2_piece5 in memory on 10.0.2.15:33911 (size: 4.0 MB, free: 2.5 GB)
18/01/26 14:35:27 INFO MemoryStore: Block broadcast_2_piece6 stored as bytes in memory (estimated size 4.0 MB, free 2.2 GB)
18/01/26 14:35:27 INFO BlockManagerInfo: Added broadcast_2_piece6 in memory on 10.0.2.15:33911 (size: 4.0 MB, free: 2.5 GB)
18/01/26 14:35:27 INFO MemoryStore: Block broadcast_2_piece7 stored as bytes in memory (estimated size 4.0 MB, free 2.2 GB)
18/01/26 14:35:27 INFO BlockManagerInfo: Added broadcast_2_piece7 in memory on 10.0.2.15:33911 (size: 4.0 MB, free: 2.5 GB)
18/01/26 14:35:27 INFO MemoryStore: Block broadcast_2_piece8 stored as bytes in memory (estimated size 818.8 KB, free 2.2 GB)
18/01/26 14:35:27 INFO BlockManagerInfo: Added broadcast_2_piece8 in memory on 10.0.2.15:33911 (size: 818.8 KB, free: 2.5 GB)
18/01/26 14:35:27 INFO SparkContext: Created broadcast 2 from broadcast at FastaConverter.scala:116
18/01/26 14:35:27 INFO SparkContext: Starting job: collect at NucleotideContigFragmentRDD.scala:91
18/01/26 14:35:27 INFO DAGScheduler: Registering RDD 6 (keyBy at FastaConverter.scala:124)
18/01/26 14:35:27 INFO DAGScheduler: Registering RDD 10 (distinct at NucleotideContigFragmentRDD.scala:90)
18/01/26 14:35:27 INFO DAGScheduler: Got job 1 (collect at NucleotideContigFragmentRDD.scala:91) with 4 output partitions
18/01/26 14:35:27 INFO DAGScheduler: Final stage: ResultStage 3 (collect at NucleotideContigFragmentRDD.scala:91)
18/01/26 14:35:27 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 2)
18/01/26 14:35:27 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 2)
18/01/26 14:35:27 INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD[6] at keyBy at FastaConverter.scala:124), which has no missing parents
18/01/26 14:35:27 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 5.7 KB, free 2.2 GB)
18/01/26 14:35:27 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.0 KB, free 2.2 GB)
18/01/26 14:35:27 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.0.2.15:33911 (size: 3.0 KB, free: 2.5 GB)
18/01/26 14:35:27 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
18/01/26 14:35:27 INFO DAGScheduler: Submitting 4 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD[6] at keyBy at FastaConverter.scala:124) (first 15 tasks are for partitions Vector(0, 1, 2, 3))
18/01/26 14:35:27 INFO TaskSchedulerImpl: Adding task set 1.0 with 4 tasks
18/01/26 14:35:27 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 4, 10.0.2.14, executor 1, partition 0, ANY, 4914 bytes)
18/01/26 14:35:27 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 5, 10.0.2.3, executor 0, partition 1, ANY, 4914 bytes)
18/01/26 14:35:27 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 6, 10.0.2.14, executor 1, partition 2, ANY, 4914 bytes)
18/01/26 14:35:27 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 7, 10.0.2.3, executor 0, partition 3, ANY, 4914 bytes)
18/01/26 14:35:27 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.0.2.3:33419 (size: 3.0 KB, free: 4.1 GB)
18/01/26 14:35:27 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.0.2.14:44433 (size: 3.0 KB, free: 4.1 GB)
18/01/26 14:35:27 INFO BlockManagerInfo: Added broadcast_2_piece7 in memory on 10.0.2.3:33419 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:27 INFO BlockManagerInfo: Added broadcast_2_piece8 in memory on 10.0.2.3:33419 (size: 818.8 KB, free: 4.1 GB)
18/01/26 14:35:27 INFO BlockManagerInfo: Added broadcast_2_piece4 in memory on 10.0.2.3:33419 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:28 INFO BlockManagerInfo: Added broadcast_2_piece2 in memory on 10.0.2.3:33419 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:28 INFO BlockManagerInfo: Added broadcast_2_piece3 in memory on 10.0.2.3:33419 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:28 INFO BlockManagerInfo: Added broadcast_2_piece2 in memory on 10.0.2.14:44433 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:28 INFO BlockManagerInfo: Added broadcast_2_piece5 in memory on 10.0.2.3:33419 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:28 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.0.2.3:33419 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:28 INFO BlockManagerInfo: Added broadcast_2_piece6 in memory on 10.0.2.3:33419 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:28 INFO BlockManagerInfo: Added broadcast_2_piece1 in memory on 10.0.2.3:33419 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:28 INFO BlockManagerInfo: Added broadcast_2_piece7 in memory on 10.0.2.14:44433 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:28 INFO BlockManagerInfo: Added broadcast_2_piece6 in memory on 10.0.2.14:44433 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:28 INFO BlockManagerInfo: Added broadcast_2_piece1 in memory on 10.0.2.14:44433 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:28 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.0.2.14:44433 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:28 INFO BlockManagerInfo: Added broadcast_2_piece5 in memory on 10.0.2.14:44433 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:28 INFO BlockManagerInfo: Added broadcast_2_piece3 in memory on 10.0.2.14:44433 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:28 INFO BlockManagerInfo: Added broadcast_2_piece4 in memory on 10.0.2.14:44433 (size: 4.0 MB, free: 4.1 GB)
18/01/26 14:35:28 INFO BlockManagerInfo: Added broadcast_2_piece8 in memory on 10.0.2.14:44433 (size: 818.8 KB, free: 4.1 GB)
antonkulaga commented 6 years ago

@fnothaft, I also tried smaller files (like this one: http://agingkills.westeurope.cloudapp.azure.com/pipelines/indexes/BOWHEAD_WHALE/Alaska/Trinity.fasta.transdecoder.pep) and a local Spark (I also tried loading from the local filesystem to exclude any HDFS issues). There it took 20 minutes, and most of the time was spent in the same place.

heuermh commented 6 years ago

Thanks for the detail, @antonkulaga! Is it ok to pull down those files for testing? I've been developing benchmarks for some other things, and will look into this as well.

antonkulaga commented 6 years ago

@heuermh yes, it is ok to take them. The first one is a de novo transcriptome assembly and is public (from http://www.bowhead-whale.org/downloads/, the Alaska transcriptome); the second one is the transdecoder protein predictions for this assembly. Currently, my main issue with ADAM is this FASTA conversion: it is super-slow, unpredictable in terms of optimal RAM, and sometimes freezes forever. That is why I use ADAM mostly for custom Spark scripts/notebooks and not as part of our standard bioinformatics pipelines :-(

heuermh commented 6 years ago

Running on our cluster results in an OutOfMemoryError:

$ adam-submit ... \
  -- fasta2adam hdfs:///Trinity.fasta hdfs:///Trinity.contigs.adam
...
18/01/26 11:03:03 INFO cli.Fasta2ADAM: Loading FASTA data from disk.
...
18/01/26 11:03:04 INFO spark.SparkContext: Starting job: collect at FastaConverter.scala:182
...
18/01/26 11:03:04 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (collect at FastaConverter.scala:182)
...
18/01/26 11:03:19 INFO scheduler.DAGScheduler: Job 0 finished: collect at FastaConverter.scala:182, took 15.370808 s
...
18/01/26 11:03:25 INFO memory.UnifiedMemoryManager: Will not store broadcast_2
as the required space (435863804 bytes) exceeds our memory limit (384093388 bytes)
18/01/26 11:03:25 WARN memory.MemoryStore: Not enough space to cache
broadcast_2 in memory! (computed 277.8 MB so far)
18/01/26 11:03:25 INFO memory.MemoryStore: Memory use = 298.5 KB (blocks) +
1024.0 KB (scratch space shared across 1 tasks(s)) = 1322.5 KB. Storage limit = 366.3 MB.
18/01/26 11:03:25 WARN storage.BlockManager: Persisting block broadcast_2 to disk instead.
...
18/01/26 11:03:33 INFO spark.SparkContext: Created broadcast 2 from broadcast at FastaConverter.scala:116
Command body threw exception:
java.lang.OutOfMemoryError: Java heap space
18/01/26 11:03:39 INFO cli.Fasta2ADAM: Overall Duration: 44.85 secs
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3181)
    at java.util.ArrayList.grow(ArrayList.java:261)
    at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:235)
    at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:227)
    at java.util.ArrayList.add(ArrayList.java:458)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:51)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:827)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:788)
    at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:25)
    at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:19)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
    at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
    at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:246)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
    at org.apache.spark.storage.BlockManager.maybeCacheDiskValuesInMemory(BlockManager.scala:1185)
    at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:526)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)

That's an awfully large broadcast_2 value!

antonkulaga commented 6 years ago

@heuermh how many GB of RAM did you give to the job, and how many cores? I had the same error before I gave more memory to both the executor and the driver.

heuermh commented 6 years ago

I'm on Yarn with dynamicAllocation enabled, so all of it, and all of them ;)

I ran with

$ adam-submit --master yarn --executor-memory 64G \
  -- fasta2adam hdfs:///Trinity.fasta hdfs:///Trinity.contigs.adam
antonkulaga commented 6 years ago

@heuermh, in that case I do not understand. Maybe there is something wrong with the way it splits the file; I have spark.files.maxPartitionBytes set to 536870912. In my case I have to work a lot with de novo assemblies, so Trinity RNA-Seq assemblies and transdecoder protein predictions are very common, and ADAM often takes ages to convert them, which makes it almost unusable. Usually Trinity produces a lot of transcripts, while transdecoder adds very long descriptions to the predicted sequences.

heuermh commented 6 years ago

The part where it splits the sequences up into fragments contributes to the issue. An alternative is to read each whole sequence using, say, biojava and then split and convert, but then everything is put into RAM on the driver, at least in how I've currently written it.

antonkulaga commented 6 years ago

As I have an urgent deadline, I do not have time to figure out what is wrong in FastaConverter right now. I suspect that:

val groupedContigs = keyedSequences.groupByKey()

may slow things down (have you considered replacing groupByKey with something that shuffles less data?), and that the number of contigs is really large in my case because it is a de novo assembly.
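To illustrate what I mean, roughly something like this (an untested sketch only, not a drop-in patch; I am assuming keyedSequences is keyed by contig index with (lineNumber, sequenceLine) values):

import org.apache.spark.rdd.RDD

// untested sketch: merge the lines of each contig with map-side combining
// instead of shuffling every single line with groupByKey
def mergeContigLines(keyedSequences: RDD[(Long, (Long, String))]): RDD[(Long, Vector[(Long, String)])] = {
  keyedSequences
    .mapValues(line => Vector(line))  // wrap each line so values can be merged
    .reduceByKey(_ ++ _)              // combines partially before the shuffle, unlike groupByKey
    .mapValues(_.sortBy(_._1))        // restore line order within each contig
}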

@heuermh regarding your idea with biojava, do you mean this code:

public SequenceRDD loadFastaProtein(final String path) throws IOException {
    log().info("Loading " + path + " in FASTA format as protein sequences...");
    try (InputStream inputStream = inputStream(path)) {
        JavaRDD<ProteinSequence> proteinSequences = javaSparkContext.parallelize(collect(FastaReaderHelper.readFastaProteinSequence(inputStream)));
        JavaRDD<Sequence> sequences = proteinSequences.map(proteinSequence -> proteinSequenceConverter.convert(proteinSequence, ConversionStringency.STRICT, log()));
        return SequenceRDD.apply(sequences.rdd());
    }
}

where I should map the results with this converter ( https://github.com/heuermh/biojava-adam/blob/master/src/main/java/org/biojava/nbio/adam/convert/ProteinSequenceToSequence.java ) and then save to Parquet? As I understand it, I have to give the Spark driver a lot of memory to execute that code (what is the estimate for a 500 MB Trinity file, would 8 GB be enough?).

heuermh commented 6 years ago

I suspect that keyedSequences.groupByKey() may slow things down

Yes, I also suspect that is true.

As I have an urgent deadline ...

That complicates things some. The biojava stuff depends on https://github.com/bigdatagenomics/adam/pull/1505, which won't be merged until after ADAM version 0.24.0, currently due Feb 2nd, 2018.

The method you quoted is for protein sequences, e.g.

>sp|Q6GZR6|059L_FRG3G Uncharacterized protein 059L OS=Frog virus 3 (isolate Goorha) GN=FV3-059L PE=4 SV=1
MPAQKRMSRYAYLVNKVAGPTLCGVFYGKYVEASDQAVSTCMAWFKIVVITKRVSAREWQ

A similar method exists for DNA sequences, loadFastaDna.

All the biojava sequences are collected into a single java.util.List, which is then turned into an RDD via javaSparkContext.parallelize(...). Thus RAM on the driver is required. As for how much, it depends on the per-sequence overhead in biojava compared to the FASTA file. I'm a bit old school, so I am less sure what that looks like with biojava 4.x/5.x compared to biojava 1.x (biojava-legacy), with which I'm more familiar; hence https://github.com/heuermh/biojava-legacy-adam.

For your use case, with small transcripts or short assembled sequences, most if not all of which would be shorter than the fragment size, it might help to add another loadFasta method that isn't clever about fragments and could avoid the shuffles.
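Very roughly, something like the following (an untested sketch; SimpleRecord and the whole approach are hypothetical, not existing ADAM API, and it assumes each input file fits into a single task's memory, since wholeTextFiles reads a file as one record):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// hypothetical record type, not part of ADAM
case class SimpleRecord(description: String, sequence: String)

// read whole FASTA records without fragmenting them, so no groupByKey/shuffle is needed
def loadFastaSimple(sc: SparkContext, path: String): RDD[SimpleRecord] = {
  sc.wholeTextFiles(path).flatMap { case (_, contents) =>
    contents.split(">").iterator
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { record =>
        val lines = record.split("\n")
        SimpleRecord(lines.head.trim, lines.tail.map(_.trim).mkString)
      }
  }
}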

antonkulaga commented 6 years ago

For your use case, with small transcripts or short assembled sequences, most if not all of which would be shorter than the fragment size, it might help to add another loadFasta method that isn't clever about fragments and could avoid the shuffles.

@heuermh maybe that would be useful for ADAM itself as well, so that users have an option to choose? It is quite common to have files with a lot of short sequences where speed and memory consumption matter more than making sure that all fragments are <= some length. Let users have the option of choosing a fast loading method.

heuermh commented 6 years ago

Yep, that is what I'm suggesting. Personally I'd rather that wait until after the Sequence/Slice stuff is merged in though.

I'll try to get #1505 up-to-date and rebased this weekend.

heuermh commented 6 years ago

What I'm seeing now is that the keyBy below appears to be stuck or stalled; it's been running for 16+ hours on only one executor:

    // trim whitespace from FASTA lines
    val filtered = rdd.map(kv => (kv._1, kv._2.trim()))
    // and drop lines that start with ;
      .filter((kv: (Long, String)) => !kv._2.startsWith(";"))

    // create a map of line number --> FastaDescriptionLine objects
    val descriptionLines: Map[Long, FastaDescriptionLine] = getDescriptionLines(filtered)

    // broadcast this map
    val indexToContigDescription = rdd.context.broadcast(descriptionLines)

    // filter to non-description lines
    val sequenceLines = filtered.filter(kv => !isDescriptionLine(kv._2))

    val keyedSequences =
      if (indexToContigDescription.value.isEmpty) {
        sequenceLines.keyBy(kv => -1L)
      } else {

       // key by highest line number with a description line below
       // the line number of our row
        sequenceLines.keyBy(row => findContigIndex(row._1, indexToContigDescription.value.keys.toList))
      }

https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/converters/FastaConverter.scala#L124
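One thing that jumps out at me, although I have not verified that it is the bottleneck: indexToContigDescription.value.keys.toList is rebuilt inside the closure for every sequence line, and findContigIndex then scans it. A hypothetical, untested tweak would be to build a sorted array once and binary-search it:

// untested sketch: precompute the sorted description-line indices once,
// rather than rebuilding .keys.toList for every sequence line
val descriptionIndices: Array[Long] =
  indexToContigDescription.value.keys.toArray.sorted

val keyedSequences = sequenceLines.keyBy { row =>
  // greatest description-line index below this row's line number, via binary search
  // (assumes every sequence line is preceded by at least one description line)
  val pos = java.util.Arrays.binarySearch(descriptionIndices, row._1)
  if (pos >= 0) descriptionIndices(pos) else descriptionIndices(-pos - 2)
}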

Is this what you see, @antonkulaga?

antonkulaga commented 6 years ago

@heuermh something like that. In my case, it is keyBy and then the first flatMap inside of it.

heuermh commented 6 years ago

Still going at 44 hours, at

org.apache.spark.rdd.RDD.keyBy(RDD.scala:1522)
org.bdgenomics.adam.converters.FastaSequenceConverter$.apply(FastaSequenceConverter.scala:116)