apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/

[SUPPORT] Compaction error #9885

Open fearlsgroove opened 12 months ago

fearlsgroove commented 12 months ago

I'm using Spark structured streaming to ingest Kafka records in Avro format into a MoR Hudi table. Somehow I've gotten something fouled up and compactions cannot complete. I'm not configuring any non-default settings for compaction, and Spark tasks in the 'hoodiecompact' pool always fail with the stack trace below.

I do have a decimal field with precision 12 in the messages, but it hasn't changed to 13 anywhere. I don't know if it's possible to inspect the messages somehow to see whether any of them unexpectedly carry a higher precision, but I don't see how there could be any.
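For reference, here is a minimal sketch of the kind of pipeline described above (Kafka Avro source, Spark structured streaming upserting into a MoR Hudi table). The broker, topic, schema file, checkpoint, and table paths are hypothetical placeholders, and it assumes plain Avro payloads (Confluent wire-format messages would need the magic byte and schema id stripped first):

    from pyspark.sql import SparkSession
    from pyspark.sql.avro.functions import from_avro  # requires the org.apache.spark:spark-avro package
    from pyspark.sql.functions import col

    spark = SparkSession.builder.appName("kafka-to-hudi").getOrCreate()

    # Hypothetical Avro value schema; the real one contains the decimal(12,2) field.
    value_schema = open("/path/to/value_schema.avsc").read()

    kafka_df = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
        .option("subscribe", "the_source_topic")            # placeholder
        .load()
    )

    records = (
        kafka_df
        .select(from_avro(col("value"), value_schema).alias("r"))
        .select("r.*")
    )

    hudi_options = {
        "hoodie.table.name": "the_table_name",
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
        "hoodie.datasource.write.operation": "upsert",
        # ...plus the writer options shared later in this thread
    }

    (
        records.writeStream.format("hudi")
        .options(**hudi_options)
        .outputMode("append")
        .option("checkpointLocation", "/path/to/checkpoints")  # placeholder
        .start("/path/to/the_table")                            # placeholder
    )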

Expected behavior The compaction completes without errors.

Environment Description

Stacktrace

org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: org.apache.avro.AvroTypeException: Cannot encode decimal with precision 13 as max precision 12
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdateInternal(HoodieSparkCopyOnWriteTable.java:252)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdate(HoodieSparkCopyOnWriteTable.java:235)
    at org.apache.hudi.table.action.compact.CompactionExecutionHelper.writeFileAndGetWriteStats(CompactionExecutionHelper.java:64)
    at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:237)
    at org.apache.hudi.table.action.compact.HoodieCompactor.lambda$compact$988df80a$1(HoodieCompactor.java:132)
    at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1555)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1465)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1529)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1352)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.avro.AvroTypeException: Cannot encode decimal with precision 13 as max precision 12
    at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:75)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
    ... 28 more
Caused by: org.apache.avro.AvroTypeException: Cannot encode decimal with precision 13 as max precision 12
    at org.apache.avro.Conversions$DecimalConversion.validate(Conversions.java:140)
    at org.apache.avro.Conversions$DecimalConversion.toFixed(Conversions.java:104)
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1077)
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1001)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:946)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:873)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:944)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:873)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:894)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:873)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:843)
    at org.apache.hudi.common.model.HoodieAvroIndexedRecord.rewriteRecordWithNewSchema(HoodieAvroIndexedRecord.java:123)
    at org.apache.hudi.common.model.HoodieRecord.rewriteRecordWithNewSchema(HoodieRecord.java:382)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.lambda$runMerge$0(HoodieMergeHelper.java:136)
    at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:68)
    ... 29 more
ad1happy2go commented 12 months ago

@fearlsgroove Can you share the table and writer configurations?

fearlsgroove commented 12 months ago

Sure, thanks for the reply:

    "hoodie.datasource.write.recordkey.field": "id,item_date",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.partitionpath.field": "partition_year",
    "hoodie.datasource.write.table.name": "the_table_name",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "event_date",
    "hoodie.datasource.write.hive_style_partitioning": True,
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
    "hoodie.clustering.plan.strategy.sort.columns": "id",
    "hoodie.upsert.shuffle.parallelism": 32,
    "hoodie.insert.shuffle.parallelism": 32,
    "hoodie.bulkinsert.shuffle.parallelism": 32,
    "hoodie.clean.async": True,
    "hoodie.clustering.async.enabled": True,
    "hoodie.clustering.async.max.commits": 8,  # default is 4
    # Seems to avoid some concurrency/retry errors. Possibly ineffective
    "hoodie.clustering.updates.strategy": "org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy",
ad1happy2go commented 12 months ago

@fearlsgroove From the stack trace, it looks like you got some data with higher precision, i.e. 13, for that column. Do you have any way to confirm whether that is the case? Although I'm not sure why it didn't fail while writing the log files.

I will try to insert similar data to confirm whether it fails in the writer or during compaction.

fearlsgroove commented 12 months ago

Yeah, I was thinking that as well, but I don't see how it could be the case. We did evolve the schema in the source topic recently, but the only decimal field in the data did not change, and I'm 99% sure we've only pulled records with the new schema anyway.

If I look at all the data I can pull from the topic, I don't see any fields with a decimal precision that would indicate the problem. Is there any way you can suggest to inspect the records in the logs that are going to be compacted, so I can try to confirm and/or correct it?
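One way to sanity-check the source data (a sketch, not something from this thread): a decimal(12,2) column holds at most 10 integral digits, so any value whose absolute value is 10,000,000,000 or larger needs precision 13 or more. Assuming the raw topic data is loaded into a DataFrame `df` and the decimal column is hypothetically named `amount`:

    from pyspark.sql import functions as F

    # Values that cannot fit into decimal(12,2): >= 13 total digits at scale 2.
    suspect = df.filter(F.abs(F.col("amount")) >= 10_000_000_000)
    suspect.select("id", "item_date", "amount").show(truncate=False)
    print("rows exceeding decimal(12,2):", suspect.count())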

ad1happy2go commented 12 months ago

@fearlsgroove Can you try to read the data (without running compaction) and see if you are able to, and whether the data looks correct?

fearlsgroove commented 12 months ago

Yep, I can read the data successfully. The schema shows the only decimal value as (12,2), which is what I expect. Is there a way to read whichever records in the logs due for compaction might be causing this issue?
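One possible way to look at just the not-yet-compacted data (a sketch, not an official Hudi recipe): query the MoR table twice, once as read_optimized (base files only) and once as snapshot (base files merged with log files), and compare the two on the record key. The table path is a placeholder and the decimal column is hypothetically named `amount`:

    ro = (
        spark.read.format("hudi")
        .option("hoodie.datasource.query.type", "read_optimized")
        .load("/path/to/the_table")  # placeholder
    )
    snap = (
        spark.read.format("hudi")
        .option("hoodie.datasource.query.type", "snapshot")
        .load("/path/to/the_table")
    )

    # Keys that so far exist only in log files (inserts awaiting compaction).
    only_in_logs = snap.join(ro, ["id", "item_date"], "left_anti")
    only_in_logs.select("id", "item_date", "amount").show(truncate=False)

Updates that live only in log files will appear in both views, so joining the two on the key and diffing the `amount` column would catch those as well.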

ad1happy2go commented 11 months ago

Can you try setting spark.sql.storeAssignmentPolicy=legacy?

Referring to this issue: https://github.com/apache/hudi/issues/6209
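If you go this route, the config can be set on the session at runtime or passed via --conf on spark-submit; a minimal sketch:

    # Relax the ANSI store-assignment checks that Spark 3.x applies by default (as suggested above).
    spark.conf.set("spark.sql.storeAssignmentPolicy", "LEGACY")

or equivalently `--conf spark.sql.storeAssignmentPolicy=LEGACY` on spark-submit.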

ad1happy2go commented 11 months ago

@fearlsgroove Were you able to resolve it? Let us know if you still face this issue. Thanks.

watermelon12138 commented 11 months ago

@fearlsgroove We will hit this problem only if oldSchema and writeSchema differ. The writeSchema comes from the input data. So I suggest you check the schema in the deltacommit metadata files. These meta files are generally saved in the /xxx_hudi_table/.hoodie/ directory.
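A quick way to do that (a sketch, assuming the table sits on a local or mounted filesystem and that completed deltacommit files are the usual JSON commit metadata with the writer schema under extraMetadata.schema):

    import glob
    import json

    # Print the writer schema recorded in each completed deltacommit.
    for path in sorted(glob.glob("/path/to/the_table/.hoodie/*.deltacommit")):  # placeholder path
        with open(path) as f:
            meta = json.load(f)
        print(path)
        print(meta.get("extraMetadata", {}).get("schema"))
        print("-" * 80)

Comparing the decimal field's precision across these schemas should show whether a precision-13 write schema slipped in at some point.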

nsivabalan commented 6 months ago

Hey @ad1happy2go: reminder to follow up on this.