StarRocks / demo

Apache License 2.0
83 stars 57 forks source link

taobao data set (1GB+ parquet file + JOINS) instruction on Hudi #53

Closed alberttwong closed 7 months ago

alberttwong commented 7 months ago

Run the following in the Spark container:

bash
# Remove the Hudi 0.11.1 bundle jar from Spark's jars directory
# (presumably so it cannot clash with the 0.14.1 bundle loaded below).
rm -f /spark-3.2.1-bin-hadoop3.2/jars/hudi-spark3-bundle_2.12-0.11.1.jar

# Spark minor version; it is interpolated into the Hudi bundle artifact name.
export SPARK_VERSION=3.2

# Start spark-shell with the Hudi 0.14.1 bundle for this Spark version
# and a 24G driver heap.
spark-shell \
  --driver-memory 24G \
  --packages "org.apache.hudi:hudi-spark${SPARK_VERSION}-bundle_2.12:0.14.1"

Run the following at the Scala prompt (spark-shell) to write the user_behavior table:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import scala.collection.JavaConversions._

// Hive database/table to sync to, and the storage path of the Hudi table.
val databaseName = "hudi_ecommerce"
val tableName = "user_behavior"
val basePath = "s3a://huditest/hudi_ecommerce_user_behavior"

// Source rows: the sample user-behavior parquet file.
val df = spark.read.parquet("s3a://warehouse/user_behavior_sample_data.parquet")

// Bulk-insert the DataFrame as a Hudi table at basePath (Overwrite mode) and
// sync the table definition to the Hive metastore in "hms" mode.
df.write.format("hudi").options(Map(
  org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME -> tableName,
  "hoodie.datasource.write.operation" -> "bulk_insert",
  "hoodie.datasource.hive_sync.enable" -> "true",
  "hoodie.datasource.hive_sync.mode" -> "hms",
  "hoodie.datasource.hive_sync.database" -> databaseName,
  "hoodie.datasource.hive_sync.table" -> tableName,
  "hoodie.datasource.hive_sync.metastore.uris" -> "thrift://hive-metastore:9083",
  // NOTE(review): this uses the s3:// scheme while basePath uses s3a:// — confirm this is intended.
  "fs.defaultFS" -> "s3://huditest/"
)).mode(Overwrite).save(basePath)

Run the following at the Scala prompt (spark-shell) to write the item table:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import scala.collection.JavaConversions._

// Hive database/table to sync to, and the storage path of the Hudi table.
val databaseName = "hudi_ecommerce"
val tableName = "item"
val basePath = "s3a://huditest/hudi_ecommerce_item"

// Source rows: the sample item parquet file.
val df = spark.read.parquet("s3a://warehouse/item_sample_data.parquet")

// Bulk-insert the DataFrame as a Hudi table at basePath (Overwrite mode) and
// sync the table definition to the Hive metastore in "hms" mode.
df.write.format("hudi").options(Map(
  org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME -> tableName,
  "hoodie.datasource.write.operation" -> "bulk_insert",
  "hoodie.datasource.hive_sync.enable" -> "true",
  "hoodie.datasource.hive_sync.mode" -> "hms",
  "hoodie.datasource.hive_sync.database" -> databaseName,
  "hoodie.datasource.hive_sync.table" -> tableName,
  "hoodie.datasource.hive_sync.metastore.uris" -> "thrift://hive-metastore:9083",
  // NOTE(review): this uses the s3:// scheme while basePath uses s3a:// — confirm this is intended.
  "fs.defaultFS" -> "s3://huditest/"
)).mode(Overwrite).save(basePath)

Run the following in the MySQL client connected to StarRocks:

-- Register an external Hudi catalog that resolves tables through the Hive
-- metastore and reads data files through the S3-compatible endpoint below.
-- NOTE(review): the access key / secret key look like demo-only credentials — confirm before reuse.
CREATE EXTERNAL CATALOG hudi_catalog_hms
PROPERTIES (
    "type" = "hudi",
    "hive.metastore.type" = "hive",
    "hive.metastore.uris" = "thrift://hive-metastore:9083",
    "aws.s3.use_instance_profile" = "false",
    "aws.s3.access_key" = "admin",
    "aws.s3.secret_key" = "password",
    "aws.s3.region" = "us-east-1",
    "aws.s3.enable_ssl" = "false",
    "aws.s3.enable_path_style_access" = "true",
    "aws.s3.endpoint" = "http://minio:9000"
);

-- Switch to the new catalog and list what it exposes.
SET CATALOG hudi_catalog_hms;
SHOW DATABASES;
USE hudi_ecommerce;
SHOW TABLES;

-- Sanity check: both tables are readable and return a row count.
SELECT COUNT(*) FROM user_behavior;
SELECT COUNT(*) FROM item;

Optionally, run the following at the MySQL prompt to remove the catalog:

-- Optional cleanup: remove the external catalog definition.
DROP CATALOG hudi_catalog_hms;

Three SQL exercises are available at https://forum.starrocks.io/t/retail-ecommerce-funnel-analysis-demo-with-1-million-members-and-87-million-record-dataset-using-starrocks/269

alberttwong commented 7 months ago

Question 2, rewritten with a JOIN:

-- Per-item pv -> buy funnel for 2017-12-02, joined to item names.
-- Returns the 10 rows with the highest funnel level and, within that level,
-- the lowest step-to-step retention ratio.
with user_funnel as (
  -- Funnel level reached by each (item, user) pair.
  -- window_funnel arguments: 1800 = window length, timestamp = event time
  -- column, 0 = mode, then the ordered list of step conditions
  -- (see the StarRocks window_funnel documentation for units and modes).
  select
    ItemID,
    UserID,
    window_funnel(
      1800,
      timestamp,
      0,
      [BehaviorType = 'pv',
      BehaviorType = 'buy']
    ) as level
  from
    user_behavior
  -- Half-open range: covers the whole day, including any events in the
  -- final second that carry a fractional-second component.
  where timestamp >= '2017-12-02 00:00:00'
    and timestamp < '2017-12-03 00:00:00'
  group by
    ItemID,
    UserID
),
funnel_counts as (
  -- Count of UserID values per item and funnel level; level 0 rows are dropped.
  select
    ItemID,
    level,
    count(UserID) as res
  from
    user_funnel
  where
    level > 0
  group by
    ItemID,
    level
),
funnel_retention as (
  -- Users who reached at least this level: sum of this level's count and
  -- the counts of every higher level for the same item.
  select
    ItemID,
    level,
    sum(res) over (
      partition by ItemID
      order by
        level rows between current row
        and unbounded following
    ) as retention
  from
    funnel_counts
)
select
  r.ItemID,
  i.name,
  r.level,
  -- With rows ordered by level descending, the last row of the frame
  -- (current row and 1 following) is the next lower level, or the current
  -- row itself when no lower level exists, so the ratio is this level's
  -- retention relative to the preceding funnel step.
  r.retention / last_value(r.retention) over (
    partition by r.ItemID
    order by
      r.level desc rows between current row
      and 1 following
  ) as retention_ratio
from
  funnel_retention as r
inner join item as i
  on r.ItemID = i.ItemID
order by
  r.level desc,
  retention_ratio,
  -- Tie-breaker so that the limit picks the same rows on every run.
  r.ItemID
limit
  10;

Output:

+---------+--------------+-------+-----------------------+
| ItemID  | name         | level | retention_ratio       |
+---------+--------------+-------+-----------------------+
|   59883 | item 59883   |     2 | 0.0003616636528028933 |
|  394978 | item 394978  |     2 | 0.0006357279084551812 |
| 1164931 | item 1164931 |     2 | 0.0006648936170212766 |
| 4622270 | item 4622270 |     2 | 0.0007692307692307692 |
|  812879 | item 812879  |     2 | 0.0009121313469139556 |
| 1783990 | item 1783990 |     2 | 0.0009132420091324201 |
| 3847054 | item 3847054 |     2 |  0.000925925925925926 |
| 2742138 | item 2742138 |     2 | 0.0009881422924901185 |
|  530918 | item 530918  |     2 | 0.0010193679918450561 |
|  600756 | item 600756  |     2 | 0.0010319917440660474 |
+---------+--------------+-------+-----------------------+
10 rows in set (5.07 sec)
alberttwong commented 7 months ago

https://github.com/StarRocks/demo/pull/51