yueawang / oap-perf-suite

OAP Cluster Performance TestSuite

Add on-demand data gen. #13

Open yueawang opened 6 years ago

yueawang commented 6 years ago

For broader test coverage we need precise control over the generated data: its scale, file format, columns, and row-group layout.

// Assumes the suite's usual imports are in scope (not shown in this snippet):
//   scala.collection.mutable, org.apache.spark.sql.{SQLContext, SQLImplicits},
//   Logging, and the suite's own Utils / OapPerfSuiteContext.
object OapBenchmarkOnDemandDataBuilder extends OapPerfSuiteContext with Logging {

  private val defaultProperties = Map(
    "oap.benchmark.compression.codec"    -> "gzip",
    "oap.benchmark.support.oap.version"  -> "0.4.0",
    "oap.benchmark.tpcds.tool.dir"       -> "/home/oap/tpcds-kit/tools",
    "oap.benchmark.hdfs.file.root.dir"   -> "/user/oap/oaptest/",
    "oap.benchmark.database.prefix"      -> "",
    "oap.benchmark.database.postfix"     -> "",
    "oap.benchmark.tpcds.data.scale"     -> "200",
    "oap.benchmark.tpcds.data.partition" -> "80"
  )

  def getDatabase(format: String): String = {
    val prefix = properties("oap.benchmark.database.prefix")
    val postfix = properties("oap.benchmark.database.postfix")
    val dataScale = properties("oap.benchmark.tpcds.data.scale").toInt
    val baseName = format match {
      case "oap"     => s"oaptpcds$dataScale"
      case "parquet" => s"parquettpcds$dataScale"
      case _         => "default"
    }
    prefix + baseName + postfix
  }

  def formatTableLocation(rootDir: String, versionNum: String, tableFormat: String): String = {
    s"${rootDir}/${versionNum}/ondemand/${getDatabase(tableFormat)}/"
  }

  // Load settings from ./conf/oap-benchmark-default.conf, falling back to the defaults above.
  private val properties = {
    try {
      new mutable.HashMap[String, String]() ++=
        Utils.getPropertiesFromFile("./conf/oap-benchmark-default.conf")
    } catch {
      case e: IllegalArgumentException =>
        logWarning(e.getMessage + ". Use default setting!")
        defaultProperties
    }
  }

  override def beforeAll(conf: Map[String, String] = Map.empty): Unit = {
    super.beforeAll(conf)
  }

  def generateTables(dataFormats: Array[String] = Array("oap", "parquet")): Unit = {
    val versionNum = properties("oap.benchmark.support.oap.version")
    val codec = properties("oap.benchmark.compression.codec")
    val scale = properties("oap.benchmark.tpcds.data.scale").toInt
    val partitions = properties("oap.benchmark.tpcds.data.partition").toInt
    val hdfsRootDir = properties("oap.benchmark.hdfs.file.root.dir")
    val tpcdsToolPath = properties("oap.benchmark.tpcds.tool.dir")

    // Local implicits so the generated RDD picks up .toDF.
    object testImplicits extends SQLImplicits {
      protected override def _sqlContext: SQLContext = spark.sqlContext
    }
    import testImplicits._

    dataFormats.foreach { format =>
      spark.sqlContext.setConf(s"spark.sql.$format.compression.codec", codec)
      val loc = formatTableLocation(hdfsRootDir, versionNum, format)
      // Capture the scale in a local val so the closure below does not serialize the enclosing object.
      val constant = scale

      // One task per partition; each task emits `constant` random longs.
      val data = spark.sparkContext.parallelize(1 to partitions, partitions).flatMap { _ =>
        new Iterator[Long] {
          val rand = new java.util.Random(System.currentTimeMillis())
          var count = constant
          def hasNext: Boolean = count > 0
          def next(): Long = {
            count -= 1
            rand.nextLong()
          }
        }
      }

      // Write the generated column as the "store_sales" data in the requested format.
      data.toDF("value").write.mode("overwrite").format(format).save(loc + "store_sales")
    }
  }

  def generateDatabases(): Unit = {
    // TODO: get from OapFileFormatConfigSet
    val dataFormats: Seq[String] = Seq("oap", "parquet")
    dataFormats.foreach { format =>
      spark.sql(s"create database if not exists ${getDatabase(format)}")
      val versionNum = properties("oap.benchmark.support.oap.version")
      val hdfsRootDir = properties("oap.benchmark.hdfs.file.root.dir")
      val dataLocation = formatTableLocation(hdfsRootDir, versionNum, format)

      spark.sql(s"use ${getDatabase(format)}")
      spark.sql("drop table if exists store_sales")
      spark.sqlContext.createExternalTable("store_sales", dataLocation + "store_sales", format)
    }
  }
}
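
For reference, a minimal driver sketch of how the builder could be invoked once this lands. The builder object and its methods (beforeAll, generateTables, generateDatabases) come from the snippet above; the wrapper object OnDemandDataGenExample and the assumption that ./conf/oap-benchmark-default.conf (with the oap.benchmark.* keys from defaultProperties) is readable from the working directory are illustrative, not part of this patch.

// Illustrative driver only; OnDemandDataGenExample is not part of this patch.
object OnDemandDataGenExample {
  def main(args: Array[String]): Unit = {
    // Bring up the shared SparkSession via OapPerfSuiteContext, picking up
    // ./conf/oap-benchmark-default.conf or the built-in defaults.
    OapBenchmarkOnDemandDataBuilder.beforeAll()

    // Generate the random "store_sales" data for both supported formats ...
    OapBenchmarkOnDemandDataBuilder.generateTables(Array("oap", "parquet"))

    // ... then create the per-format databases and external tables over it.
    OapBenchmarkOnDemandDataBuilder.generateDatabases()
  }
}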