apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] BaseDatasetBulkInsertCommitActionExecutor does not persist WriteStatus, causing data to be written to Hudi 4 times #11741

Closed dongtingting closed 1 month ago

dongtingting commented 2 months ago

Describe the problem you faced

We have a job that uses bulk insert to insert-overwrite a COW table. We found that four stages ran the bulk insert write, so the data was written four times. Only the data from the last stage remains; the data written by the other three stages is removed when the write is finalized.

All four stages marked with the red line perform the bulk insert write.

image

More details about the four stages:

image image

This happens because all four stages use the WriteStatus RDD. DatasetBulkInsertOverwriteCommitActionExecutor does not persist the WriteStatus RDD, which causes the upstream RDD (the bulk insert write) to run four times.

* DatasetBulkInsertOverwriteCommitActionExecutor.getPartitionToReplacedFileIds: calls isEmpty on the WriteStatus RDD
* DatasetBulkInsertOverwriteCommitActionExecutor.getPartitionToReplacedFileIds: calls distinct on the WriteStatus RDD
* HoodieSparkSqlWriter.commitAndPerformPostOperations: calls count on the WriteStatus RDD
* HoodieSparkSqlWriter.commitAndPerformPostOperations: calls collect on the WriteStatus RDD

Upsert (which does not use bulk insert) does not have this problem, because it persists the WriteStatus RDD. BaseDatasetBulkInsertCommitActionExecutor, however, does not persist it. I think we should persist the RDD at the beginning of BaseDatasetBulkInsertCommitActionExecutor.buildHoodieWriteMetadata. Does anyone agree?
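To make the recomputation concrete, here is a toy Python model of the behavior described above. It is only a sketch and not Spark or Hudi code: `LazyDataset`, `run_actions`, and `bulk_insert_write` are hypothetical names, and every action is simplified to materialize the whole dataset. Without persisting, each of the four actions re-runs the upstream write; once the dataset is persisted, the write runs a single time.

```python
class LazyDataset:
    """Toy stand-in for a lazily evaluated dataset (NOT the Spark RDD API)."""

    def __init__(self, compute):
        self._compute = compute   # upstream work, e.g. the simulated write
        self._persisted = False
        self._cached = None

    def persist(self):
        # Mark the dataset so the first materialization is kept and reused.
        self._persisted = True
        return self

    def _materialize(self):
        if not self._persisted:
            # No cache: every action re-runs the upstream computation.
            return list(self._compute())
        if self._cached is None:
            self._cached = list(self._compute())
        return self._cached

    # The four actions named in the issue, simplified for illustration.
    def is_empty(self):
        return len(self._materialize()) == 0

    def distinct(self):
        return sorted(set(self._materialize()))

    def count(self):
        return len(self._materialize())

    def collect(self):
        return list(self._materialize())


def run_actions(persist):
    """Run the four actions and return how often the write executed."""
    writes = []

    def bulk_insert_write():
        writes.append("write")            # side effect: data files written
        return ["status-1", "status-2"]   # made-up write statuses

    statuses = LazyDataset(bulk_insert_write)
    if persist:
        statuses.persist()

    statuses.is_empty()
    statuses.distinct()
    statuses.count()
    statuses.collect()
    return len(writes)


print(run_actions(persist=False))  # 4
print(run_actions(persist=True))   # 1
```

In this model, calling `persist()` before the first action plays the role of the proposed change in buildHoodieWriteMetadata.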

image image

To Reproduce

Steps to reproduce the behavior:

  1. Create a COW table test_table, using the simple index:

    create table if not exists test_table
    (
      id       string,
      name     string,
      p_date   string   comment 'partition date, yyyyMMdd'
    ) using hudi
    partitioned by (p_date)
    options (
      type = 'cow'
    );

  2. Run insert overwrite on the table using bulk insert:

    set hoodie.datasource.write.operation=BULK_INSERT;
    set hoodie.bulkinsert.shuffle.parallelism=200;

    insert overwrite test_table partition (p_date = '20240806') select id, name, p_date from source_table;

  3. Check the Spark job's task logs. The task logs of all four stages (isEmpty, distinct, count, and collect) contain "create marker" and "create handle" entries.

**Environment Description**

* Hudi version : 0.14.0

* Spark version : 2.4

* Hadoop version : 2.6

dongtingting commented 2 months ago

@danny0405 @beyond1920 can you help me confirm this?

danny0405 commented 2 months ago

@KnightChess maybe you can give some insights here, also cc @nsivabalan for visibility.

beyond1920 commented 2 months ago

@dongtingting Good catch. Thanks for filing this issue. It seems the WriteStatus RDD is not persisted. I will look into this problem today and reply later.

KnightChess commented 2 months ago

@dongtingting Nice catch. We can persist the WriteStatus RDD to avoid this issue.

dongtingting commented 2 months ago

Thanks very much for your reply. I am glad to fix it; I will create a PR for it later.

ad1happy2go commented 2 months ago

@dongtingting Thanks a lot. Let us know when you have the PR ready.

Created a tracking JIRA for this: https://issues.apache.org/jira/browse/HUDI-8078

dongtingting commented 2 months ago

Sorry for the late reply. I have created a PR to fix it: https://github.com/apache/hudi/pull/11811/files. cc @KnightChess