apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.25k stars 2.39k forks source link

[SUPPORT] data skew when writing with bulk_insert + bucket_index enabled #11565

Open ziudu opened 3 weeks ago

ziudu commented 3 weeks ago

Describe the problem you faced

When I read a table (e.g. tb_transaction_detail) and write to another hudi table (e.g. tb_transaction_detail_bucket_index) with bulk insert and bucket index enabled, I noticed data skew during the stage "save at DatasetBulkInsertCommitActionExecutor.java:81" 。

=========================== tb_transaction_detail has 160 partitions, with 5 files in each partition. Data is evenly distributed, so each file is about 16.7 MB in size. Total size is about 13GB。

I read table tb_transaction_detail, and write to another hudi table tb_transaction_detail_bucket_index with bulk_insert and bucket index, I noticed:

  1. the stage "save at DatasetBulkInsertCommitActionExecutor.java:81" has 800 tasks. It is normal as input table has 800 files (deduced parallelism = 800) pic1

  2. Among those 800 tasks, only 224 tasks have data to process, while 576 tasks have nothing to do. pic3 Note: some tasks have only 20MB of data to process

  3. If sorting the tasks by "duration", I could see some tasks have 10 times more data to process (200MB+): pic4

However, data in the resulting table "tb_transaction_detail_bucket_index" is evenly distributed, with 160 partitions, each partition has 5 files, each file is about 16.67MB.

Is it normal to have skewed data during the write stage when bucket index is enabled? I'm expecting all 800 tasks should have some data to process. Also some tasks with largest "shuffle read size" could spill.

====================== To Reproduce

Steps to reproduce the behavior:

  1. Create a table, with for example, 160 partitions, each partition has 5 base files
  2. Read the table and write to another table, with bulk_insert, bucket index enabled. `# -- coding:utf8 -- from pyspark.sql import SparkSession

if name == 'main': spark = SparkSession.builder \ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ .config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true") \ .config("spark.debug.maxToStringFields", "100") \ .enableHiveSupport().getOrCreate() spark.sparkContext.setLogLevel(logLevel="INFO")

path = "/ilw/test/16889087066964192324/hadoop/ods/research/transaction/mysql_10_43_10_140_3306_db_hudi_research" \
       "/tb_transaction_detail/hoodiedata"
df = spark.read.format("org.apache.hudi").load(path)
table_name = "tb_transaction_detail_bucket_index"
hudi_options = {
    "hoodie.table.name": table_name,
    "hoodie.datasource.write.table.name": table_name,
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.recordkey.field": "id,trans_code",  # "id,trans_code",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.operation": "bulk_insert",  # "bulk_insert",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.metadata.enable": 'true',
    "hoodie.metadata.index.bloom.filter.enable": 'false',
    "hoodie.metadata.index.column.stats.enable": 'false',
    "hoodie.metadata.record.index.enable": 'false',
    "hoodie.copyonwrite.record.size.estimate": 150,
    "hoodie.index.type": "BUCKET",
    "hoodie.bucket.index.num.buckets": "5",
    "hoodie.bucket.index.hash.field": "trans_code",
    'hoodie.datasource.write.partitionpath.field': 'trans_partition',
}
df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save("/ilw/test/16889087066964192324/hadoop/ods/research/transaction/"
          f"hudi_research/{table_name}")`

Environment: spark.sql.shuffle.partitions = 320

Expected behavior

All tasks during the stage "save at DatasetBulkInsertCommitActionExecutor.java:81" should have some data to process .

Environment Description

ziudu commented 3 weeks ago

simple index doesn't have similar problem. I think the root cause might be how bucket index is implemented.

danny0405 commented 3 weeks ago

Thanks for the feedback, @KnightChess and @beyond1920 , do you have some intreast to investigate the culprit?

ziudu commented 3 weeks ago

It seems if the table size is bigger, the data skew is worse. I noticed this issue when joining two tables and writing to a result table:

KnightChess commented 3 weeks ago

@ziudu "hoodie.bucket.index.hash.field": "trans_code" look like is enumeration value, can you determine the content of this field?

ziudu commented 3 weeks ago

trans_code is a randomly generated uuid string: e.g. cb1d7307e4e047989955ca544e175c71 in table tb_transaction_detail there are 96,500,000 records, with 96,500,000 distinct trans_code values. Each record has a unique trans_code value.

We found a workaround, if "hoodie.index.bucket.engine": "CONSISTENT_HASHING", there will be no more data skew or spill during the write stage. Consist_hashing is slower though: with simple bucket index it took 17-18 minutes to join and write, even with data skew, while with consist hashing it took 24 minutes to join and write the same data.

The parallelism during the write stage after join is 320 (spark.sql.shuffle.partitions) for simple bucket index. The parallelism during the write stage after join is 800 (number of parquet files in the resulting table) for consistent hashing bucket index.

KnightChess commented 3 weeks ago

@ziudu yes, you are right, the data skew when writing with bulk_insert + bucket_index enabled use simple bucket index, I will submit a pr to fix it.