apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT]: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file #9918

Open Armelabdelkbir opened 1 year ago

Armelabdelkbir commented 1 year ago

Describe the problem you faced

Hello community,

I'm using Hudi for change data capture with Spark Structured Streaming + Kafka + Debezium. My jobs generally work well, but occasionally a few jobs fail with errors related to parquet file size or format.
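
For context, a rough outline of the pipeline (a sketch only: the broker, topic, paths, checkpoint location, and the parseDebeziumPayload helper are placeholders, and hudiOptions stands for the Hudi writer option map sketched after the config block further down):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("cdc-replication").getOrCreate()

// Debezium change events arriving on Kafka (broker and topic are placeholders).
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "dbserver.database.table")
  .load()

// Upsert each micro-batch into the Hudi table; parseDebeziumPayload is a
// hypothetical helper that turns Debezium envelopes into flat rows.
val writeBatch: (DataFrame, Long) => Unit = { (batch, _) =>
  val rows = parseDebeziumPayload(batch)
  rows.write
    .format("hudi")
    .options(hudiOptions)                                   // option block below
    .option("hoodie.datasource.write.operation", "upsert")
    .mode("append")
    .save("hdfs://prod/cdc.db/database/table")              // placeholder base path
}

kafkaStream.writeStream
  .trigger(Trigger.ProcessingTime("1 minute"))
  .option("checkpointLocation", "/checkpoints/database/table")  // placeholder
  .foreachBatch(writeBatch)
  .start()
  .awaitTermination()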

To Reproduce

Steps to reproduce the behavior:

  1. start long running replication streams

Expected behavior

Write / read parquet with correct size / format

Environment Description

  * Hudi version : 0.11.0 (per the discussion below)

  * Hive version : 1.2.1 (per the discussion below)

Additional context

This problem occasionally occurs on certain tables. This is my config:

hudi {
  options{
    upsert_parallelisme_value = "1500"
    insert_parallelisme_value = "1500"
    bulk_insert_parallelisme_value = "1500"
    bulk_insert_sort_mode = "NONE"
    parquet_small_file_limit = "104857600"
    streaming_retry_count = "3"
    streaming_retry_interval_ms ="2000"
    parquet_max_file_size = "134217728"
    parquet_block_size = "134217728"
    parquet_page_size = "1048576"
    index_type = "SIMPLE"
    simple.index_use_caching = "true"
    simple.index_input_storage_level = "MEMORY_AND_DISK_SER"
    partition.fields = ""
    generator = "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
    key_generator.hive = "org.apache.hudi.hive.NonPartitionedExtractor"
  }
  compaction {
    inline_compact = "true"
    inline_compact_num_delta_commits = "10"
    cleaner_commits_retained = "4"
    cleaner_policy = "KEEP_LATEST_COMMITS"
    cleaner_fileversions_retained = "3"
    async_clean = "true"
  }
}
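
For reference, these wrapper keys presumably resolve to the standard hoodie.* writer options roughly along the lines below (a sketch; the exact mapping done by the config wrapper is not shown in this issue, so treat the pairing as an assumption):

val hudiOptions = Map(
  // Parallelism (assumed mapping of the *_parallelisme_value keys)
  "hoodie.upsert.shuffle.parallelism"            -> "1500",
  "hoodie.insert.shuffle.parallelism"            -> "1500",
  "hoodie.bulkinsert.shuffle.parallelism"        -> "1500",
  "hoodie.bulkinsert.sort.mode"                  -> "NONE",
  // File sizing
  "hoodie.parquet.small.file.limit"              -> "104857600",   // 100 MB
  "hoodie.parquet.max.file.size"                 -> "134217728",   // 128 MB
  "hoodie.parquet.block.size"                    -> "134217728",
  "hoodie.parquet.page.size"                     -> "1048576",
  // Streaming retries
  "hoodie.datasource.write.streaming.retry.count"       -> "3",
  "hoodie.datasource.write.streaming.retry.interval.ms" -> "2000",
  // Index
  "hoodie.index.type"                            -> "SIMPLE",
  "hoodie.simple.index.use.caching"              -> "true",
  "hoodie.simple.index.input.storage.level"      -> "MEMORY_AND_DISK_SER",
  // Non-partitioned table
  "hoodie.datasource.write.partitionpath.field"  -> "",
  "hoodie.datasource.write.keygenerator.class"   -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
  "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.NonPartitionedExtractor",
  // Compaction and cleaning
  "hoodie.compact.inline"                        -> "true",
  "hoodie.compact.inline.max.delta.commits"      -> "10",
  "hoodie.cleaner.policy"                        -> "KEEP_LATEST_COMMITS",
  "hoodie.cleaner.commits.retained"              -> "4",
  "hoodie.cleaner.fileversions.retained"         -> "3",
  "hoodie.clean.async"                           -> "true"
)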

MVCC conf:

      HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key -> "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
      HoodieLockConfig.ZK_CONNECT_URL.key -> "zookeper-poll:2181",
      HoodieLockConfig.ZK_PORT.key -> "2181",
      HoodieLockConfig.ZK_LOCK_KEY.key -> table.table_name,
      HoodieLockConfig.ZK_BASE_PATH.key -> ("/" + table.db_name),
      HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.key -> "15",
      HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key -> "15",
      HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key -> "60000",
      HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key -> "60000",
      HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key -> "20000",
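
A minimal sketch of how the two blocks above might be wired onto a write, assuming the writer options and the lock entries are collected into maps named hudiOptions and lockOptions (those names, df, and basePath are placeholders) and that optimistic concurrency control is enabled to go with the ZooKeeper lock provider:

import org.apache.spark.sql.SaveMode

df.write
  .format("hudi")
  .options(hudiOptions)                        // writer options sketched above
  .options(lockOptions)                        // HoodieLockConfig entries above
  .option("hoodie.table.name", table.table_name)
  // Multi-writer settings that usually accompany a lock provider
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .mode(SaveMode.Append)
  .save(basePath)                              // placeholder base path variable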

Stacktrace

For the "too small" (zero-length) parquet file:

 Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 9.0 failed 4 times, most recent failure: Lost task 20.3 in stage 9.0 (TID 6057) (ocnode46 executor 2): org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
at org.apache.hudi.common.util.ParquetUtils$HoodieKeyIterator.hasNext(ParquetUtils.java:485)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at org.apache.hudi.common.util.ParquetUtils.fetchHoodieKeys(ParquetUtils.java:197)
at org.apache.hudi.common.util.ParquetUtils.fetchHoodieKeys(ParquetUtils.java:147)
at org.apache.hudi.io.HoodieKeyLocationFetchHandle.locations(HoodieKeyLocationFetchHandle.java:62)
at org.apache.hudi.index.simple.HoodieSimpleIndex.lambda$fetchRecordLocations$33972fb4$1(HoodieSimpleIndex.java:155)
at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:117)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:177)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: hdfs://prod/cdc.db/database/table/723c5d09-573b-4df6-ad41-76ae19ec976f-0_2-16682-7063518_20231024224507047.parquet is not a Parquet file (too small length: 0)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:514)
at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:689)
at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:595)
at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:152)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
... 22 more 

One day I also had this error, related to the parquet format:

expected magic number at tail [80, 65, 82, 49] but found [2, -70, -67, -119] at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)
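
(The expected tail bytes [80, 65, 82, 49] are the ASCII codes for "PAR1", the Parquet footer magic, so this file does not end with a valid Parquet footer. A quick way to check a suspect file by hand, sketched below with a placeholder path:)

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Read the last 4 bytes of a suspect file and compare them with the "PAR1" magic.
val path = new Path("hdfs://prod/cdc.db/database/table/<suspect-file>.parquet")  // placeholder
val fs   = path.getFileSystem(new Configuration())
val len  = fs.getFileStatus(path).getLen

if (len < 4) {
  println(s"$path is too small to be a parquet file (length=$len)")
} else {
  val in    = fs.open(path)
  val magic = new Array[Byte](4)
  try {
    in.seek(len - 4)
    in.readFully(magic)
  } finally {
    in.close()
  }
  println(s"tail bytes: ${magic.mkString(", ")} (expected 80, 65, 82, 49 = \"PAR1\")")
}
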
ad1happy2go commented 1 year ago

@Armelabdelkbir I recommend you upgrade your Hudi version to 0.12.3, 0.13.1, or 0.14.0. This can happen when later records are missing a column that earlier records had. Do you have any such scenario?

Armelabdelkbir commented 1 year ago

By missing column, do you mean schema evolution? We do have schema evolution sometimes, but not for this use case. What is the impact of an upgrade on production? I have hundreds of tables and billions of rows. Do I just need to upgrade the Hudi version and keep the same metadata folders?

ad1happy2go commented 1 year ago

@Armelabdelkbir You just need to upgrade the Hudi version; the table should upgrade automatically. Was the metadata table enabled with 0.11.0? I believe it was off by default in 0.11.0. I recommend you upgrade to 0.12.3.

In this usecase, is your schema always consistent?

Armelabdelkbir commented 1 year ago

@ad1happy2go My metadata table is disabled in version 0.11.0: "hoodie.metadata.enable" -> "false". Currently I can't upgrade until the client migration is complete. Schema evolution happens sometimes, but it is disabled on my side because of my Hive version (1.2.1). In the meantime, if I hit the empty parquet problem, do I just need to delete those files?

ad1happy2go commented 1 year ago

You can try deleting these parquet files, although we need to understand how they got created in the first place.
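
A small sketch for locating the zero-length base files before touching anything by hand (basePath is a placeholder; the delete call is left commented out so files are only removed after a manual check):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// List zero-length .parquet files under the table base path.
val basePath = new Path("hdfs://prod/cdc.db/database/table")   // placeholder
val fs       = basePath.getFileSystem(new Configuration())

val files = fs.listFiles(basePath, true)   // recursive, also covers partitioned layouts
while (files.hasNext) {
  val status = files.next()
  if (status.getPath.getName.endsWith(".parquet") && status.getLen == 0) {
    println(s"zero-length parquet file: ${status.getPath}")
    // fs.delete(status.getPath, false)    // only after confirming the file is broken
  }
}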

watermelon12138 commented 1 year ago

Maybe compaction produced the broken parquet file when it failed the first time, and produced the normal parquet file when it retried successfully. There will then be two parquet files with the same file group ID and instant time, like this:

xxx partition
--- 00000000_1-2-3_2023110412345.parquet (broken parquet)
--- 00000000_4-5-6_2023110412345.parquet (normal parquet)

You can look at this issue to resolve the problem: https://github.com/apache/hudi/issues/9615
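
A rough way to scan a directory for that pattern, i.e. two base files that share both the file group ID and the instant time (this assumes the usual Hudi base file naming <fileId>_<writeToken>_<instantTime>.parquet; partitionPath is a placeholder):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val partitionPath = new Path("hdfs://prod/cdc.db/database/table")   // placeholder
val fs            = partitionPath.getFileSystem(new Configuration())

// Group base file names by (fileId, instantTime) and print any group with more than one file.
val baseFiles = fs.listStatus(partitionPath).toSeq
  .map(_.getPath.getName)
  .filter(_.endsWith(".parquet"))

baseFiles
  .groupBy { name =>
    val parts = name.stripSuffix(".parquet").split("_")
    (parts.head, parts.last)   // (fileId, instantTime)
  }
  .filter { case (_, names) => names.size > 1 }
  .foreach { case ((fileId, instant), names) =>
    println(s"duplicate base files for file group $fileId at instant $instant:")
    names.foreach(n => println(s"  $n"))
  }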

watermelon12138 commented 1 year ago

@Armelabdelkbir

Armelabdelkbir commented 1 year ago

@watermelon12138 Thanks for the link to that issue, I'll check it. In my case the broken and normal files share the same instant time but are in different file groups, for example:

3e2e9939-71f0-41dc-a5ff-c276ae3cdfc6-0_0-819-355182_20231108134016057.parquet (broken parquet)
ccf19756-bce5-402b-b85e-64232e2f34b2-0_242-819-355161_20231108134016057.parquet (normal parquet)

victorxiang30 commented 11 months ago

I encountered this in Hudi 0.13.0 as well.

ad1happy2go commented 10 months ago

@victorxiang30 @Armelabdelkbir @watermelon12138 Can you provide the schema to help me reproduce this?

If it has complex data types, can you try setting the Spark config spark.hadoop.parquet.avro.write-old-list-structure to false?
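
A minimal sketch of where that setting could go, either on the SparkSession builder or as a --conf on spark-submit (the app name is a placeholder):

import org.apache.spark.sql.SparkSession

// Spark forwards any "spark.hadoop.*" config to the underlying Hadoop/Parquet layer,
// so this disables the old Avro list structure when writing parquet.
val spark = SparkSession.builder()
  .appName("hudi-cdc-stream")   // placeholder
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.hadoop.parquet.avro.write-old-list-structure", "false")
  .getOrCreate()

Equivalently on spark-submit: --conf spark.hadoop.parquet.avro.write-old-list-structure=false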