disq-bio / disq

A library for manipulating bioinformatics sequencing formats in Apache Spark
MIT License

StackOverflowError when saving to BAM file in spark-shell #116

Closed by heuermh 3 years ago

heuermh commented 5 years ago

For benchmarking, I've built a fat jar with ADAM and Disq, and there seems to be a problem with SAMRecord or SAMFileHeader serialization.

$ adam-shell -i convert_parquet_alignments_disq_adam.scala
...
java.lang.StackOverflowError
  at com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:284)
  at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:102)
  at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:540)
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:75)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
...
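The overflow is consistent with how Kryo's FieldSerializer walks object graphs: an immutable Scala List is a chain of :: cons cells linked through their tl fields, so serializing it field by field consumes one stack frame per element. A minimal sketch of the same recursion pattern (names hypothetical, not from Kryo itself):

def walk(xs: List[Int]): Long = xs match {
  case Nil          => 0L
  case head :: tail => head + walk(tail) // not tail-recursive: depth == list length
}

// A long enough list overflows the JVM stack, mirroring the trace above:
// walk(List.fill(1000000)(1)) // would throw java.lang.StackOverflowError

Any serializer that descends through the tl field recursively has the same shape as walk, which is why a long sequence dictionary can exhaust the stack.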

convert_parquet_alignments_disq_adam.scala

import org.bdgenomics.adam.rdd.ADAMContext._
import org.disq_bio.disq.HtsjdkReadsRdd
import org.disq_bio.disq.HtsjdkReadsRddStorage
import org.disq_bio.disq.FileCardinalityWriteOption

val alignments = sc.loadParquetAlignments(inputPath.get)
val (reads, header) = alignments.convertToSam()

val htsjdkReadsRddStorage = HtsjdkReadsRddStorage.makeDefault(sc)
val htsjdkReadsRdd = new HtsjdkReadsRdd(header, reads.map(_.get()).toJavaRDD)
htsjdkReadsRddStorage.write(htsjdkReadsRdd, outputPath.get, FileCardinalityWriteOption.SINGLE)

See also a similar issue reported in ADAM when saving to BAM format from ADAM code: https://github.com/bigdatagenomics/adam/issues/2186. I suspect a conflict in Kryo serializer registration, or some other incompatibility between the two libraries' use of htsjdk.

heuermh commented 5 years ago

After further investigation, this might be a Java-called-from-Scala problem, where Kryo recurses into a Scala collection that has been wrapped for use as a Java one:

com.esotericsoftware.kryo.KryoException: Max depth exceeded: 32
Serialization trace:
head (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
tl (scala.collection.immutable.$colon$colon)
underlying (scala.collection.convert.Wrappers$SeqWrapper)
mSequences (htsjdk.samtools.SAMSequenceDictionary)
mSequenceDictionary (htsjdk.samtools.SAMFileHeader)
  at com.esotericsoftware.kryo.Kryo.beginObject(Kryo.java:1012)
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:568)
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
...
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
...
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
  at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:241)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:291)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:291)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:292)
  at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
  at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
  at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
  at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
  at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
  at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:650)
  at org.disq_bio.disq.impl.formats.bam.BamSink.save(BamSink.java:78)
  at org.disq_bio.disq.HtsjdkReadsRddStorage.write(HtsjdkReadsRddStorage.java:227)
  ... 65 elided
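The serialization trace points at scala.collection.convert.Wrappers$SeqWrapper inside SAMSequenceDictionary.mSequences, which suggests a Scala Seq was handed to the htsjdk API through a Java view. A hedged sketch of how such a wrapper can arise (the contig is illustrative):

import scala.collection.JavaConverters._
import htsjdk.samtools.{SAMSequenceDictionary, SAMSequenceRecord}

// A Scala List viewed as a java.util.List via asJava is a Wrappers$SeqWrapper
// whose `underlying` field is still the original cons-cell chain. If such a
// view ends up inside SAMSequenceDictionary, Kryo recurses through one ::
// cell per record -- hence "Max depth exceeded" once the dictionary holds
// more records than Kryo's depth limit allows.
val records: List[SAMSequenceRecord] =
  List(new SAMSequenceRecord("chr1", 248956422))
val dict = new SAMSequenceDictionary(records.asJava)

If this is indeed the cause, copying the records into a concrete java.util.ArrayList before constructing the dictionary would avoid the wrapper entirely.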
heuermh commented 5 years ago

As a temporary workaround, register SAMFileHeader with Kryo's JavaSerializer so it falls back to Java serialization instead of Kryo:

import com.esotericsoftware.kryo.serializers.JavaSerializer
...
kryo.register(classOf[htsjdk.samtools.SAMFileHeader], new JavaSerializer())
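In a fat jar this registration has to happen wherever the Kryo instance is configured. One way to wire it in is a Spark KryoRegistrator; the class and conf keys below are standard Spark, but treat the exact set of registered htsjdk classes as an assumption, not a verified fix:

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator: route the problematic htsjdk header types
// through Java serialization while leaving everything else on Kryo.
class HtsjdkRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[htsjdk.samtools.SAMFileHeader], new JavaSerializer())
    kryo.register(classOf[htsjdk.samtools.SAMSequenceDictionary], new JavaSerializer())
  }
}

It would then be enabled with spark.serializer=org.apache.spark.serializer.KryoSerializer and spark.kryo.registrator set to the registrator's fully qualified class name.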
heuermh commented 3 years ago

Closing as unable to reproduce.