Huawei-Spark / Spark-SQL-on-HBase

Native, optimized access to HBase Data through Spark SQL/Dataframe Interfaces
Apache License 2.0

Avoid infinite loop when an equality filter on a key is requested in a query. Query hangs otherwise. #12

Closed: shreyanstyagi closed this issue 8 years ago

shreyanstyagi commented 8 years ago

Issue:

Unable to run the following query:

```sql
select key1, key2, ts, col from table
where key1='abc' and key2='3' and ts >= 1432015200 and ts < 1432029600
```

Here, key1, key2, key3, and ts are part of the composite key, and col is a regular column. Values of key1 and key2 are distributed across multiple regions.
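For reference, a minimal driver along these lines reproduces the hang. This is a sketch only: the `HBaseSQLContext` entry point is inferred from the `org.apache.spark.sql.hbase` package visible in the stack trace below, and the table/column names are taken from the query above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
// Assumed entry point, inferred from the org.apache.spark.sql.hbase package in the trace
import org.apache.spark.sql.hbase.HBaseSQLContext

object Issue12Repro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("issue-12-repro").setMaster("local[2]"))
    val hbaseContext = new HBaseSQLContext(sc)

    // Equality filters on the leading key columns plus a range filter on ts.
    // Before the fix, partition pruning for this plan never terminated, so
    // collect() hung in the dag-scheduler-event-loop thread shown below.
    val rows = hbaseContext.sql(
      """select key1, key2, ts, col from table
        |where key1='abc' and key2='3' and ts >= 1432015200 and ts < 1432029600""".stripMargin
    ).collect()

    rows.foreach(println)
    sc.stop()
  }
}
```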

Below is the thread dump:

"dag-scheduler-event-loop" #69 daemon prio=5 os_prio=0 tid=0x00007f8cfc73b000 nid=0x209c runnable [0x00007f8cde8e9000] java.lang.Thread.State: RUNNABLE at org.apache.spark.sql.hbase.RangeCriticalPoint$.binarySearchEquality(HBaseCriticalPoint.scala:495) at org.apache.spark.sql.hbase.RangeCriticalPoint$.binarySearchForTightBound(HBaseCriticalPoint.scala:562) at org.apache.spark.sql.hbase.RangeCriticalPoint$.getQualifiedPartitions(HBaseCriticalPoint.scala:599) at org.apache.spark.sql.hbase.RangeCriticalPoint$.prunePartitions(HBaseCriticalPoint.scala:651) at org.apache.spark.sql.hbase.RangeCriticalPoint$.generatePrunedPartitions(HBaseCriticalPoint.scala:712) at org.apache.spark.sql.hbase.HBaseSQLReaderRDD.getPartitions(HBaseSQLReaderRDD.scala:303) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.ShuffleDependency.(Dependency.scala:82) at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80) at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:206) at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:204) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.dependencies(RDD.scala:204) at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:342) at org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:359) at org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:321) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:222) at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:305) at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:302) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:302) at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:314) at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:248) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1361) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1353) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Fix: Modified the conditions in the partition-pruning binary search that caused the infinite loop and the resulting query hang.
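For intuition only, the sketch below models this class of bug: a lower-bound binary search (in the spirit of `binarySearchForTightBound`/`binarySearchEquality` in HBaseCriticalPoint.scala) whose search interval can fail to shrink on an equality probe, so the loop spins forever. The function name and exact conditions are illustrative assumptions, not the actual patch.

```scala
object BoundSketch {
  // Hypothetical, simplified model of the bug: find the first index whose
  // critical point is >= key in a sorted array. Not the project's real code.
  def lowerBound(points: Array[Long], key: Long): Int = {
    var low = 0
    var high = points.length - 1
    while (low < high) {
      val mid = low + (high - low) / 2
      if (points(mid) >= key) {
        high = mid      // interval shrinks: high moves strictly left
      } else {
        low = mid + 1   // fixed condition: a buggy `low = mid` stops making
                        // progress once high == low + 1 (then mid == low),
                        // so an equality probe like key1 = 'abc' loops forever
      }
    }
    low
  }

  def main(args: Array[String]): Unit = {
    val pts = Array(10L, 20L, 20L, 30L)
    println(lowerBound(pts, 20L)) // prints 1: first index with value >= 20
  }
}
```

The general shape of such a fix is to guarantee that every iteration strictly shrinks the `[low, high]` interval, which is presumably what the modified conditions achieve.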