apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.48k stars 2.44k forks source link

Unable to merge cdc records to Hudi snapshot. #11785

Open rcc1101 opened 3 months ago

rcc1101 commented 3 months ago

I am unable to add cdc records to a snapshot.

environment

emr 7.2.0
AmazonCloudWatchAgent 1.300032.2, 
Hive 3.1.3, 
Spark 3.5.1, 
Zeppelin 0.10.1

spark command

spark-shell --driver-memory 1g --executor-memory 2500m --executor-cores 1 --driver-cores 1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' --conf "spark.sql.caseSensitive=true"  --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED        --conf datanucleus.schema.autoCreateTables=true --conf "parquet.avro.write-old-list-structure=false" --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0  --name ravic 

scala code

spark.conf.set("parquet.avro.write-old-list-structure",false).  // <<----- setting once again here

val sess= spark
var cdcDf = sess.read .schema(snapshotDf.schema) .format("json") .load("s3://bucket/inputs/cdc-jsons/*")
cdcDf.createOrReplaceTempView("cdc")
cdcDf = sess.sql("select * from cdc where _id is not null and _id.oid is not null ")
cdcDf.write.format("hudi")
      .options(getQuickstartWriteConfigs)
      .option("hoodie.datasource.hive_sync.partition_extractor_class","org.apache.hudi.hive.NonPartitionedExtractor")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
      .option("hoodie.datasource.write.recordkey.field", "_id.oid")

.option("parquet.avro.write-old-list-structure","false") // <<---- setting again here ; tried with "false" and false (both)

      .option(HoodieWriteConfig.TABLE_NAME, "in_hudi_2")
      .mode(SaveMode.Append)
      .save("s3://bucket/snapshots-hudi/snapshot/");

error

Caused by: java.lang.NullPointerException: Array contains a null element at 1. Set parquet.avro.write-old-list-structure=false to turn on support for arrays with null elements.
    at org.apache.parquet.avro.AvroWriteSupport$TwoLevelListWriter.writeCollection(AvroWriteSupport.java:606)
    at org.apache.parquet.avro.AvroWriteSupport$ListWriter.writeList(AvroWriteSupport.java:444)
    at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:372)
    at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:291)
    at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:204)
    at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:178)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:138)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:310)
    at org.apache.hudi.io.hadoop.HoodieBaseParquetWriter.write(HoodieBaseParquetWriter.java:149)
    at org.apache.hudi.io.hadoop.HoodieAvroParquetWriter.writeAvro(HoodieAvroParquetWriter.java:80)
    at org.apache.hudi.io.storage.HoodieAvroFileWriter.write(HoodieAvroFileWriter.java:51)
    at org.apache.hudi.io.storage.HoodieFileWriter.write(HoodieFileWriter.java:43)
    at org.apache.hudi.io.HoodieMergeHandle.writeToFile(HoodieMergeHandle.java:395)
    at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:377)
    ... 41 more
ad1happy2go commented 3 months ago

@rcc1101 Can you try setting spark.hadoop.parquet.avro.write-old-list-structure

rcc1101 commented 3 months ago

got new exception now

I recreated the new dump in hudi again, now with the above option added as follows

creating the dump

spark-shell --driver-memory 1g --executor-memory 2500m --executor-cores 1 
--driver-cores 1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' 
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' 
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' 
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' 
--conf "spark.sql.caseSensitive=true"  
--conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED 
--conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED 
--conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED 
--conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED        
--conf datanucleus.schema.autoCreateTables=true 
--conf "parquet.avro.write-old-list-structure=false" 
--conf spark.hadoop.parquet.avro.write-old-list-structure=true 
--packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0  
--name ravic 

scala code to create dump

import org.apache.spark.sql.SaveMode
val sess = spark
val snapshotDf = sess.read.parquet("s3://bucket/snapshots/prefix/_bid_9223370313087830657/")
snapshotDf.registerTempTable("snapshot")
snapshotDf.write.format("hudi")
      .option(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.BULK_INSERT.name())
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
      .option(HoodieWriteConfig.TABLE_NAME,"table_hudi_3")
      .option("parquet.avro.write-old-list-structure","false")
      .option("spark.hadoop.parquet.avro.write-old-list-structure","true")
      .mode(SaveMode.Overwrite)
      .save("s3://bucket/snapshots-hudi/prefix/snapshot-3/");

:quit

-- Good so far --

Now i am trying to update the hudi snapshot.The update fails. THe update code.

spark.conf.set("parquet.avro.write-old-list-structure",false)
spark.conf.set("spark.hadoop.parquet.avro.write-old-list-structure",true)
val sess= spark
val snapshotDf = sess.read.parquet("s3://bucket/snapshots/prefix/_bid_9223370313087830657/")
var cdcDf = sess.read .schema(snapshotDf.schema) .format("json") .load("s3://bucket/inputs/prefix/*")
cdcDf.createOrReplaceTempView("cdc")
cdcDf = sess.sql("select * from cdc where _id is not null and _id.oid is not null ")
cdcDf.write.format("hudi")
      .options(getQuickstartWriteConfigs)
      .option("hoodie.datasource.hive_sync.partition_extractor_class","org.apache.hudi.hive.NonPartitionedExtractor")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
      .option("hoodie.datasource.write.recordkey.field", "_id.oid")
      .option("parquet.avro.write-old-list-structure","false")
      .option("spark.hadoop.parquet.avro.write-old-list-structure","true")
      .option(HoodieWriteConfig.TABLE_NAME, "table_hudi_3")
      .mode(SaveMode.Append)
      .save("s3://bucket/snapshots-hudi/prefix/snapshot-3/");
:quit

new exception when updating the merge. _id.oid is a grouping attribute and it comes from a dump from mongodb.

 Error upserting bucketType UPDATE for partition :2
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:344)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:254)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:911)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:911)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:61)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:61)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:382)
    at org.apache.spark.storage.BlockManager.$anonfun$getOrElseUpdate$1(BlockManager.scala:1388)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1630)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1540)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1604)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1405)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1359)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:380)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:330)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:61)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:174)
    at org.apache.spark.scheduler.Task.run(Task.scala:152)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:632)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:151)
    at org.apache.hudi.table.HoodieSparkTable.runMerge(HoodieSparkTable.java:148)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:376)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:371)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:337)
    ... 33 more
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
    at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:75)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
    ... 37 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54)
    at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
    at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:67)
    ... 38 more
Caused by: java.lang.ClassCastException: optional binary value (STRING) is not a group
    at org.apache.parquet.schema.Type.asGroupType(Type.java:248)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:362)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:306)
    at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:79)
    at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:617)
    at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:567)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:371)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:144)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:98)
    at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:146)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:190)
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:166)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
ad1happy2go commented 3 months ago

@rcc1101 Looks like this is related to this JIRA - https://issues.apache.org/jira/browse/HUDI-4798

rcc1101 commented 3 months ago

Ok, so this is known issue. And the issue is open since 7th Sep 2022.

oh! Does it mean that this is not a priority issue with hudi? Or does it mean that hudi is intended to work if only the parquet files are flat structured.

ad1happy2go commented 3 months ago

@rcc1101 These looks to be more related to parquet issues which though are still open. If you see details on JIRA -

Hudi needs to upgrade parquet version after following parquet issues are fixed:
https://issues.apache.org/jira/browse/PARQUET-1254
https://issues.apache.org/jira/browse/PARQUET-1681
https://issues.apache.org/jira/browse/PARQUET-2069

Tagging @codope for more insights here.

rcc1101 commented 2 months ago

Ok, So complex datatypes in parquets cannot be used with hudi?