Closed jiezi2026 closed 2 years ago
@jiezi2026 Could you tell me how I can reproduce this bug, e.g. using Spark SQL or DeltaStreamer?
To reproduce my problem, I made a small test case. In our scenario, we first use Sqoop to extract MySQL data into a Hive table TEMP.temp_hudi_s1. Then we use PySpark to load the data from TEMP.temp_hudi_s1 into the TEMP.temp_hudi_t1 table to complete data initialization.
-----------------------------------[sparksql]----------------------------
CREATE DATABASE IF NOT EXISTS TEMP COMMENT 'temp' LOCATION 'hdfs://csbigdata/DATACENTER/TEMP/';

DROP TABLE IF EXISTS TEMP.temp_hudi_s1;
CREATE TABLE IF NOT EXISTS TEMP.temp_hudi_s1 (
  id bigint COMMENT 'address ID',
  ship_no string COMMENT 'shipping order number',
  created_date string COMMENT 'creation time'
);

INSERT OVERWRITE TABLE TEMP.temp_hudi_s1
SELECT 1 id, 'FDP2203170007332' ship_no, '2022-03-17 01:44:15.0' created_date UNION
SELECT 2 id, 'FDP2203170009040' ship_no, '2022-03-17 01:44:52.0' created_date UNION
SELECT 3 id, 'FDP2203230005068' ship_no, '2022-03-23 21:35:11.0' created_date UNION
SELECT 4 id, 'FDP2203250001605' ship_no, '2022-03-25 19:45:50.0' created_date UNION
SELECT 5 id, 'FDP2203250009052' ship_no, '2022-03-25 19:46:11.0' created_date UNION
SELECT 6 id, 'FDP2203280007475' ship_no, '2022-03-28 10:23:05.0' created_date UNION
SELECT 7 id, 'FDP2203280003714' ship_no, '2022-03-28 16:46:52.0' created_date UNION
SELECT 8 id, 'FDP2203280004322' ship_no, '2022-03-28 16:47:52.0' created_date UNION
SELECT 9 id, 'FDP2203290007834' ship_no, '2022-03-29 09:40:13.0' created_date UNION
SELECT 10 id, 'FDP2203290005863' ship_no, '2022-03-29 11:03:48.0' created_date;

DROP TABLE IF EXISTS TEMP.temp_hudi_t1;
CREATE TABLE IF NOT EXISTS TEMP.temp_hudi_t1 (
  id bigint COMMENT 'address ID',
  ship_no string COMMENT 'shipping order number',
  created_date string COMMENT 'creation time',
  biprecombinets bigint COMMENT 'precombine key',
  create_date string COMMENT 'creation date'
) USING hudi
TBLPROPERTIES (type = 'mor', primaryKey = 'id', preCombineField = 'biprecombinets')
OPTIONS ("hoodie.table.keygenerator.class" = "org.apache.hudi.keygen.ComplexKeyGenerator")
COMMENT 'shipping address table'
PARTITIONED BY (create_date)
LOCATION 'hdfs://csbigdata/DATACENTER/TEMP/temp_hudi_t1';
-----------------------------------[sparksql]----------------------------
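The table above sets `hoodie.table.keygenerator.class` to `org.apache.hudi.keygen.ComplexKeyGenerator`, which composes the record key from the configured key fields as `field:value` pairs. A minimal Python sketch of that composition (an illustration of the key format only, not Hudi's actual implementation):

```python
# Hypothetical sketch of how a complex record key is composed from key fields.
# Mirrors the "field:value" convention of Hudi's ComplexKeyGenerator, but is
# NOT the library's actual code.
def complex_record_key(row: dict, key_fields: list) -> str:
    """Join each key field as 'field:value', comma-separated for multiple fields."""
    return ",".join(f"{f}:{row[f]}" for f in key_fields)

# For this table, primaryKey = 'id', so a row with id=1 yields the key "id:1".
print(complex_record_key({"id": 1}, ["id"]))
```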
Next, start a pyspark client:
/opt/apache/SPARK/SPARK-CURRENT/bin/pyspark \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/usr/bin/python3 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
---------------------------[pyspark]----------------------------
tableName = "temp_hudi_t1"
basePath = "hdfs://csbigdata/DATACENTER/TEMP/temp_hudi_t1"

keys = "id"
target_hive_db = "TEMP"
target_hive_table = "temp_hudi_t1"

partitionId = "create_date"
precombineField = "biprecombinets"

source_sql = "select * \
    , nvl(unix_timestamp(cast(created_date as string), 'yyyy-MM-dd HH:mm:ss.S'), 1) * 1000 as biprecombinets \
    , cast(to_date(created_date) as string) create_date \
    from TEMP.temp_hudi_s1"

hive_source_data = spark.sql(source_sql)

hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': keys,
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.partitionpath.field': partitionId,
    'hoodie.parquet.writelegacyformat.enabled': 'true',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.table.timeline.timezone': 'LOCAL',
    'hoodie.datasource.write.operation': 'BULK_INSERT',
    'hoodie.bulkinsert.sort.mode': 'PARTITION_SORT',
    'hoodie.bulkinsert.user.defined.partitioner.sort.columns': partitionId,
    'hoodie.datasource.write.precombine.field': precombineField,
    'hoodie.bulkinsert.shuffle.parallelism': 200,
    'hoodie.upsert.shuffle.parallelism': 200,
    'hoodie.insert.shuffle.parallelism': 200,
    'hoodie.index.type': 'BLOOM',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.clean.automatic': 'true',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': '10',
    'hoodie.archive.merge.enable': 'true',
    'hoodie.archive.automatic': 'true',
    'hoodie.archive.merge.files.batch.size': '10',
    'archive.min_commits': '20',
    'archive.max_commits': '30',
    'hoodie.keep.min.commits': '20',
    'hoodie.keep.max.commits': '30'
}

HUDI_SOURCE_COUNT = hive_source_data.count()
print("HUDI_SOURCE_COUNT:" + str(HUDI_SOURCE_COUNT))
hive_source_data.write.format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save(basePath)
---------------------------[pyspark]----------------------------
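The `biprecombinets` column computed above is `unix_timestamp(created_date) * 1000`, falling back to 1 (so 1000 after the multiply) when parsing fails. A plain-Python sketch of that expression (assuming UTC for illustration; Spark's `unix_timestamp` actually uses the session time zone):

```python
from datetime import datetime, timezone

def precombine_ts(created_date):
    """Mimic nvl(unix_timestamp(cast(created_date as string),
    'yyyy-MM-dd HH:mm:ss.S'), 1) * 1000 from the source SQL above."""
    try:
        dt = datetime.strptime(created_date, "%Y-%m-%d %H:%M:%S.%f")
        # Assuming UTC here; Spark would apply the session time zone instead.
        return int(dt.replace(tzinfo=timezone.utc).timestamp()) * 1000
    except (TypeError, ValueError):
        return 1 * 1000  # nvl fallback: 1, then * 1000

print(precombine_ts("2022-03-17 01:44:15.0"))
```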
hdfs dfs -ls hdfs://csbigdata/DATACENTER/TEMP/temp_hudi_t1
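Hive-style partitioning only changes the directory naming under the base path: with it in effect, the listing above should show `create_date=2022-03-17/` style directories rather than bare `2022-03-17/`. A minimal sketch of the two layouts (an illustration of the expected naming, not Hudi's partition-path code):

```python
def partition_path(field, value, hive_style):
    """Return the relative partition directory for one partition value."""
    # hive_style=True  -> "create_date=2022-03-17"
    #   (what hoodie.datasource.write.hive_style_partitioning=true should produce)
    # hive_style=False -> "2022-03-17"
    return f"{field}={value}" if hive_style else str(value)

print(partition_path("create_date", "2022-03-17", True))
```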
By contrast, the following Spark SQL works as expected:
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;
INSERT INTO TEMP.temp_hudi_t1
SELECT id, ship_no, created_date,
       nvl(unix_timestamp(cast(created_date as string), 'yyyy-MM-dd HH:mm:ss.S'), 1) * 1000 as biprecombinets,
       cast(to_date(created_date) as string) create_date
FROM TEMP.temp_hudi_s1
@fengjian428
Yeah, we got the same issue when using bulk_insert in 0.11, and there is no problem in 0.10.1
@jiezi2026 you can try 0.10.1 if you are urgent
@TengHuo Thanks, I'm trying to replace my code with sparksql.
@TengHuo is working on this and will create a pr to fix this issue. Thanks
Submitted a PR for fixing this issue: https://github.com/apache/hudi/pull/6085
And this issue is related with a merged PR: https://github.com/apache/hudi/pull/5462 and a pending PR: https://github.com/apache/hudi/pull/5470
More detail in ticket https://issues.apache.org/jira/browse/HUDI-4384
@TengHuo this is immediately addressed in https://github.com/apache/hudi/pull/6049 and will be properly addressed in the fix for HUDI-3993
@alexeykudinkin Cool, thanks for the reminder. No problem, I will close PR #6085 once #6049 is merged.
In Hudi 0.11.1, when I use Spark 3.2.1 for data initialization (bulk_insert mode), 'hoodie.datasource.write.hive_style_partitioning': 'true' does not take effect. But it works in upsert mode.