verdict-project / verdict

Interactive-Speed Analytics: 200x Faster, 200x Fewer Cluster Resources, Approximate Query Processing
http://verdictdb.org
Apache License 2.0
248 stars 66 forks source link

count-distinct using scramble table is slower than using original table. Using verdictDB is much slower than using spark. #400

Open maxbeyond opened 2 years ago

maxbeyond commented 2 years ago

I want to count distinct for one column. I use 3 different methods: spark SQL, verdictDB on original table, verdictDB on the scramble table. (I make a few change to the Hello.scala example. The code is in the end.) And the results are:

spark time: 0.06 seconds verdictDB scramble time: 4.67 seconds verdictDB original time: 2.04 seconds verdictDB scramble time: 3.13 seconds

I don't know why verdictDB is even slower than original spark. Do you have any thoughts?

Also, there is some pull requests trying to support verdictDB on pyspark but they are not merged into master branch. I am wondering if there will be some support with pyspark.

Appreciate for your help!

package example

import org.apache.spark.sql.SparkSession
import org.verdictdb.VerdictContext
import org.verdictdb.connection.SparkConnection
import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType,StringType,StructType,StructField}

class MyTimer(val text_para: String) {
  var start_time = System.nanoTime()
  var text = text_para

  def stop() {
    val elapsed_time = (System.nanoTime() - start_time) / 1e9d
    printf("%s time: %.2f seconds\n", text, elapsed_time);
  }
}

object Hello extends App {
  val config = new SparkConf()
  config.set("spark.sql.storeAssignmentPolicy", "LEGACY")

  val spark = SparkSession
    .builder()
    .config(config)
    .appName("VerdictDB basic example")
    .enableHiveSupport()
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._
  val verdict = VerdictContext.fromSparkSession(spark)

  // prepare data
  prepareData(spark, verdict)

  val sqlDF = spark.sql("SELECT * FROM caida.sales")
  sqlDF.show(5)

  val spark_timer = new MyTimer("spark");
  val spark_rs = spark.sql("select groupby, count(distinct metric_value) from caida.sales_scramble GROUP BY groupby")
  spark_timer.stop()
  spark_rs.show()

val count_distinct_timer = new MyTimer("verdictDB scramble");
  val rs = verdict.sql("select groupby, count(distinct metric_value) from caida.sales_scramble GROUP BY groupby")
  count_distinct_timer.stop()
  // rs.show()

  verdict.sql("BYPASS DROP SCHEMA IF EXISTS verdictdbtemp CASCADE")
  val count_distinct_timer2 = new MyTimer("verdictDB original");
  val rs2 = verdict.sql("select groupby, count(distinct metric_value) from caida.sales GROUP BY groupby")
  count_distinct_timer2.stop()

  verdict.sql("BYPASS DROP SCHEMA IF EXISTS verdictdbtemp CASCADE")
  val count_distinct_timer3 = new MyTimer("verdictDB scramble");
  val rs3 = verdict.sql("select groupby, count(distinct metric_value) from caida.sales_scramble GROUP BY groupby")
  count_distinct_timer3.stop()

  def prepareData(spark: SparkSession, verdict: VerdictContext): Unit = {
    // create a schema and a table
    spark.sql("DROP SCHEMA IF EXISTS caida CASCADE")
    spark.sql("CREATE SCHEMA IF NOT EXISTS caida")
    spark.sql("CREATE TABLE IF NOT EXISTS caida.sales (groupby string, metric_value string)")

    verdict.sql("BYPASS DROP TABLE IF EXISTS caida.sales_scramble")
    verdict.sql("BYPASS DROP SCHEMA IF EXISTS verdictdbtemp CASCADE")
    verdict.sql("BYPASS DROP SCHEMA IF EXISTS verdictdbmeta CASCADE")

    val input_files = "s3://sketch-public/input/1m.csv"

    val caida_schema = StructType(Array(
        StructField("srcip", StringType, true),
        StructField("dstip", StringType, true),
        StructField("proto", StringType, true),
        StructField("srcport", StringType, true),
        StructField("dstport", StringType, true),
        StructField("length", StringType, true)
    ))

    val df = spark.read.format("csv")
          .option("sep", ",")
          .schema(caida_schema)
          .option("header", "false")
          .load(input_files)

    df.createOrReplaceTempView("dfView")

    spark.sql("INSERT INTO caida.sales (SELECT dstip as groupby, CONCAT(srcip, '|', srcport, '|', dstport, '|', length) as metric_value FROM dfView)")

    val scramble_timer = new MyTimer("scramble");
    verdict.sql("CREATE SCRAMBLE caida.sales_scramble FROM caida.sales METHOD HASH HASHCOLUMN metric_value")
    scramble_timer.stop()
  }
}