apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0 #1326

Closed: matthewLiem closed this issue 4 years ago

matthewLiem commented 4 years ago

Hello - I'm using EMR (Hudi 0.5, Spark 2.4.4), and during an upsert I'm running into the error below:

There were similar issues posted before, but not specific to ParquetDecodingException. I'm able to read the Hudi table/dataset directly, so I don't think it's related to the parquet files. I'm trying to trim down the data and provide a repro, but checking if anyone has pointers in the meantime. This is a non-partitioned table, and I'll see if it's related to RECORDKEY_FIELD_OPT_KEY or PRECOMBINE_FIELD_OPT_KEY.

20/02/12 00:34:49 INFO HoodieWriteClient: Auto commit disabled for 20200212003312
[Stage 69:>                                                         (0 + 1) / 1]20/02/12 00:34:53 WARN TaskSetManager: Lost task 0.0 in stage 69.0 (TID 37277, ip-10-80-65-54.vpc.internal, executor 62): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.HoodieCopyOnWriteTable.handleUpsertPartition(HoodieCopyOnWriteTable.java:264)
    at org.apache.hudi.HoodieWriteClient.lambda$upsertRecordsInternal$507693af$1(HoodieWriteClient.java:428)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.table.HoodieCopyOnWriteTable.handleUpdateInternal(HoodieCopyOnWriteTable.java:202)
    at org.apache.hudi.table.HoodieCopyOnWriteTable.handleUpdate(HoodieCopyOnWriteTable.java:178)
    at org.apache.hudi.table.HoodieCopyOnWriteTable.handleUpsertPartition(HoodieCopyOnWriteTable.java:257)
    ... 30 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:142)
    at org.apache.hudi.table.HoodieCopyOnWriteTable.handleUpdateInternal(HoodieCopyOnWriteTable.java:200)
    ... 32 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:140)
    ... 33 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:230)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:209)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:51)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:260)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:120)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3://hudi_poc_mliem/hudi_identity/default/85a8812c-638d-4782-b85a-d7d04d092b31-0_611-6-2594_20200211195523.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
    at org.apache.hudi.func.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:47)
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:44)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:91)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ... 4 more
Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary
    at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
    at org.apache.parquet.avro.AvroConverters$BinaryConverter.setDictionary(AvroConverters.java:75)
    at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:341)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
    ... 11 more

Here are some other details:

val hudiOptions = Map[String,String](
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "auth_id",
    HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
    DataSourceWriteOptions.OPERATION_OPT_KEY ->
        DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, 
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_mod_time",  
    DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", 
    DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
    DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY -> hudiDatabaseName,
    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> "org.apache.hudi.hive.NonPartitionedExtractor")

val updateDF = inputDF.filter(col("auth_id") === requestIpToUpdate).withColumn("run_detail_id", lit("123456"))

updateDF.write.format("org.apache.hudi")
    .options(hudiOptions)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
    .mode(SaveMode.Append)
    .save(hudiTablePath)
lamberken commented 4 years ago

Hi @matthewLiem, thanks for reporting this. Does this issue always happen?

matthewLiem commented 4 years ago

Yup - it's happening consistently. I've reduced the input data to a single file and am still seeing the error. Also, I get the error regardless of whether the operation is an upsert or a delete. I'll see what else I can run to help isolate the issue and try to produce a sample data file for a repro.

lamberken commented 4 years ago

Got it, trying to figure it out. Also, it would be great if there were sample data that can reproduce this. :)

matthewLiem commented 4 years ago

Thanks for the help @lamber-ken. The issue was due to a mismatch in data types between the Hudi table and the DataFrame we were upserting. Casting properly and ensuring the schema types matched across both resolved the issue.
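
(For reference, a minimal sketch of what the cast can look like, reusing updateDF, hudiOptions and hudiTablePath from the snippet above and assuming the mismatched column is run_detail_id with a long type in the existing table; the column name and target type will differ in other setups:)

import org.apache.spark.sql.functions.col

// Align the update DataFrame's column type with the one already stored in the Hudi table
// before upserting, so the existing parquet files decode cleanly during the merge.
val alignedDF = updateDF.withColumn("run_detail_id", col("run_detail_id").cast("long"))

alignedDF.write.format("org.apache.hudi")
    .options(hudiOptions)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
    .mode(SaveMode.Append)
    .save(hudiTablePath)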

lamberken commented 4 years ago

@matthewLiem welcome :)

Recording some notes:

Change lit(123456) to lit(123456L) in the line below:

val updateDF = inputDF.withColumn("run_detail_id", lit(123456))
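
That is, a sketch of the corrected line (assuming, per the note above, that run_detail_id in the existing table is a long):

val updateDF = inputDF.withColumn("run_detail_id", lit(123456L))  // Long literal matches the existing column type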

Steps to reproduce:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val inputDataPath = "file:///tmp/test/ttt/*"
val hudiTableName = "hudi_identity"
val hudiTablePath = "file:///tmp/test/nnn"

val hudiOptions = Map[String,String](
    "hoodie.datasource.write.recordkey.field" -> "auth_id",
    "hoodie.table.name" -> hudiTableName, 
    "hoodie.datasource.write.precombine.field" -> "last_mod_time")

// create
val inputDF = spark.read.format("parquet").load(inputDataPath)
inputDF.write.format("org.apache.hudi").options(hudiOptions).mode("Overwrite").save(hudiTablePath)    

// update
val inputDF = spark.read.format("parquet").load(inputDataPath)
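// lit(123456) produces an Int column; per the note above the existing column expects a
// long (lit(123456L)), and this type mismatch is what reproduces the upsert failure.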
val updateDF = inputDF.withColumn("run_detail_id", lit(123456))
updateDF.write.format("org.apache.hudi").options(hudiOptions).mode("Append").save(hudiTablePath)

data.parquet.zip

tooptoop4 commented 4 years ago

@lamber-ken https://github.com/apache/hudi/issues/1802 looks similar, any idea?

absognety commented 3 years ago

Thanks for the help @lamber-ken. The issue was due to a mismatch in data types between the Hudi table and the DataFrame we were upserting. Casting properly and ensuring the schema types matched across both resolved the issue.

I am reading Snowflake tables and writing to S3 in Hudi format, and I'm facing this issue consistently. How do I identify which columns to cast? When I read the existing data from S3 and the incremental data from Snowflake into Spark, both dataframes have the same data types.

@nsivabalan any suggestions on fixes for this? It is causing issues in our production.
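
(For what it's worth, one rough way to spot type mismatches, assuming both the existing Hudi table and the incremental batch can be loaded as Spark DataFrames, here called existingDF and incomingDF as placeholder names:)

// Diff the (column name -> Spark SQL type) pairs of the two DataFrames; any column whose
// types differ is a candidate for an explicit cast before the upsert.
val existingTypes = existingDF.dtypes.toMap
val incomingTypes = incomingDF.dtypes.toMap

existingTypes.keySet.intersect(incomingTypes.keySet)
    .filter(c => existingTypes(c) != incomingTypes(c))
    .foreach(c => println(s"$c: table=${existingTypes(c)} incoming=${incomingTypes(c)}"))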