Huawei-Spark / Spark-SQL-on-HBase

Native, optimized access to HBase Data through Spark SQL/Dataframe Interfaces
Apache License 2.0

1 #9

Closed Zhangshunyu closed 9 years ago

Zhangshunyu commented 9 years ago

// The changes are caused by interface differences between HBase 0.98 and 1.0.0, as follows:
// ===========================
// First: community 0.98 source:
// https://github.com/apache/hbase/blob/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
// In community 0.98, InternalScanner.java declares: boolean next(List result, int limit) throws IOException;
// In community 0.98, RegionScanner.java has no int getBatch(), and declares: boolean nextRaw(List result, int limit) throws IOException;
// ===========================

// ===========================
// Third: community branch-1 source (the master code is the same):
// https://github.com/apache/hbase/blob/branch-1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
// In community branch-1, InternalScanner.java declares: boolean next(List result, ScannerContext scannerContext) throws IOException;
// In community branch-1, RegionScanner.java declares: int getBatch() as well as boolean nextRaw(List result, ScannerContext scannerContext) throws IOException;
// =============================
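In short, porting Astro to branch-1 means implementing the new `getBatch()` and switching the two-argument `next`/`nextRaw` overloads from `int limit` to `ScannerContext`. A minimal sketch of that adaptation, with the version-specific overloads simply delegating to the version-independent single-argument `next` (the class name here is illustrative only; the actual port is the `BaseRegionScanner` class in the file below):

```scala
import java.util.{List => JList}

import org.apache.hadoop.hbase.Cell
import org.apache.hadoop.hbase.regionserver.{RegionScanner, ScannerContext}

// Sketch only: subclasses keep their scanning logic in the single-argument next();
// the branch-1-specific members are stubbed so the class compiles against HBase 1.0+.
abstract class HBase1xScannerAdapter extends RegionScanner {
  // Version-independent scanning logic (inherited from InternalScanner).
  def next(result: JList[Cell]): Boolean

  // branch-1 replaces next(result, limit: Int) with next(result, ScannerContext).
  override def next(result: JList[Cell], scannerContext: ScannerContext): Boolean =
    next(result)

  // The same change applies to nextRaw.
  override def nextRaw(result: JList[Cell], scannerContext: ScannerContext): Boolean =
    next(result)

  // getBatch() is new in branch-1 and unused by Astro; a fixed value keeps the code compiling.
  override def getBatch: Int = 0
}
```

The full ported file follows: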

package org.apache.spark.sql.hbase

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.coprocessor._
import org.apache.hadoop.hbase.regionserver._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Logger
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.hbase.util.DataTypeUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}

/**
 * Base scanner adapting Astro's region scanners to the HBase branch-1 RegionScanner interface.
 */
abstract class BaseRegionScanner extends RegionScanner {

  // Implements the method inherited from the interface: the newer HBase versions add
  // int getBatch() to RegionScanner. Astro never uses it, but the concrete class
  // instantiated further below still has to implement every inherited method, so it is
  // implemented here only to avoid a compile error; it has no effect in Astro.
  override def getBatch = 0

  override def isFilterDone = false

  // Here limit: Int becomes scannerContext: ScannerContext.
  // next is inherited from HBase's RegionScanner, which in turn extends InternalScanner.
  // In 0.98: boolean next(List result, int limit) throws IOException;
  // in the newer versions: boolean next(List result, ScannerContext scannerContext) throws IOException;
  override def next(result: java.util.List[Cell], scannerContext: ScannerContext) = next(result)

  override def reseek(row: Array[Byte]) = throw new DoNotRetryIOException("Unsupported")

  override def getMvccReadPoint = Long.MaxValue

  override def nextRaw(result: java.util.List[Cell]) = next(result)

  // Here, too, limit: Int becomes scannerContext: ScannerContext.
  // Comparing RegionScanner between the two HBase versions:
  // 0.98 declares: boolean nextRaw(List result, int limit) throws IOException;
  // the newer versions declare: boolean nextRaw(List result, ScannerContext scannerContext) throws IOException;
  override def nextRaw(result: java.util.List[Cell], scannerContext: ScannerContext) = next(result, scannerContext)
}

class SparkSqlRegionObserver extends BaseRegionObserver {
  lazy val logger = Logger.getLogger(getClass.getName)
  lazy val EmptyArray = Array[Byte]()

  override def postScannerOpen(
      e: ObserverContext[RegionCoprocessorEnvironment],
      scan: Scan,
      s: RegionScanner) = {
    val serializedPartitionIndex = scan.getAttribute(CoprocessorConstants.COINDEX)
    if (serializedPartitionIndex == null) {
      logger.debug("Work without coprocessor")
      super.postScannerOpen(e, scan, s)
    } else {
      logger.debug("Work with coprocessor")
      val partitionIndex: Int = Bytes.toInt(serializedPartitionIndex)
      val serializedOutputDataType = scan.getAttribute(CoprocessorConstants.COTYPE)
      val outputDataType: Seq[DataType] =
        HBaseSerializer.deserialize(serializedOutputDataType).asInstanceOf[Seq[DataType]]

      val serializedRDD = scan.getAttribute(CoprocessorConstants.COKEY)
      val subPlanRDD: RDD[Row] = HBaseSerializer.deserialize(serializedRDD).asInstanceOf[RDD[Row]]

      val taskParaInfo = scan.getAttribute(CoprocessorConstants.COTASK)
      val (stageId, partitionId, taskAttemptId, attemptNumber) =
        HBaseSerializer.deserialize(taskParaInfo).asInstanceOf[(Int, Int, Long, Int)]
      val taskContext = new TaskContextImpl(
        stageId, partitionId, taskAttemptId, attemptNumber, null, false, new TaskMetrics)

      val regionInfo = s.getRegionInfo
      val startKey = if (regionInfo.getStartKey.isEmpty) None else Some(regionInfo.getStartKey)
      val endKey = if (regionInfo.getEndKey.isEmpty) None else Some(regionInfo.getEndKey)

      val result = subPlanRDD.compute(
        new HBasePartition(partitionIndex, partitionIndex, startKey, endKey, newScanner = s),
        taskContext)

      new BaseRegionScanner() {
        override def getRegionInfo: HRegionInfo = regionInfo

        override def getMaxResultSize: Long = s.getMaxResultSize

        override def close(): Unit = s.close()

        override def next(results: java.util.List[Cell]): Boolean = {
          val hasMore: Boolean = result.hasNext
          if (hasMore) {
            val nextRow = result.next()
            val numOfCells = outputDataType.length
            for (i <- 0 until numOfCells) {
              val data = nextRow(i)
              val dataType = outputDataType(i)
              val dataOfBytes: HBaseRawType = {
                if (data == null) null else DataTypeUtils.dataToBytes(data, dataType)
              }
              results.add(new KeyValue(EmptyArray, EmptyArray, EmptyArray, dataOfBytes))
            }
          }
          hasMore
        }
      }
    }
  }
}
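For completeness (this is not part of the change above): once the observer compiles against HBase 1.x, it still has to be deployed, either cluster-wide via `hbase.coprocessor.region.classes` in hbase-site.xml or per table through the admin API. A rough sketch of the per-table route using the HBase 1.x client, assuming the Astro jar is already on the region servers' classpath; the table name and column family below are placeholders:

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

// Sketch: create a table with SparkSqlRegionObserver attached as a region coprocessor.
object RegisterObserverExample {
  def main(args: Array[String]): Unit = {
    val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val admin = connection.getAdmin
    try {
      val descriptor = new HTableDescriptor(TableName.valueOf("demo_table")) // placeholder table
      descriptor.addFamily(new HColumnDescriptor("cf"))                      // placeholder column family
      descriptor.addCoprocessor("org.apache.spark.sql.hbase.SparkSqlRegionObserver")
      admin.createTable(descriptor)
    } finally {
      admin.close()
      connection.close()
    }
  }
}
```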