apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Why does DatasetBulkInsertCommitActionExecutor save twice? #10237

Open zyclove opened 10 months ago

zyclove commented 10 months ago


Hudi bulk insert saves the data twice. [Two screenshots]

Steps to reproduce the behavior:

1. Table config:

CREATE TABLE IF NOT EXISTS bi_dw_real.smart_datapoint_report_rw_clear_rt (
  id STRING COMMENT 'id',
  uuid STRING COMMENT 'log uuid',
  data_id STRING COMMENT '',
  dev_id STRING COMMENT '',
  gw_id STRING COMMENT '',
  product_id STRING COMMENT '',
  uid STRING COMMENT '',
  dp_code STRING COMMENT '',
  dp_id STRING COMMENT '',
  dp_mode STRING COMMENT '',
  dp_name STRING COMMENT '',
  dp_time STRING COMMENT '',
  dp_type STRING COMMENT '',
  dp_value STRING COMMENT '',
  gmt_modified BIGINT COMMENT 'ct time',
  dt STRING COMMENT 'time partition field'
)
USING hudi
PARTITIONED BY (dt, dp_mode)
COMMENT ''
LOCATION '${bi_db_dir}/bi_ods_real/ods_smart_datapoint_report_rw_clear_rt'
TBLPROPERTIES (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'gmt_modified',
  hoodie.combine.before.upsert = 'false',
  hoodie.metadata.record.index.enable = 'true',
  hoodie.datasource.write.operation = 'upsert',
  hoodie.metadata.enable = 'true',
  hoodie.datasource.write.hive_style_partitioning = 'true',
  hoodie.metadata.record.index.min.filegroup.count = '512',
  hoodie.index.type = 'RECORD_INDEX',
  hoodie.compact.inline = 'false',
  hoodie.common.spillable.diskmap.type = 'ROCKS_DB',
  hoodie.datasource.write.partitionpath.field = 'dt,dp_mode',
  hoodie.compaction.payload.class = 'org.apache.hudi.common.model.PartialUpdateAvroPayload'
);

set hoodie.write.lock.zookeeper.lock_key=bi_ods_real.smart_datapoint_report_rw_clear_rt;
set hoodie.storage.layout.type=DEFAULT;
set hoodie.metadata.record.index.enable=true;
set hoodie.metadata.enable=true;
set hoodie.populate.meta.fields=false;
set hoodie.parquet.compression.codec=snappy;
set hoodie.memory.merge.max.size=2004857600000;
set hoodie.write.buffer.limit.bytes=419430400;
set hoodie.index.type=RECORD_INDEX;

2. set hoodie.sql.insert.mode=non-strict;
   set hoodie.sql.bulk.insert.enable=true;

3. spark-sql bulk insert
4. insert into bi_dw_real.dwd_smart_datapoint_report_rw_clear_rt

Expected behavior


Environment Description

ad1happy2go commented 10 months ago

@zyclove It is not running save twice; save is a Spark action that runs in two stages.
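
To illustrate the distinction between an action and its stages, here is a minimal sketch in plain Python. It is a toy model, not Spark or Hudi code: it assumes a linear chain of operations and starts a new stage at every operation that needs a shuffle, which is roughly how Spark breaks a job into stages. The `Op` class, the `split_into_stages` helper, and the operation names in `lineage` are all hypothetical and are not taken from the Hudi code base.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Op:
    name: str
    needs_shuffle: bool  # True for wide dependencies such as a repartition or sort


def split_into_stages(ops: List[Op]) -> List[List[str]]:
    """Group a linear chain of operations into stages.

    A new stage starts at every operation that needs a shuffle,
    unless the current stage is still empty.
    """
    stages: List[List[str]] = [[]]
    for op in ops:
        if op.needs_shuffle and stages[-1]:
            stages.append([])
        stages[-1].append(op.name)
    return stages


# Hypothetical lineage behind one save action (names are illustrative only).
lineage = [
    Op("read_source", False),
    Op("add_record_keys", False),
    Op("repartition_by_partition_path", True),
    Op("write_files", False),
]

stages = split_into_stages(lineage)
print(f"1 action, {len(stages)} stages")
for i, stage in enumerate(stages):
    print(f"  stage {i}: {stage}")
```

In this model the single action produces two stages because of the one shuffle in its lineage, so the UI can show the same call site twice even though the data is written only once.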

zyclove commented 10 months ago

> @zyclove It is not running save twice; save is a Spark action that runs in two stages.

So can this process be optimized? There should be no need to shuffle twice.