apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Data corruption in parquet file in Hudi table (ParquetDecodingException) #11708

Open mzheng-plaid opened 1 month ago

mzheng-plaid commented 1 month ago

Describe the problem you faced

(This seems related to https://github.com/apache/hudi/issues/10029#issuecomment-2253533412)

We are running into a data corruption bug with Hudi ingestion into a table which we suspect is happening at the parquet-java layer due to some interaction with Hudi.

Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 204757 in block 0 in file xxx.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:264)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
    at org.apache.hudi.common.util.ParquetReaderIterator.next(ParquetReaderIterator.java:67)
    ... 8 more
Caused by: org.apache.parquet.io.ParquetDecodingException: Can't read value in column [foo] optional float foo at value 204757 out of 463825, 4757 out of 20000 in currentPage. repetition level: 0, definition level: 1
    at org.apache.parquet.column.impl.ColumnReaderBase.readValue(ColumnReaderBase.java:553)
    at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:30)
    at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:439)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
    at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:234)
    ... 10 more
Caused by: java.lang.ArrayIndexOutOfBoundsException

Column foo is of float type and holds an enum with valid values from 0 to 5. There seems to be a bug in the parquet dictionary encoding: a dictionary index of 6 was somehow written, which is out of bounds for a dictionary with only 6 entries (indices 0 to 5).

❯ RUST_BACKTRACE=1 pqrs cat ./xxx.parquet --json | jq '.model_output' | sort | uniq -c

######################################################################################
File: ./xxx.parquet
######################################################################################

thread 'main' panicked at /Users/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/parquet-51.0.0/src/encodings/rle.rs:496:61:
index out of bounds: the len is 6 but the index is 6
stack backtrace:
   0: _rust_begin_unwind
   1: core::panicking::panic_fmt
   2: core::panicking::panic_bounds_check
   3: parquet::encodings::rle::RleDecoder::get_batch_with_dict
   4: <parquet::encodings::decoding::DictDecoder<T> as parquet::encodings::decoding::Decoder<T>>::get
   5: <parquet::column::reader::decoder::ColumnValueDecoderImpl<T> as parquet::column::reader::decoder::ColumnValueDecoder>::read
   6: parquet::column::reader::GenericColumnReader<R,D,V>::read_records
   7: parquet::arrow::record_reader::GenericRecordReader<V,CV>::read_records
   8: <parquet::arrow::array_reader::primitive_array::PrimitiveArrayReader<T> as parquet::arrow::array_reader::ArrayReader>::read_records
   9: <parquet::arrow::array_reader::struct_array::StructArrayReader as parquet::arrow::array_reader::ArrayReader>::read_records
  10: <parquet::arrow::arrow_reader::ParquetRecordBatchReader as core::iter::traits::iterator::Iterator>::next
  11: pqrs::utils::print_rows
  12: pqrs::main
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
120639 0
53220 1
 973 2
 125 3
  41 4
21610 5
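
The panic message above ("the len is 6 but the index is 6") is consistent with a dictionary-encoded page that holds an index with no matching dictionary entry. The following is a minimal sketch of that failure mode in plain Python. It is not the parquet-java or parquet-rs implementation; `decode_dictionary_page` and the sample values are hypothetical and only illustrate the bounds check.

```python
from typing import List, Sequence


def decode_dictionary_page(dictionary: Sequence[float], indices: Sequence[int]) -> List[float]:
    """Map dictionary indices back to values (simplified model of dictionary decoding)."""
    decoded = []
    for position, index in enumerate(indices):
        # A valid index must point at an existing dictionary entry.
        if not 0 <= index < len(dictionary):
            raise IndexError(
                f"index {index} out of bounds for dictionary of length "
                f"{len(dictionary)} at position {position}"
            )
        decoded.append(dictionary[index])
    return decoded


# Six dictionary entries, matching the six distinct values (0 to 5) seen in the column.
dictionary = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]

# Indices 0..5 decode normally.
good = decode_dictionary_page(dictionary, [0, 5, 1])

# An index of 6 has no dictionary entry, so decoding fails.
try:
    decode_dictionary_page(dictionary, [0, 5, 6])
    message = ""
except IndexError as exc:
    message = str(exc)
```

In this model the written values are all valid; only the stored index is bad, which would explain why every value the reader manages to print falls in the 0 to 5 range.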

This is a problem because Hudi successfully commits the transaction, and then subsequent reads of the file fail (which also blocks ingestion, because upserts touch the corrupted file).

  1. What is the best way to recover from this? Can we just delete xxx.parquet without modifying the timeline? We are ok with data loss localized to this one corrupted file.
  2. What could be causing this issue? This has occurred 3-4 times in the last 6 months and always affects float columns in a few tables. I am not sure how to reproduce it, because re-ingesting the raw data works fine, so the issue seems non-deterministic.

To Reproduce

Unsure

Expected behavior

Environment Description

This is run on EMR 6.10.1

Additional context

N/A

Stacktrace

See above

danny0405 commented 1 month ago

Did you use Spark for data ingestion?

mzheng-plaid commented 1 month ago

@danny0405 Sorry, yes, that's correct.

danny0405 commented 1 month ago

Did you enable speculative execution in Spark?

mzheng-plaid commented 1 month ago

No, speculative execution is not enabled

ad1happy2go commented 1 month ago

@mzheng-plaid Did you try disabling the vectorized reader with spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")? Were you able to read this parquet file using spark.read.parquet?

mzheng-plaid commented 1 month ago

Yes, the parquet file itself is corrupted. Trying to read the parquet file with pqrs panics:

❯ RUST_BACKTRACE=1 pqrs cat ./xxx.parquet --json | jq '.model_output' | sort | uniq -c

Trying to read with spark.read.format("parquet").load("xxx.parquet") fails as expected (regardless of spark.sql.parquet.enableVectorizedReader, I tried with it set to false):

Caused by: org.apache.parquet.io.ParquetDecodingException: Can't read value in column [foo] optional float foo at value 204757 out of 463825, 4757 out of 20000 in currentPage. repetition level: 0, definition level: 1
    at org.apache.parquet.column.impl.ColumnReaderBase.readValue(ColumnReaderBase.java:553)
    at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:30)
    at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:439)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
    at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:229)
    ... 19 more
Caused by: java.lang.ArrayIndexOutOfBoundsException
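
The check discussed above (reading the file directly with the vectorized reader disabled) can be sketched roughly as follows. This is an illustration, not the exact code that was run: `force_decode_column` is a hypothetical helper, and `spark` is assumed to be an existing PySpark `SparkSession`. Because Spark reads lazily, the sketch runs an aggregation over the column so that every value is actually decoded.

```python
from typing import Any, List


def force_decode_column(spark: Any, path: str, column: str) -> List[Any]:
    """Read a parquet file with the vectorized reader off and decode one column fully.

    `spark` is assumed to be a pyspark.sql.SparkSession created elsewhere.
    """
    # Switch this session to the non-vectorized parquet read path.
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

    df = spark.read.parquet(path)

    # An aggregation over the column forces every value to be decoded,
    # so a corrupted page surfaces here as an exception.
    return df.agg({column: "max"}).collect()
```

With a live session this would be called as `force_decode_column(spark, "xxx.parquet", "foo")`; on a corrupted file the call is expected to raise rather than return rows.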