databricks / spark-sql-perf

Apache License 2.0
The Query and Generate mismatch #185

Open william-wang opened 4 years ago

william-wang commented 4 years ago

I split data generation and querying into two jar files. I first generate the data and then query it in parallel. Most tasks take about 20 ms, but some take about 700 ms. The reason is that the slow tasks access non-existent keys (files). Why do the query tasks access files that were never generated?

Following is a part of my program.

  1. Generate data code

    val tables = new TPCDSTables(sqlContext,
      dsdgenDir = "/opt/tpcds-kit/tools", // location of dsdgen
      scaleFactor = 70,
      useDoubleForDecimal = false, // true to replace DecimalType with DoubleType
      useStringForDate = false) // true to replace DateType with StringType

    tables.genData(
      location = rootDir,
      format = format,
      overwrite = true, // overwrite the data that is already there
      partitionTables = true, // create the partitioned fact tables
      clusterByPartitionColumns = true, // shuffle to get partitions coalesced into single files.
      filterOutNullPartitionValues = false, // true to filter out the partition with NULL key value
      tableFilter = "", // "" means generate all tables
      numPartitions = 100) // how many dsdgen part

  2. Query code

    val tables = new TPCDSTables(sqlContext,
      dsdgenDir = "/opt/tpcds-kit/tools", // location of dsdgen
      scaleFactor = 70,
      useDoubleForDecimal = false, // true to replace DecimalType with DoubleType
      useStringForDate = false) // true to replace DateType with StringType

    // Create the specified database
    sql(s"create database $databaseName")

    // Create metastore tables in a specified database for your data.
    // Once tables are created, the current database will be switched to the specified database.
    tables.createExternalTables(rootDir, "parquet", databaseName, overwrite = true, discoverPartitions = false)
    val tpcds = new TPCDS(sqlContext = sqlContext)
    // Set:
    val resultLocation = args(3) // place to write results
    val iterations = 1 // how many iterations of queries to run.
    //val queries = tpcds.tpcds2_4Queries // queries to run.
    val timeout = 24*60*60 // timeout, in seconds.

    def queries = {
      if (args(4) == "all") {
        tpcds.tpcds2_4Queries
      } else {
        val qa = args(4).split(",", 0)
        val qs = qa.toSeq
        tpcds.tpcds2_4Queries.filter(q => { qs.contains(q.name) })
      }
    }

    println(queries.size)
    sql(s"use $databaseName")
    val experiment = tpcds.runExperiment(
      queries,
      iterations = iterations,
      resultLocation = resultLocation,
      forkThread = true)
    experiment.waitForFinish(timeout)
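
To rule out the query-selection step as a source of surprises, the filtering logic can be exercised on its own, without Spark. The following is only a minimal sketch: `Query` is a hypothetical stand-in case class (not the spark-sql-perf type), `selectQueries` is a hypothetical helper name, and the query names `q1`, `q2`, `q3` are purely illustrative. The `trim` call is an addition that the code above does not have.

```scala
// Hypothetical stand-in for the benchmark's query type; only the name matters here.
final case class Query(name: String)

object QuerySelection {
  // Mirrors the `queries` definition above: "all" keeps every query,
  // otherwise keep only those whose name is in the comma-separated argument.
  def selectQueries(all: Seq[Query], arg: String): Seq[Query] =
    if (arg == "all") all
    else {
      // trim is an assumption added here so that "q1, q3" also matches
      val wanted = arg.split(",", 0).map(_.trim).toSet
      all.filter(q => wanted.contains(q.name))
    }

  def main(args: Array[String]): Unit = {
    val all = Seq(Query("q1"), Query("q2"), Query("q3")) // illustrative names only
    println(selectQueries(all, "all").size)                          // prints 3
    println(selectQueries(all, "q1,q3").map(_.name).mkString(","))   // prints q1,q3
  }
}
```

If a name passed in `args(4)` does not match any query name exactly, it is silently dropped, so printing the selected names (not just the count) makes a mismatch easier to spot.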