apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Using Flink to write to Hudi in upsert mode and syncing to Hive, querying the external table in Hive gives an error: Caused by: org.apache.hudi.org.apache.avro.AvroRuntimeException: Duplicate field _hoodie_commit_time in record flink_schema: _hoodie_commit_time type:UNION pos:13 and _hoodie_commit_time type:UNION pos:0. #10779

Open Toroidals opened 6 months ago

Toroidals commented 6 months ago


Describe the problem you faced: when using Flink to write to Hudi in upsert mode and syncing to Hive, querying the external table in Hive gives an error:

], TaskAttempt 2 failed, info=[Error: Error while running task ( failure ) : attempt_1706375932152_272353_2_00_000201_2:java.lang.RuntimeException: java.lang.RuntimeException: java.io.IOException: org.apache.hudi.org.apache.avro.AvroRuntimeException: Duplicate field _hoodie_commit_time in record flink_schema: _hoodie_commit_time type:UNION pos:13 and _hoodie_commit_time type:UNION pos:0.
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:296)
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:381)
at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:82)
at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:69)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:69)
at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:39)
at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:74)
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.io.IOException: org.apache.hudi.org.apache.avro.AvroRuntimeException: Duplicate field _hoodie_commit_time in record flink_schema: _hoodie_commit_time type:UNION pos:13 and _hoodie_commit_time type:UNION pos:0.
at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.initNextRecordReader(TezGroupedSplitsInputFormat.java:199)
at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.next(TezGroupedSplitsInputFormat.java:145)
at org.apache.tez.mapreduce.lib.MRReaderMapred.next(MRReaderMapred.java:116)
at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:419)
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:267)
... 16 more
Caused by: java.io.IOException: org.apache.hudi.org.apache.avro.AvroRuntimeException: Duplicate field _hoodie_commit_time in record flink_schema: _hoodie_commit_time type:UNION pos:13 and _hoodie_commit_time type:UNION pos:0.
at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderCreationException(HiveIOExceptionHandlerChain.java:97)
at org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(HiveIOExceptionHandlerUtil.java:57)
at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:420)
at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.initNextRecordReader(TezGroupedSplitsInputFormat.java:196)
... 21 more
Caused by: org.apache.hudi.org.apache.avro.AvroRuntimeException: Duplicate field _hoodie_commit_time in record flink_schema: _hoodie_commit_time type:UNION pos:13 and _hoodie_commit_time type:UNION pos:0.
at org.apache.hudi.org.apache.avro.Schema$RecordSchema.setFields(Schema.java:651)
at org.apache.hudi.avro.HoodieAvroUtils.generateProjectionSchema(HoodieAvroUtils.java:534)
at org.apache.hudi.hadoop.avro.HoodieAvroParquetReader.<init>(HoodieAvroParquetReader.java:63)
at org.apache.hudi.hadoop.avro.HoodieTimestampAwareParquetInputFormat.createRecordReader(HoodieTimestampAwareParquetInputFormat.java:42)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:94)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:60)
at org.apache.hudi.hadoop.HoodieParquetInputFormat.getRecordReaderInternal(HoodieParquetInputFormat.java:129)
at org.apache.hudi.hadoop.HoodieParquetInputFormat.getRecordReader(HoodieParquetInputFormat.java:121)
at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:417)
... 22 more
], TaskAttempt 3 failed, info=[Error: Error while running task ( failure ) : attempt_1706375932152_272353_2_00_000201_3:java.lang.RuntimeException: java.lang.RuntimeException: java.io.IOException: org.apache.hudi.org.apache.avro.AvroRuntimeException: Duplicate field _hoodie_commit_time in record flink_schema: _hoodie_commit_time type:UNION pos:13 and _hoodie_commit_time type:UNION pos:0. (identical stack trace to TaskAttempt 2)
]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:93, Vertex vertex_1706375932152_272353_2_00 [Map 1] killed/failed due to:OWN_TASK_FAILURE]
DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:0 (state=08S01,code=2)
1: jdbc:hive2://crpprd10hd01:21181,crpprd6hd0> Duplicate field _hoodie_commit_time in record flink_schema

To Reproduce

Steps to reproduce the behavior:

1. The Flink write code:

package com.hand.sink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.util.HoodiePipeline;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**

2. Query the external table in Hive.

Expected behavior

Queries against the synced external table in Hive return results normally.


danny0405 commented 6 months ago

Which catalog did you use? The Hudi catalog in hms mode is expected here, rather than the Flink Hive catalog.
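For reference, a minimal sketch of the kind of setup being described, assuming the Hudi Flink catalog options 'type' = 'hudi' and 'mode' = 'hms'; the catalog name, warehouse path, and hive-site.xml directory below are placeholders, not values from this report:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HudiHmsCatalogSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().inStreamingMode().build());

        // Sketch only: register a Hudi catalog backed by the Hive Metastore, so tables
        // created through it are registered in HMS by Hudi itself rather than by a
        // Flink Hive catalog. 'hive.conf.dir' must point at the directory holding hive-site.xml.
        tableEnv.executeSql(
                "CREATE CATALOG hudi_catalog WITH ("
                        + " 'type' = 'hudi',"
                        + " 'mode' = 'hms',"
                        + " 'catalog.path' = 'hdfs:///apps/hive/warehouse/hudi.db',"
                        + " 'hive.conf.dir' = '/etc/hive/conf'"
                        + ")");
        tableEnv.executeSql("USE CATALOG hudi_catalog");
        // CREATE TABLE / INSERT statements would then run against this catalog.
    }
}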

Toroidals commented 6 months ago

> Which catalog did you use? The Hudi catalog in hms mode is expected here, rather than the Flink Hive catalog.

The default catalog was used for writing to Hudi in Flink, with database "hudi" and table name "hudi_table_01"; it is synchronized to the Hive database "ods" as table "ods_hive_table_01". When querying the "ro" table ods.ods_hive_table_01 in Hive, this error occurs.

HoodiePipeline.Builder builder = HoodiePipeline.builder("hudi_table_01");
builder.pk(hudiPrimaryKeys);

Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.DATABASE_NAME.key(), "hudi");
options.put(FlinkOptions.TABLE_NAME.key(), "hudi_table_01");
options.put(FlinkOptions.PRE_COMBINE.key(), "true");
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts_ms");
options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
options.put(FlinkOptions.HIVE_SYNC_DB.key(), "ods");
options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), "ods_hive_table_01");
...
builder.options(options);
return builder;


Toroidals commented 6 months ago

> Which catalog did you use? The Hudi catalog in hms mode is expected here, rather than the Flink Hive catalog.

Flink is writing to "hudi.hudi_test_cdc_01" and Hive is querying the table "ods.hive_test_cdc_01". The complete code is as follows:

package com.hand.sink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.util.HoodiePipeline;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**

danny0405 commented 6 months ago

Can you show the CREATE TABLE info from the Hive CLI? Let's see what the table schema looks like.

Toroidals commented 6 months ago

> Can you show the CREATE TABLE info from the Hive CLI? Let's see what the table schema looks like.

CREATE EXTERNAL TABLE ods_rbs.ods_rbs_rbscmfprd_cmf_fin_ar_trx_types_cdc(
_hoodie_commit_time string COMMENT '',
_hoodie_commit_seqno string COMMENT '',
_hoodie_record_key string COMMENT '',
_hoodie_partition_path string COMMENT '',
_hoodie_file_name string COMMENT '',
cust_trx_type_id string COMMENT '',
trx_name string COMMENT '',
description string COMMENT '',
default_status string COMMENT '',
type string COMMENT '',
org_id string COMMENT '',
creation_sign string COMMENT '',
gl_id_rec string COMMENT '',
gl_code_rec string COMMENT '',
gl_id_rev string COMMENT '',
gl_code_rev string COMMENT '',
start_date timestamp COMMENT '',
end_date timestamp COMMENT '',
post_to_gl string COMMENT '',
accounting_affect_flag string COMMENT '',
allow_freight_flag string COMMENT '',
allow_overapplication_flag string COMMENT '',
tax_calculation_flag string COMMENT '',
source_system_code string COMMENT '',
object_version_number string COMMENT '',
creation_date timestamp COMMENT '',
created_by string COMMENT '',
last_updated_by string COMMENT '',
last_update_date timestamp COMMENT '',
last_update_login string COMMENT '',
program_application_id string COMMENT '',
program_id string COMMENT '',
program_update_date timestamp COMMENT '',
request_id string COMMENT '',
attribute_category string COMMENT '',
attribute1 string COMMENT '',
attribute2 string COMMENT '',
attribute3 string COMMENT '',
attribute4 string COMMENT '',
attribute5 string COMMENT '',
attribute6 string COMMENT '',
attribute7 string COMMENT '',
attribute8 string COMMENT '',
attribute9 string COMMENT '',
attribute10 string COMMENT '',
attribute11 string COMMENT '',
attribute12 string COMMENT '',
attribute13 string COMMENT '',
attribute14 string COMMENT '',
attribute15 string COMMENT '',
source_id string COMMENT '',
_flink_cdc_connector string COMMENT '',
_flink_cdc_db string COMMENT '',
_flink_cdc_table string COMMENT '',
_flink_cdc_op string COMMENT '',
_flink_cdc_ts_ms timestamp COMMENT '')
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
'hoodie.query.as.ro.table'='true',
'path'='hdfs:///apps/hive/warehouse/hudi.db/hudi_rbs_rbscmfprd_cmf_fin_ar_trx_types_cdc')
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://cmccprdhadoop/apps/hive/warehouse/hudi.db/hudi_rbs_rbscmfprd_cmf_fin_ar_trx_types_cdc'
TBLPROPERTIES (
'last_commit_completion_time_sync'='20240301154558898',
'last_commit_time_sync'='20240301104559129',
'spark.sql.sources.provider'='hudi',
'spark.sql.sources.schema.numParts'='2',
'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"cust_trx_type_id","type":"string","nullable":false,"metadata":{}},{"name":"trx_name","type":"string","nullable":true,"metadata":{}},{"name":"description","type":"string","nullable":true,"metadata":{}},{"name":"default_status","type":"string","nullable":true,"metadata":{}},{"name":"type","type":"string","nullable":true,"metadata":{}},{"name":"org_id","type":"string","nullable":true,"metadata":{}},{"name":"creation_sign","type":"string","nullable":true,"metadata":{}},{"name":"gl_id_rec","type":"string","nullable":true,"metadata":{}},{"name":"gl_code_rec","type":"string","nullable":true,"metadata":{}},{"name":"gl_id_rev","type":"string","nullable":true,"metadata":{}},{"name":"gl_code_rev","type":"string","nullable":true,"metadata":{}},{"name":"start_date","type":"timestamp","nullable":true,"metadata":{}},{"name":"end_date","type":"timestamp","nullable":true,"metadata":{}},{"name":"post_to_gl","type":"string","nullable":true,"metadata":{}},{"name":"accounting_affect_flag","type":"string","nullable":true,"metadata":{}},{"name":"allow_freight_flag","type":"string","nullable":true,"metadata":{}},{"name":"allow_overapplication_flag","type":"string","nullable":true,"metadata":{}},{"name":"tax_calculation_flag","type":"string","nullable":true,"metadata":{}},{"name":"source_system_code","type":"string","nullable":true,"metadata":{}},{"name":"object_version_number","type":"string","nullable":true,"metadata":{}},{"name":"creation_date","type":"timestamp","nullable":true,"metadata":{}},{"name":"created_by","type":"string","nullable":true,"metadata":{}},{"name":"last_updated_by","type":"string","nullable":true,"metadata":{}},{"name":"last_update_date","type":"timestamp","nullable":true,"metadata":{}},{"name":"last_update_login","type":"string","nullable":true,"metadata":{}},{"name":"program_application_id","type":"string","nullable":true,"metadata":{}},{"name":"program_id","type":"string","nullable":true,"metadata":{}},{"name":"program_update_date","type":"timestamp","nullable":true,"metadata":{}},{"name":"request_id","type":"string","nullable":true,"metadata":{}},{"name":"attribute_category","type":"string","nullable":true,"metadata":{}},{"name":"attribute1","type":"string","nullable":true,"metadata":{}},{"name":"attribute2","type":"string","nullable":true,"metadata":{}},{"name":"attribute3","type":"string","nullable":true,"metadata":{}},{"name":"attribute4","type":"string","nullable":true,"metadata":{}},{"name":"attribute5","type":"string","nullable":true,"metadata":{}},{"name":"attribute6","type":"string","nullable":true,"metadata":{}},{"name":"attribute7","type":"string","nullable":true,"metadata":{}},{"name":"attribute8","type":"string","nullable":true,"metadata":{}},{"name":"attribute9","type":"string","nullable":true,"metadata":{}},{"name":"attribute10","type":"string","nullable":true,"metadata":{}},{"name":"attribute11","type":"string","nullable":true,"metadata":{}},{"name":"attribute12","type":"string","nullable":true,"metadata":{}},{"name":"attribute13","type":"string","nullable":true,"metadata":{}},{"name":"attribute14","type":"string","nullable"
:true,"metadata":{}},{"name":"attribute15","type":"string","nullable":true,"metadata":{}},{"name":"source_id","type":"string","nullable":true,"metadata":{}},{"name":"_flink_cdc_connector","type":"string","nullable":true,"metadata":{}},{"name":"_flink_cdc_db","type":"string","nullable":true,"metadata":{}},{"name":"_flink_cdc_table","type":"string","nullable":true,"metadata":{}},{"name":"_flink_cdc_op","type":"string","nullable":true,"metadata":{}},{"name":"_flink_cdc_ts_ms","t', | | 'spark.sql.sources.schema.part.1'='ype":"timestamp","nullable":true,"metadata":{}}]}', | | 'transient_lastDdlTime'='1707238578') | +----------------------------------------------------+

danny0405 commented 6 months ago

That's as expected. I'm confused why it reports a Flink schema error when you query the table through Hive.

qw2qw2 commented 1 month ago

I encountered the exact same problem. How can it be solved? My versions: Hudi 0.14.0, Hive 3.1.3, Hadoop 3.3.6.

danny0405 commented 1 month ago

Did you create the Flink table using the Hudi Hive catalog?

qw2qw2 commented 1 month ago

> Did you create the Flink table using the Hudi Hive catalog?

Just Flink SQL with Hive auto-sync; I did not use a catalog:

CREATE TABLE table01(
  ...
) PARTITIONED BY (dt,hr) WITH(
  'connector' = 'hudi',
  'path'='hdfs:///user/xxx/hudi/warehouse/table01',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'upsert',
  'hoodie.datasource.write.recordkey.field' = 'record_key',
  'write.precombine.field' = 'record_time',
  'write.tasks' = '24',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.conf.dir'='hdfs:///user/hive/conf',
  'hive_sync.db' = 'dbname',
  'hive_sync.table' = 'table01',
  'hive_sync.partition_fields' = 'dt,hr',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
  'hive_sync.support_timestamp' = 'true'
)

danny0405 commented 1 month ago

Tez is not supported; did you try disabling it?

qw2qw2 commented 1 month ago

> Tez is not supported; did you try disabling it?

I switched to the MR engine, but got another error:

Caused by: java.lang.NoSuchFieldError: NULL_VALUE
at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:246)
at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:231)
at org.apache.hudi.hadoop.avro.HoodieAvroParquetReader.<init>(HoodieAvroParquetReader.java:58)
at org.apache.hudi.hadoop.avro.HoodieTimestampAwareParquetInputFormat.createRecordReader(HoodieTimestampAwareParquetInputFormat.java:42)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:94)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:60)
at org.apache.hudi.hadoop.HoodieParquetInputFormat.getRecordReaderInternal(HoodieParquetInputFormat.java:129)
at org.apache.hudi.hadoop.HoodieParquetInputFormat.getRecordReader(HoodieParquetInputFormat.java:121)
at org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.<init>(CombineHiveRecordReader.java:99)

Do you mean that Tez queries must use HoodieHiveCatalog?

danny0405 commented 1 month ago

No, Tez needs to be disabled in the Hive engine; the MR error stacktrace indicates there is a jar conflict for parquet-avro.
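For what it's worth, one way to confirm a suspected parquet-avro jar conflict is to print which jar each class in the failing frames is loaded from, for example with a small throwaway class run on the same Hive classpath; the class name and the exact list of classes checked here are illustrative only:

import java.security.CodeSource;

public class ClasspathCheck {
    public static void main(String[] args) throws Exception {
        // Print the jar supplying each class involved in the NoSuchFieldError,
        // to spot an old parquet-avro (or avro) version shadowing the expected one.
        String[] classes = {
                "org.apache.parquet.avro.AvroSchemaConverter",
                "org.apache.avro.Schema"
        };
        for (String name : classes) {
            Class<?> clazz = Class.forName(name);
            CodeSource cs = clazz.getProtectionDomain().getCodeSource();
            System.out.println(name + " -> " + (cs == null ? "(bootstrap)" : cs.getLocation()));
        }
    }
}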

qw2qw2 commented 1 month ago

> No, Tez needs to be disabled in the Hive engine; the MR error stacktrace indicates there is a jar conflict for parquet-avro.

Thank you for your response. I ran into the parquet-avro issue when I tried switching to the MR engine using set hive.execution.engine=mr; (the error message is above). What do you mean by disabling Tez in the Hive engine? After all, Hudi tables are still a minority in Hive, so uninstalling Tez isn't an option.

danny0405 commented 1 month ago

@xicm Maybe you can take a look at this issue.

xicm commented 1 month ago

Can you share the content of your parquet file? For example with spark.read.parquet("file_name").show() or other tools.

xicm commented 1 month ago

https://github.com/apache/hudi/blob/47bdc2709566f726fa503919c87004ec26f14817/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java#L518-L536

This duplicate field comes either from the parquet file or from the Hive conf. We may need to deduplicate here.
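A minimal sketch of the kind of deduplication hinted at here, assuming the fix would simply drop repeated column names (preserving order) before the projection schema is built; this is not the actual Hudi patch, and the class and method names are placeholders:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

public class ProjectionFieldDedup {
    // Drop repeated field names while preserving order, so a projection builder such as
    // HoodieAvroUtils.generateProjectionSchema never sees _hoodie_commit_time twice.
    static List<String> dedupe(List<String> fieldNames) {
        return new ArrayList<>(new LinkedHashSet<>(fieldNames));
    }

    public static void main(String[] args) {
        List<String> fromHiveConf = Arrays.asList(
                "_hoodie_commit_time", "id", "name", "_hoodie_commit_time");
        System.out.println(dedupe(fromHiveConf)); // [_hoodie_commit_time, id, name]
    }
}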

qw2qw2 commented 1 month ago

> Can you share the content of your parquet file? For example with spark.read.parquet("file_name").show() or other tools.

The Hive SHOW CREATE TABLE output is below:

CREATE EXTERNAL TABLE table1(
_hoodie_commit_time string COMMENT '',
_hoodie_commit_seqno string COMMENT '',
_hoodie_record_key string COMMENT '',
_hoodie_partition_path string COMMENT '',
_hoodie_file_name string COMMENT '',
id int COMMENT '',
merchant_id int COMMENT '',
merchant_name string COMMENT '',
product_id int COMMENT '',
product_name string COMMENT '',
trans_id string COMMENT '',
ps_trans_id string COMMENT '',
sw_order_id string COMMENT '',
chl_order_id string COMMENT '',
account_id int COMMENT '',
user_name string COMMENT '',
ip string COMMENT '',
paygate int COMMENT '',
paygate_name string COMMENT '',
ip_location string COMMENT '',
trans_status int COMMENT '',
amt string COMMENT '',
poundage string COMMENT '',
trans_type int COMMENT '',
wallet_error_code string COMMENT '',
wallet_error_desc string COMMENT '',
spgw_error_code string COMMENT '',
spgw_error_desc string COMMENT '',
chl_error_code string COMMENT '',
chl_error_desc string COMMENT '',
chl_error_type string COMMENT '',
trans_steps int COMMENT '',
trans_created_time timestamp COMMENT '',
trans_finish_time timestamp COMMENT '',
wallet_req_time timestamp COMMENT '',
spgw_req_time timestamp COMMENT '',
chl_res_time timestamp COMMENT '',
spgw_res_time timestamp COMMENT '',
chl_callback_time timestamp COMMENT '',
spgw_callback_time timestamp COMMENT '',
trans_cost_time int COMMENT '',
trans_avg_cost_time int COMMENT '',
error_code string COMMENT '',
time_dimension_id int COMMENT '',
account_amt string COMMENT '',
account_withdraw_amt string COMMENT '',
onway_amt string COMMENT '',
total_subsidy_amt string COMMENT '',
real_name string COMMENT '',
credentials_number string COMMENT '',
bind_mobile string COMMENT '',
bind_email string COMMENT '',
account_status int COMMENT '',
last_ip string COMMENT '',
last_login_time timestamp COMMENT '',
account_created_on timestamp COMMENT '',
account_updated_on timestamp COMMENT '',
last_trans_time timestamp COMMENT '',
freeze_time timestamp COMMENT '',
verity_level int COMMENT '',
product_acc_opened int COMMENT '',
pay_channel string COMMENT '',
channel_name string COMMENT '',
terminal_type int COMMENT '',
terminal_name string COMMENT '',
main_key string COMMENT '',
main_key_name string COMMENT '',
acnt_name string COMMENT '',
flow_type int COMMENT '',
flow_type_name string COMMENT '',
service string COMMENT '',
date_value string COMMENT '',
year string COMMENT '',
month string COMMENT '',
day string COMMENT '',
hour int COMMENT '',
quarter int COMMENT '',
month_with_in_quarter int COMMENT '',
season string COMMENT '',
week int COMMENT '',
day_of_week string COMMENT '',
workday string COMMENT '',
holiday_id int COMMENT '',
holiday_name string COMMENT '',
am_pm string COMMENT '')
PARTITIONED BY (
trans_year string COMMENT '',
trans_month string COMMENT '')
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
'hoodie.query.as.ro.table'='false',
'path'='hdfs://router/user/xxx/table1')
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://router/user/xxx/table1'
TBLPROPERTIES (
'last_commit_completion_time_sync'='20240813132534749',
'last_commit_time_sync'='20240813131932747',
'spark.sql.sources.provider'='hudi',
'spark.sql.sources.schema.numPartCols'='2',
'spark.sql.sources.schema.numParts'='2',
'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"ID","type":"integer","nullable":true,"metadata":{}},{"name":"MERCHANT_ID","type":"integer","nullable":true,"metadata":{}},{"name":"MERCHANT_NAME","type":"string","nullable":true,"metadata":{}},{"name":"PRODUCT_ID","type":"integer","nullable":true,"metadata":{}},{"name":"PRODUCT_NAME","type":"string","nullable":true,"metadata":{}},{"name":"TRANS_ID","type":"string","nullable":true,"metadata":{}},{"name":"PS_TRANS_ID","type":"string","nullable":true,"metadata":{}},{"name":"SW_ORDER_ID","type":"string","nullable":true,"metadata":{}},{"name":"CHL_ORDER_ID","type":"string","nullable":true,"metadata":{}},{"name":"ACCOUNT_ID","type":"integer","nullable":true,"metadata":{}},{"name":"USER_NAME","type":"string","nullable":true,"metadata":{}},{"name":"IP","type":"string","nullable":true,"metadata":{}},{"name":"PAYGATE","type":"integer","nullable":true,"metadata":{}},{"name":"PAYGATE_NAME","type":"string","nullable":true,"metadata":{}},{"name":"IP_LOCATION","type":"string","nullable":true,"metadata":{}},{"name":"TRANS_STATUS","type":"integer","nullable":true,"metadata":{}},{"name":"AMT","type":"string","nullable":true,"metadata":{}},{"name":"POUNDAGE","type":"string","nullable":true,"metadata":{}},{"name":"TRANS_TYPE","type":"integer","nullable":true,"metadata":{}},{"name":"WALLET_ERROR_CODE","type":"string","nullable":true,"metadata":{}},{"name":"WALLET_ERROR_DESC","type":"string","nullable":true,"metadata":{}},{"name":"SPGW_ERROR_CODE","type":"string","nullable":true,"metadata":{}},{"name":"SPGW_ERROR_DESC","type":"string","nullable":true,"metadata":{}},{"name":"CHL_ERROR_CODE","type":"string","nullable":true,"metadata":{}},{"name":"CHL_ERROR_DESC","type":"string","nullable":true,"metadata":{}},{"name":"CHL_ERROR_TYPE","type":"string","nullable":true,"metadata":{}},{"name":"TRANS_STEPS","type":"integer","nullable":true,"metadata":{}},{"name":"TRANS_CREATED_TIME","type":"timestamp","nullable":true,"metadata":{}},{"name":"TRANS_FINISH_TIME","type":"timestamp","nullable":true,"metadata":{}},{"name":"WALLET_REQ_TIME","type":"timestamp","nullable":true,"metadata":{}},{"name":"SPGW_REQ_TIME","type":"timestamp","nullable":true,"metadata":{}},{"name":"CHL_RES_TIME","type":"timestamp","nullable":true,"metadata":{}},{"name":"SPGW_RES_TIME","type":"timestamp","nullable":true,"metadata":{}},{"name":"CHL_CALLBACK_TIME","type":"timestamp","nullable":true,"metadata":{}},{"name":"SPGW_CALLBACK_TIME","type":"timestamp","nullable":true,"metadata":{}},{"name":"TRANS_COST_TIME","type":"integer","nullable":true,"metadata":{}},{"name":"TRANS_AVG_COST_TIME","type":"integer","nullable":true,"metadata":{}},{"name":"ERROR_CODE","type":"string","nullable":true,"metadata":{}},{"name":"TIME_DIMENSION_ID","type":"integer","nullable":true,"metadata":{}},{"name":"ACCOUNT_AMT","type":"string","nullable":true,"metadata":{}},{"name":"ACCOUNT_WITHDRAW_AMT","type":"string","nullable":true,"metadata":{}},{"name":"ONWAY_AMT","type":"string","nullable":true,"metadata":{}},{"name":"TOTAL_SUBSIDY_AMT","type":"string","nullable":true,"metadata":{}},{"name":"REAL_NAME","type":"string","nullable"
:true,"metadata":{}},{"name":"CREDENTIALS_NUMBER","type":"string","nullable":true,"metadata":{}},{"name":"BIND_MOBILE","type":"string","nullable":true,"metadata":{}},{"name":"BIND_EMAIL","type":"string","nullable":true,"metadata":{}},{"name":"ACCOUNT_STATUS","type":"integer","nullable":true,"metadata":{}},{"name":"LAST_IP","type":"string","nullable":true,"metadata":{}},{"name":"LAST_LOGIN_TIME","type":"timestamp","nullable":true,"metadata":{}},{"name":"ACCOUNT_CREATED_ON","ty',
'spark.sql.sources.schema.part.1'='pe":"timestamp","nullable":true,"metadata":{}},{"name":"ACCOUNT_UPDATED_ON","type":"timestamp","nullable":true,"metadata":{}},{"name":"LAST_TRANS_TIME","type":"timestamp","nullable":true,"metadata":{}},{"name":"FREEZE_TIME","type":"timestamp","nullable":true,"metadata":{}},{"name":"VERITY_LEVEL","type":"integer","nullable":true,"metadata":{}},{"name":"PRODUCT_ACC_OPENED","type":"integer","nullable":true,"metadata":{}},{"name":"PAY_CHANNEL","type":"string","nullable":true,"metadata":{}},{"name":"CHANNEL_NAME","type":"string","nullable":true,"metadata":{}},{"name":"TERMINAL_TYPE","type":"integer","nullable":true,"metadata":{}},{"name":"TERMINAL_NAME","type":"string","nullable":true,"metadata":{}},{"name":"MAIN_KEY","type":"string","nullable":true,"metadata":{}},{"name":"MAIN_KEY_NAME","type":"string","nullable":true,"metadata":{}},{"name":"ACNT_NAME","type":"string","nullable":true,"metadata":{}},{"name":"FLOW_TYPE","type":"integer","nullable":true,"metadata":{}},{"name":"FLOW_TYPE_NAME","type":"string","nullable":true,"metadata":{}},{"name":"SERVICE","type":"string","nullable":true,"metadata":{}},{"name":"DATE_VALUE","type":"string","nullable":true,"metadata":{}},{"name":"YEAR","type":"string","nullable":true,"metadata":{}},{"name":"MONTH","type":"string","nullable":true,"metadata":{}},{"name":"DAY","type":"string","nullable":true,"metadata":{}},{"name":"HOUR","type":"integer","nullable":true,"metadata":{}},{"name":"QUARTER","type":"integer","nullable":true,"metadata":{}},{"name":"MONTH_WITH_IN_QUARTER","type":"integer","nullable":true,"metadata":{}},{"name":"SEASON","type":"string","nullable":true,"metadata":{}},{"name":"WEEK","type":"integer","nullable":true,"metadata":{}},{"name":"DAY_OF_WEEK","type":"string","nullable":true,"metadata":{}},{"name":"WORKDAY","type":"string","nullable":true,"metadata":{}},{"name":"HOLIDAY_ID","type":"integer","nullable":true,"metadata":{}},{"name":"HOLIDAY_NAME","type":"string","nullable":true,"metadata":{}},{"name":"AM_PM","type":"string","nullable":true,"metadata":{}},{"name":"TRANS_YEAR","type":"string","nullable":true,"metadata":{}},{"name":"TRANS_MONTH","type":"string","nullable":true,"metadata":{}}]}',
'spark.sql.sources.schema.partCol.0'='TRANS_YEAR',
'spark.sql.sources.schema.partCol.1'='TRANS_MONTH',
'transient_lastDdlTime'='1723458706')

And the spark-shell output is below:

scala> val in = spark.read.parquet("/user/xxx/table1/TRANS_YEAR=2019/TRANS_MONTH=7")
scala> in.printSchema
root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- MERCHANT_ID: integer (nullable = true)
 |-- MERCHANT_NAME: string (nullable = true)
 |-- PRODUCT_ID: integer (nullable = true)
 |-- PRODUCT_NAME: string (nullable = true)
 |-- TRANS_ID: string (nullable = true)
 |-- PS_TRANS_ID: string (nullable = true)
 |-- SW_ORDER_ID: string (nullable = true)
 |-- CHL_ORDER_ID: string (nullable = true)
 |-- ACCOUNT_ID: integer (nullable = true)
 |-- USER_NAME: string (nullable = true)
 |-- IP: string (nullable = true)
 |-- PAYGATE: integer (nullable = true)
 |-- PAYGATE_NAME: string (nullable = true)
 |-- IP_LOCATION: string (nullable = true)
 |-- TRANS_STATUS: integer (nullable = true)
 |-- AMT: string (nullable = true)
 |-- POUNDAGE: string (nullable = true)
 |-- TRANS_TYPE: integer (nullable = true)
 |-- WALLET_ERROR_CODE: string (nullable = true)
 |-- WALLET_ERROR_DESC: string (nullable = true)
 |-- SPGW_ERROR_CODE: string (nullable = true)
 |-- SPGW_ERROR_DESC: string (nullable = true)
 |-- CHL_ERROR_CODE: string (nullable = true)
 |-- CHL_ERROR_DESC: string (nullable = true)
 |-- CHL_ERROR_TYPE: string (nullable = true)
 |-- TRANS_STEPS: integer (nullable = true)
 |-- TRANS_CREATED_TIME: timestamp (nullable = true)
 |-- TRANS_FINISH_TIME: timestamp (nullable = true)
 |-- WALLET_REQ_TIME: timestamp (nullable = true)
 |-- SPGW_REQ_TIME: timestamp (nullable = true)
 |-- CHL_RES_TIME: timestamp (nullable = true)
 |-- SPGW_RES_TIME: timestamp (nullable = true)
 |-- CHL_CALLBACK_TIME: timestamp (nullable = true)
 |-- SPGW_CALLBACK_TIME: timestamp (nullable = true)
 |-- TRANS_COST_TIME: integer (nullable = true)
 |-- TRANS_AVG_COST_TIME: integer (nullable = true)
 |-- ERROR_CODE: string (nullable = true)
 |-- TIME_DIMENSION_ID: integer (nullable = true)
 |-- ACCOUNT_AMT: string (nullable = true)
 |-- ACCOUNT_WITHDRAW_AMT: string (nullable = true)
 |-- ONWAY_AMT: string (nullable = true)
 |-- TOTAL_SUBSIDY_AMT: string (nullable = true)
 |-- REAL_NAME: string (nullable = true)
 |-- CREDENTIALS_NUMBER: string (nullable = true)
 |-- BIND_MOBILE: string (nullable = true)
 |-- BIND_EMAIL: string (nullable = true)
 |-- ACCOUNT_STATUS: integer (nullable = true)
 |-- LAST_IP: string (nullable = true)
 |-- LAST_LOGIN_TIME: timestamp (nullable = true)
 |-- ACCOUNT_CREATED_ON: timestamp (nullable = true)
 |-- ACCOUNT_UPDATED_ON: timestamp (nullable = true)
 |-- LAST_TRANS_TIME: timestamp (nullable = true)
 |-- FREEZE_TIME: timestamp (nullable = true)
 |-- VERITY_LEVEL: integer (nullable = true)
 |-- PRODUCT_ACC_OPENED: integer (nullable = true)
 |-- PAY_CHANNEL: string (nullable = true)
 |-- CHANNEL_NAME: string (nullable = true)
 |-- TERMINAL_TYPE: integer (nullable = true)
 |-- TERMINAL_NAME: string (nullable = true)
 |-- MAIN_KEY: string (nullable = true)
 |-- MAIN_KEY_NAME: string (nullable = true)
 |-- ACNT_NAME: string (nullable = true)
 |-- FLOW_TYPE: integer (nullable = true)
 |-- FLOW_TYPE_NAME: string (nullable = true)
 |-- SERVICE: string (nullable = true)
 |-- DATE_VALUE: string (nullable = true)
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- HOUR: integer (nullable = true)
 |-- QUARTER: integer (nullable = true)
 |-- MONTH_WITH_IN_QUARTER: integer (nullable = true)
 |-- SEASON: string (nullable = true)
 |-- WEEK: integer (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- WORKDAY: string (nullable = true)
 |-- HOLIDAY_ID: integer (nullable = true)
 |-- HOLIDAY_NAME: string (nullable = true)
 |-- AM_PM: string (nullable = true)
 |-- TRANS_YEAR: string (nullable = true)
 |-- TRANS_MONTH: string (nullable = true)

scala> in.show |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| ID|MERCHANT_ID|MERCHANT_NAME|PRODUCT_ID|PRODUCT_NAME| TRANS_ID| PS_TRANS_ID| SW_ORDER_ID| CHL_ORDER_ID|ACCOUNT_ID| USER_NAME| IP|PAYGATE| PAYGATE_NAME|IP_LOCATION|TRANS_STATUS| AMT|POUNDAGE|TRANS_TYPE|WALLET_ERROR_CODE| WALLET_ERROR_DESC|SPGW_ERROR_CODE| SPGW_ERROR_DESC|CHL_ERROR_CODE|CHL_ERROR_DESC|CHL_ERROR_TYPE|TRANS_STEPS| TRANS_CREATED_TIME| TRANS_FINISH_TIME| WALLET_REQ_TIME| SPGW_REQ_TIME| CHL_RES_TIME| SPGW_RES_TIME|CHL_CALLBACK_TIME|SPGW_CALLBACK_TIME|TRANS_COST_TIME|TRANS_AVG_COST_TIME|ERROR_CODE|TIME_DIMENSION_ID|ACCOUNT_AMT|ACCOUNT_WITHDRAW_AMT|ONWAY_AMT|TOTAL_SUBSIDY_AMT|REAL_NAME|CREDENTIALS_NUMBER|BIND_MOBILE|BIND_EMAIL|ACCOUNT_STATUS|LAST_IP|LAST_LOGIN_TIME| ACCOUNT_CREATED_ON| ACCOUNT_UPDATED_ON| LAST_TRANS_TIME|FREEZE_TIME|VERITY_LEVEL|PRODUCT_ACC_OPENED|PAY_CHANNEL|CHANNEL_NAME|TERMINAL_TYPE|TERMINAL_NAME|MAIN_KEY|MAIN_KEY_NAME| ACNT_NAME|FLOW_TYPE|FLOW_TYPE_NAME| SERVICE|DATE_VALUE|YEAR|MONTH|DAY|HOUR|QUARTER|MONTH_WITH_IN_QUARTER|SEASON|WEEK|DAY_OF_WEEK|WORKDAY|HOLIDAY_ID|HOLIDAY_NAME|AM_PM|TRANS_YEAR|TRANS_MONTH|

| 20240813094726141|20240813094726141...| ID:1725539201| TRANS_YEAR=2019/T...|ba303f39-73f5-492...

xicm commented 1 month ago

Could you give full reproduction code, including the Hive query SQL? I'm not able to reproduce this.

qw2qw2 commented 1 month ago

> Could you give full reproduction code, including the Hive query SQL? I'm not able to reproduce this.

Below is my code:

package com.flink.dw.dws;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HudiTableFlink {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60000*10);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
    env.getCheckpointConfig().setCheckpointTimeout(6000000);
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().setExternalizedCheckpointCleanup(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getCheckpointConfig().enableUnalignedCheckpoints();
    env.setStateBackend(new RocksDBStateBackend("hdfs:///user/xxx/checkpoint/table1", true));
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    env.setParallelism(16);

    tableEnv.executeSql(
            "CREATE TABLE TRANS_INFO ("+
                    "ID INT," +
                    "MERCHANT_ID INT," +
                    "MERCHANT_NAME STRING," +
                    "PRODUCT_ID INT," +
                    "PRODUCT_NAME STRING," +
                    "TRANS_ID STRING," +
                    "PS_TRANS_ID STRING," +
                    "TRANS_CREATED_TIME TIMESTAMP(3)," +
                   ...
                    "`YEAR` STRING," +
                    "`MONTH` STRING," +
                    "`DAY` STRING," +
                    "`HOUR` INT," +
                    ") WITH (" +
                    " 'connector' = 'kafka'," +
                    " 'topic' = '"+ConfigUtil.getConfig().get("kafka.topic.prefix")+"PROD_RDC'," +
                    " 'properties.bootstrap.servers' = '"+ ConfigUtil.getConfig().get("kafka.bootstrap-servers")+"',"  +
                    " 'properties.group.id' = '2024081400'," +
                    " 'scan.startup.mode' = 'earliest-offset'," +
                    " 'format' = 'json'" +
                    ")"
    );
    tableEnv.executeSql(
            "CREATE TABLE table1(" +
                    "ID INT," +
                    "MERCHANT_ID INT," +
                    "MERCHANT_NAME STRING," +
                    "PRODUCT_ID INT," +
                    "PRODUCT_NAME STRING," +
                    "TRANS_ID STRING," +
                    "PS_TRANS_ID STRING," +
                    "TRANS_CREATED_TIME TIMESTAMP(3)," +

                    ...
                    "`YEAR` STRING," +
                    "`MONTH` STRING," +
                    "`DAY` STRING," +
                    "`HOUR` INT," +
                    "TRANS_YEAR STRING,"+
                    "TRANS_MONTH STRING "+
                    //"TRANS_DAY STRING"+
                    ")" +
                    "PARTITIONED BY (TRANS_YEAR ,TRANS_MONTH)" +
                    "WITH(" +
                    "    'connector' = 'hudi'," +
                    "    'path'='hdfs:///user/xxx/table1'," +
                    "    'table.type' = 'COPY_ON_WRITE'," +
                    "    'write.operation' = 'upsert'," +
                    "    'hoodie.datasource.write.hive_style_partitioning' = 'true',"+
                    "    'hoodie.datasource.write.recordkey.field' = 'ID'," +
                    "    'write.precombine.field' = 'TRANS_CREATED_TIME'," +
                    "    'hoodie.datasource.write.hive_style_partitioning' = 'true',"+
                    "    'hive_sync.enable' = 'true'," +
                    "    'hive_sync.mode' = 'hms'," +
                    "    'hive_sync.conf.dir'='hdfs:///user/hive/public/conf'," +
                    "    'hive_sync.db' = 'dbname'," +
                    "    'hive_sync.table' = 'table1'," +
                    "    'hive_sync.partition_fields' = 'TRANS_YEAR ,TRANS_MONTH'," +
                    "    'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'"+
                    ")"
    );
    tableEnv.executeSql("insert into table1  ( " +
            "ID ," +
            "MERCHANT_ID ," +
            "MERCHANT_NAME ," +
            "PRODUCT_ID ," +
            "PRODUCT_NAME ," +
            "TRANS_ID ," +
            "TRANS_CREATED_TIME ," +
            ...
            "`YEAR` ," +
            "`MONTH` ," +
            "`DAY` ," +
            "`HOUR` ," +
            "TRANS_YEAR," +
            "TRANS_MONTH " +
            ")" +

            "SELECT " +
            "ID ," +
            "MERCHANT_ID ," +
            "MERCHANT_NAME ," +
            "PRODUCT_ID ," +
            "PRODUCT_NAME ," +
            "TRANS_ID ," +
            "TRANS_CREATED_TIME ," +
            ...
            "`YEAR` ," +
            "`MONTH` ," +
            "`DAY` ," +
            "`HOUR` ," +
            "`YEAR` as TRANS_YEAR," +
            "`MONTH` as TRANS_MONTH " +
            "FROM TRANS_INFO");
}

}

And my Hive SQL:

select * from table1 where trans_year='2024' and trans_month='7' and id=1698350 limit 10;

xicm commented 4 weeks ago

@qw2qw2 Is the exception you encountered java.lang.NoSuchFieldError: NULL_VALUE or Duplicate field _hoodie_commit_time? I didn't manage to reproduce either of them.

As Danny said, the NoSuchFieldError is a dependency conflict.

qw2qw2 commented 4 weeks ago

> @qw2qw2 Is the exception you encountered java.lang.NoSuchFieldError: NULL_VALUE or Duplicate field _hoodie_commit_time? I didn't manage to reproduce either of them.
>
> As Danny said, the NoSuchFieldError is a dependency conflict.

Thank you for your response. I encountered "Duplicate field _hoodie_commit_time" with the Hive Tez engine, and "java.lang.NoSuchFieldError: NULL_VALUE" when I switched to the MR engine.

xicm commented 4 weeks ago

https://github.com/apache/hudi/blob/47bdc2709566f726fa503919c87004ec26f14817/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java#L518-L536

Could you add some logging and repackage hudi-hadoop-mr to see whether there are duplicate fields in fieldNames? The fieldNames come from the Hive engine. If there are duplicates, we can try to deduplicate the fields.
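For illustration, a rough sketch of the kind of logging being asked for; the class, method, and logger below are placeholders, and the real change would sit where hudi-hadoop-mr assembles fieldNames before calling HoodieAvroUtils.generateProjectionSchema:

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProjectionFieldLogger {
    private static final Logger LOG = LoggerFactory.getLogger(ProjectionFieldLogger.class);

    // Log every field name handed over by the Hive engine and flag duplicates,
    // so the source of the repeated _hoodie_commit_time column becomes visible.
    static void logFieldNames(List<String> fieldNames) {
        LOG.info("Projection field names from Hive: {}", fieldNames);
        Set<String> seen = new HashSet<>();
        for (String name : fieldNames) {
            if (!seen.add(name)) {
                LOG.warn("Duplicate projection field: {}", name);
            }
        }
    }
}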