bigdatagenomics / adam

ADAM is a genomics analysis platform with specialized file formats built using Apache Avro, Apache Spark, and Apache Parquet. Apache 2 licensed.
Apache License 2.0
996 stars 309 forks source link

Inconsistent counts for shuffleRegionJoin without Sequence Dictionaries #1956

Open heuermh opened 6 years ago

heuermh commented 6 years ago

Shuffle region join between two GFF3 files, downloaded from ftp://ftp.ensembl.org/pub/release-91/gff3/homo_sapiens/ and ftp://ftp.ensembl.org/pub/release-91/regulation/homo_sapiens/, read from HDFS with and without providing a sequence dictionary:

$ adam-shell ...

scala> import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMContext._

scala> val genes = sc.loadFeatures("Homo_sapiens.GRCh38.91.gff3")
genes: org.bdgenomics.adam.rdd.feature.FeatureRDD = RDDBoundFeatureRDD with 0 reference sequences

scala> val reg = sc.loadFeatures("homo_sapiens.GRCh38.Regulatory_Build.regulatory_features.20161111.gff")
reg: org.bdgenomics.adam.rdd.feature.FeatureRDD = RDDBoundFeatureRDD with 0 reference sequences

scala> for (i <- 1 to 10)
  println("attempt=" + i + " count=" + genes.shuffleRegionJoin(reg).rdd.count())
attempt=1 count=423993                                                          
attempt=2 count=423993                                                          
attempt=3 count=423993                                                          
attempt=4 count=433233                                                          
attempt=5 count=423993                                                          
attempt=6 count=433233                                                          
attempt=7 count=423993                                                          
attempt=8 count=433233                                                          
attempt=9 count=423993                                                          
attempt=10 count=433233        

scala> val sd = sc.loadSequenceDictionary("Homo_sapiens.GRCh38.91.gff3.genome")
sd: org.bdgenomics.adam.models.SequenceDictionary =
SequenceDictionary{
1->248956422
...
KI270312.1->998
KI270315.1->...

scala> val genesWithSequenceDictionary = sc.loadFeatures("Homo_sapiens.GRCh38.91.gff3", optSequenceDictionary = Some(sd))
genesWithSequenceDictionary: org.bdgenomics.adam.rdd.feature.FeatureRDD = RDDBoundFeatureRDD with 194 reference sequences

scala> val regWithSequenceDictionary = sc.loadFeatures("homo_sapiens.GRCh38.Regulatory_Build.regulatory_features.20161111.gff", optSequenceDictionary = Some(sd))
regWithSequenceDictionary: org.bdgenomics.adam.rdd.feature.FeatureRDD = RDDBoundFeatureRDD with 194 reference sequences                                                 

scala> for (i <- 1 to 10)
  println("attempt=" + i + " count=" +
  genesWithSequenceDictionary.shuffleRegionJoin(regWithSequenceDictionary).rdd.count())
attempt=1 count=2208253                                                         
attempt=2 count=2208253                                                         
attempt=3 count=2208253                                                         
attempt=4 count=2208253                                                         
attempt=5 count=2208253                                                         
attempt=6 count=2208253                                                         
attempt=7 count=2208253                                                         
attempt=8 count=2208253                                                         
attempt=9 count=2208253                                                         
attempt=10 count=2208253 
devin-petersohn commented 6 years ago

Unless I'm missing something, I can't see where in the ShuffleRegionJoin code that a SequenceDictionary is used. It's not used for partitioning, the join, or anything except to pass to the constructor of the new RDD.

heuermh commented 6 years ago

Would strand matter wrt sequence dictionary or no sequence dictionary? And could that somehow be nondeterministic?

scala> val genes = sc.loadFeatures("Homo_sapiens.GRCh38.91.gff3")
genes: org.bdgenomics.adam.rdd.feature.FeatureRDD = RDDBoundFeatureRDD with 0 reference sequences

scala> val reg = sc.loadFeatures("homo_sapiens.GRCh38.Regulatory_Build.regulatory_features.20161111.gff")
reg: org.bdgenomics.adam.rdd.feature.FeatureRDD = RDDBoundFeatureRDD with 0 reference sequences

scala> for (i <- 1 to 10) println("attempt=" + i + " count=" +
  genes.shuffleRegionJoin(reg).rdd.count())
attempt=1 count=423993                                                          
attempt=2 count=433233                                                          
attempt=3 count=433233                                                          
attempt=4 count=423993                                                          
attempt=5 count=433233                                                          
attempt=6 count=433233                                                          
attempt=7 count=433233                                                          
attempt=8 count=423993                                                          
attempt=9 count=423993                                                          
attempt=10 count=423993                                                         

scala> val join = genes.shuffleRegionJoin(reg)
join: org.bdgenomics.adam.rdd.GenericGenomicDataset[(org.bdgenomics.formats.avro.Feature, org.bdgenomics.formats.avro.Feature),(org.bdgenomics.adam.sql.Feature, org.bdgenomics.adam.sql.Feature)] = RDDBoundGenericGenomicDataset with 0 reference sequences

scala> join.rdd.count()
res2: Long = 433233                                                             

scala> join.rdd.count()
res3: Long = 433233                                                             

scala> val join2 = genes.shuffleRegionJoin(reg)
join2: org.bdgenomics.adam.rdd.GenericGenomicDataset[(org.bdgenomics.formats.avro.Feature, org.bdgenomics.formats.avro.Feature),(org.bdgenomics.adam.sql.Feature, org.bdgenomics.adam.sql.Feature)] = RDDBoundGenericGenomicDataset with 0 reference sequences

scala> join2.rdd.count()
res4: Long = 433233                                                             

scala> join2.rdd.count()
res5: Long = 433233                                                             

scala> val join3 = genes.shuffleRegionJoin(reg)
join3: org.bdgenomics.adam.rdd.GenericGenomicDataset[(org.bdgenomics.formats.avro.Feature, org.bdgenomics.formats.avro.Feature),(org.bdgenomics.adam.sql.Feature, org.bdgenomics.adam.sql.Feature)] = RDDBoundGenericGenomicDataset with 0 reference sequences

scala> join3.rdd.count()
res6: Long = 423993                                                             

scala> val difference = join2.rdd.subtract(join3.rdd)
difference: org.apache.spark.rdd.RDD[(org.bdgenomics.formats.avro.Feature, org.bdgenomics.formats.avro.Feature)] = MapPartitionsRDD[491] at subtract at <console>:38

scala> difference.count()
res7: Long = 87631                                                              

scala> Array(difference.first._1.getContigName(), difference.first._1.getStart(), difference.first._1.getEnd(), difference.first._1.getStrand(), "-->", difference.first._2.getContigName(), difference.first._2.getStart(), difference.first._2.getEnd(), difference.first._2.getStrand()).mkString("\t")
16  7332749 7712504 FORWARD --> 16  7615538 7616217 INDEPENDENT
heuermh commented 3 years ago

Will attempt to reproduce on a recent release version