4paradigm / OpenMLDB

OpenMLDB is an open-source machine learning database that provides a feature platform computing consistent features for training and inference.
https://openmldb.ai
Apache License 2.0

Support UnsafeRowOpt for groupby agg physical node #1346

Open tobegit3hub opened 2 years ago

tobegit3hub commented 2 years ago

Currently we cannot enable UnsafeRowOpt for SQL that contains GROUP BY: the query may output incorrect results or crash with a core dump in the C++ layer.

Here is a simple test case that reproduces the problem.

  test("Test unsafe groupby") {
    val spark = getSparkSession
    val sess = new OpenmldbSession(spark)

    val data = Seq(
      Row(1, "tom", 100, 1),
      Row(2, "amy", 200, 2),
      Row(3, "tom", 300, 3),
      Row(4, "amy", 400, 4),
      Row(5, "tom", 500, 5),
      Row(6, "amy", 600, 6),
      Row(7, "tom", 700, 7),
      Row(8, "amy", 800, 8),
      Row(9, "tom", 900, 9),
      Row(10, "amy", 1000, 10))
    val schema = StructType(List(
      StructField("id", IntegerType),
      StructField("user", StringType),
      StructField("trans_amount", IntegerType),
      StructField("trans_time", IntegerType)))
    val df = spark.createDataFrame(spark.sparkContext.makeRDD(data), schema)

    sess.registerTable("t1", df)
    df.createOrReplaceTempView("t1")

    val sqlText = "SELECT max(id) AS max_id, sum(trans_amount) AS sum_amount FROM t1 GROUP BY user"
    // The variant below, which also selects the string column `user`, core dumps on unaligned memory access
    //val sqlText = "SELECT user, max(id) AS max_id, sum(trans_amount) AS sum_amount FROM t1 GROUP BY user"

    val outputDf = sess.sql(sqlText)
    outputDf.show()
  }
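
For reference when checking the output of the test above, here is a minimal sketch that computes the same aggregation over the same ten rows with plain Scala collections, with no Spark or OpenMLDB involved. The names `GroupByExpected`, `Trans`, `rows` and `expected` are hypothetical and exist only for this illustration; it is not part of the OpenMLDB test suite.

```scala
// Reference computation for the GROUP BY query in the test case.
// All names here are hypothetical helpers for illustration only.
object GroupByExpected {
  case class Trans(id: Int, user: String, transAmount: Int, transTime: Int)

  // Same ten rows as the test data: odd ids belong to "tom", even ids to "amy".
  val rows: Seq[Trans] = (1 to 10).map { i =>
    Trans(i, if (i % 2 == 1) "tom" else "amy", i * 100, i)
  }

  // Equivalent of:
  //   SELECT user, max(id) AS max_id, sum(trans_amount) AS sum_amount
  //   FROM t1 GROUP BY user
  val expected: Map[String, (Int, Int)] =
    rows.groupBy(_.user).map { case (user, group) =>
      user -> (group.map(_.id).max, group.map(_.transAmount).sum)
    }

  def main(args: Array[String]): Unit = {
    // Prints:
    //   amy max_id=10 sum_amount=3000
    //   tom max_id=9 sum_amount=2500
    expected.toSeq.sortBy(_._1).foreach { case (user, (maxId, sumAmount)) =>
      println(s"$user max_id=$maxId sum_amount=$sumAmount")
    }
  }
}
```

If `outputDf.show()` prints anything other than these two groups (row order may differ), the result produced with UnsafeRowOpt enabled is incorrect.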

tobegit3hub commented 2 years ago

performance optimization for offline processing