hail-is / hail

Cloud-native genomic dataframes and batch computing
https://hail.is
MIT License

java.lang.OutOfMemoryError: GC overhead limit exceeded #4780

Closed: jjfarrell closed this issue 5 years ago

jjfarrell commented 6 years ago

To report a bug, fill in the information below. For support and feature requests, please use the discussion forum: http://discuss.hail.is/


Hail version:

version 0.2-721af83bc30a

What you did:

Import UK Biobank bgen chr10

import hail as hl
import sys
hl.init()
chr=sys.argv[1]
bgen="/project/ukbiobank/imp/uk.v3/bgen/ukb_imp_chr"+chr+"_v3.bgen"
sample="/project/ukbiobank/imp/uk.v3/bgen/ukb19416_imp_chr"+chr+"_v3_s487327.sample"
mt="/project/ukbiobank/imp/uk.v3/mt/ukbb_imp_chr"+chr+"_v3_s487327.mt"
hl.index_bgen(bgen)
hl.import_bgen(bgen,sample_file=sample,entry_fields=['GT', 'GP','dosage']).write(mt)

What went wrong (all error messages here, including the full java stack trace):

Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2-721af83bc30a
LOGGING: writing to /restricted/projectnb/ukbiobank/ad/analysis/ad.v1/hail-20181114-1827-0.2-721af83bc30a.log
Exception in thread "dispatcher-event-loop-8" Exception in thread "refresh progress" java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.zip.ZipCoder.getBytes(ZipCoder.java:80)
        at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
        at java.util.jar.JarFile.getEntry(JarFile.java:240)
        at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
        at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1042)
        at sun.misc.URLClassPath.getResource(URLClassPath.java:239)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.HeartbeatReceiver$$anonfun$org$apache$spark$HeartbeatReceiver$$expireDeadHosts$3.apply(HeartbeatReceiver.scala:198)
        at org.apache.spark.HeartbeatReceiver$$anonfun$org$apache$spark$HeartbeatReceiver$$expireDeadHosts$3.apply(HeartbeatReceiver.scala:196)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at org.apache.spark.HeartbeatReceiver.org$apache$spark$HeartbeatReceiver$$expireDeadHosts(HeartbeatReceiver.scala:196)
        at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1.applyOrElse(HeartbeatReceiver.scala:119)
        at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:105)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
        at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:216)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:170)
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:108)
        at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:108)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.SparkStatusTracker.getActiveStageIds(SparkStatusTracker.scala:61)
        at org.apache.spark.ui.ConsoleProgressBar.org$apache$spark$ui$ConsoleProgressBar$$refresh(ConsoleProgressBar.scala:67)
        at org.apache.spark.ui.ConsoleProgressBar$$anon$1.run(ConsoleProgressBar.scala:55)
        at java.util.TimerThread.mainLoop(Timer.java:555)
        at java.util.TimerThread.run(Timer.java:505)
---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
/restricted/projectnb/ukbiobank/ad/analysis/ad.v1/bgen2mt.py in <module>
      6 sample="/project/ukbiobank/imp/uk.v3/bgen/ukb19416_imp_chr"+chr+"_v3_s487327.sample"
      7 mt="/project/ukbiobank/imp/uk.v3/mt/ukbb_imp_chr"+chr+"_v3_s487327.mt"
----> 8 hl.index_bgen(bgen)
      9 hl.import_bgen(bgen,sample_file=sample,entry_fields=['GT', 'GP','dosage']).write(mt)

<decorator-gen-1065> in index_bgen(path, index_file_map, reference_genome, contig_recoding, skip_invalid_loci)

/share/pkg/hail/2018-10-31/install/build/distributions/hail-python.zip/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    558     def wrapper(__original_func, *args, **kwargs):
    559         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 560         return __original_func(*args_, **kwargs_)
    561
    562     return wrapper

/share/pkg/hail/2018-10-31/install/build/distributions/hail-python.zip/hail/methods/impex.py in index_bgen(path, index_file_map, reference_genome, contig_recoding, skip_invalid_loci)
   1955         index_file_map = tdict(tstr, tstr)._convert_to_j(index_file_map)
   1956
-> 1957     Env.hc()._jhc.indexBgen(jindexed_seq_args(path), index_file_map, joption(rg), contig_recoding, skip_invalid_loci)
   1958
   1959

/share/pkg/spark/2.2.1/install/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/share/pkg/hail/2018-10-31/install/build/distributions/hail-python.zip/hail/utils/java.py in deco(*args, **kwargs)
    208             raise FatalError('%s\n\nJava stack trace:\n%s\n'
    209                              'Hail version: %s\n'
--> 210                              'Error summary: %s' % (deepest, full, hail.__version__, deepest)) from None
    211         except pyspark.sql.utils.CapturedException as e:
    212             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: OutOfMemoryError: GC overhead limit exceeded

Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded
        at scala.collection.immutable.VectorBuilder.<init>(Vector.scala:713)
        at scala.collection.immutable.Vector$.newBuilder(Vector.scala:22)
        at scala.collection.immutable.IndexedSeq$.newBuilder(IndexedSeq.scala:46)
        at scala.collection.IndexedSeq$.newBuilder(IndexedSeq.scala:36)
        at scala.collection.IndexedSeq$$anon$1.apply(IndexedSeq.scala:34)
        at com.twitter.chill.TraversableSerializer.read(Traversable.scala:39)
        at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:396)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:246)
        at org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:156)
        at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:188)
        at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:185)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:103)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
        at is.hail.io.bgen.IndexBgen$.apply(IndexBgen.scala:99)
        at is.hail.HailContext.indexBgen(HailContext.scala:374)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)

java.lang.OutOfMemoryError: GC overhead limit exceeded
        at scala.collection.immutable.VectorBuilder.<init>(Vector.scala:713)
        at scala.collection.immutable.Vector$.newBuilder(Vector.scala:22)
        at scala.collection.immutable.IndexedSeq$.newBuilder(IndexedSeq.scala:46)
        at scala.collection.IndexedSeq$.newBuilder(IndexedSeq.scala:36)
        at scala.collection.IndexedSeq$$anon$1.apply(IndexedSeq.scala:34)
        at com.twitter.chill.TraversableSerializer.read(Traversable.scala:39)
        at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:396)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:246)
        at org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:156)
        at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:188)
        at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:185)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:103)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

Hail version: 0.2-721af83bc30a
Error summary: OutOfMemoryError: GC overhead limit exceeded
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/share/pkg/spark/2.2.1/install/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1035, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/share/pkg/spark/2.2.1/install/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 883, in send_command
    response = connection.send_command(command)
  File "/share/pkg/spark/2.2.1/install/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1040, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
tpoterba commented 6 years ago

How much memory do your worker machines have?

jjfarrell commented 6 years ago

16 nodes, each with between 128 GB and 256 GB of memory.

jjfarrell commented 5 years ago

The script is running fine on the smaller chromosome 19 to 22 bgen files so far. However, I noticed each job was running on just 24 cores, even though we have 16 nodes * 16 cores each (256 cores) available on the cluster.

tpoterba commented 5 years ago

index_bgen can't be parallelized -- it needs to do a linear scan through the files to find variant byte offsets.

The rest should be parallelized just fine. Do you see only 24 cores working on the write?

I'd also note that BGEN is probably a much faster format than an MT for representing the data in your script -- the MT will probably be ~10x bigger, since you're going to materialize a bunch of values that can be computed on the fly from the BGEN dosages.
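To make that comparison concrete, here is a minimal sketch (not from this thread) of querying the indexed BGEN directly instead of first writing it out as a MatrixTable; the chromosome-10 paths are taken from the original script, and the downstream query is just an illustrative count.

import hail as hl

hl.init()

bgen = "/project/ukbiobank/imp/uk.v3/bgen/ukb_imp_chr10_v3.bgen"
sample = "/project/ukbiobank/imp/uk.v3/bgen/ukb19416_imp_chr10_v3_s487327.sample"

# Assumes the BGEN has already been indexed with hl.index_bgen(bgen).
# Reading the BGEN directly skips the (much larger) MatrixTable write, and
# requesting only the entry fields you need keeps decoding work down.
mt = hl.import_bgen(bgen, sample_file=sample, entry_fields=['dosage'])

# Illustrative downstream query: number of variants and samples.
print(mt.count())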

jjfarrell commented 5 years ago

I see what is happening.

The Hail cluster install instructions specify the following for a spark cluster:

export PYSPARK_SUBMIT_ARGS="\
  --jars $HAIL_HOME/build/libs/hail-all-spark.jar \
  --conf spark.driver.extraClassPath=\"$HAIL_HOME/build/libs/hail-all-spark.jar\" \
  --conf spark.executor.extraClassPath=./hail-all-spark.jar \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryo.registrator=is.hail.kryo.HailKryoRegistrator \
  pyspark-shell"

On our cluster, this runs as a local job. It needs a "--master yarn" argument. Running locally probably explains both the out-of-memory error and the limited number of cores. I will rerun with the --master yarn argument.
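For what it's worth, the same idea can also be expressed from Python: hl.init takes a master argument that is passed through to Spark, so the sketch below should submit to YARN instead of running locally. This is an assumption about the deployment, not a fix confirmed in this thread; adding --master yarn to PYSPARK_SUBMIT_ARGS as described above is the documented route.

import hail as hl

# Sketch only: hl.init's `master` argument is forwarded to the SparkContext,
# so this should target YARN rather than local mode, provided the cluster's
# Hadoop/Spark configuration is visible to the driver.
hl.init(master='yarn')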

Regarding the BGEN file versus the matrix table: are you suggesting it would be faster to run an analysis such as a logistic regression starting from the BGEN file instead of the imported BGEN MT? The phenotypes would need to be annotated onto the imported BGEN MT every time. I'm just trying to understand the trade-offs.

tpoterba commented 5 years ago

The phenotypes would need to be annotated onto the imported BGEN MT every time

This is very cheap, especially compared to the extra IO/decoding burden.
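For scale, "annotating every time" is just a join on the sample (column) axis, so it touches roughly 500K keys rather than the full genotype matrix. Below is a minimal sketch, assuming a hypothetical phenotypes.tsv keyed by a sample_id column that matches the BGEN sample IDs; the phenotype and covariate field names are placeholders, and the exact name of the row-wise regression method may vary across Hail 0.2 releases.

import hail as hl

hl.init()

mt = hl.import_bgen(
    "/project/ukbiobank/imp/uk.v3/bgen/ukb_imp_chr10_v3.bgen",
    sample_file="/project/ukbiobank/imp/uk.v3/bgen/ukb19416_imp_chr10_v3_s487327.sample",
    entry_fields=['dosage'])

# Hypothetical phenotype/covariate table; field names are placeholders.
pheno = hl.import_table("phenotypes.tsv", impute=True, key="sample_id")

# Column-axis join: cheap relative to reading and decoding the genotype data.
mt = mt.annotate_cols(pheno=pheno[mt.s])

# Logistic regression on dosages; covariates must include the intercept (1.0).
results = hl.logistic_regression_rows(
    test='wald',
    y=mt.pheno.is_case,
    x=mt.dosage,
    covariates=[1.0, mt.pheno.age, mt.pheno.sex])
results.export("chr10_logreg.tsv")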

I should note, though, that in the next year we'll start to develop new types of file encodings that should let us represent this data as compactly as BGEN while being faster to read (using a faster compression codec than zlib).

jjfarrell commented 5 years ago

To get around this, I increased the Spark memory requested via PYSPARK_SUBMIT_ARGS:

  --conf spark.driver.memory=5G \
  --conf spark.executor.memory=30G \