apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Corrupted parquet file in hudi partition | Deletion of partition in Hudi #11371

Open koochiswathiTR opened 1 month ago

koochiswathiTR commented 1 month ago


We are facing an issue with a corrupted parquet file in one of the Hudi partitions while reading the Hudi table.

We are following the steps below:

  1. Shut down the Hudi writer to stop the ingestion.
  2. Read the Hudi dataset and write it back with `.option("hoodie.datasource.write.operation", "delete_partition")` (see the sketch after this list).
  3. Run the following delete job:

```scala
val s3path = "s3://hudi-dev-use1/hudi_table/"
val data = spark.read.format("org.apache.hudi").load(s3path)
val deleteKeysDF = data.filter($"NAME" === "name_1").select("ID")
val writeStatus = deleteKeysDF.write.format("org.apache.hudi")
  .option("hoodie.table.name", "hudi_table")
  .option("hoodie.datasource.write.recordkey.field", "ID")
  .option("hoodie.datasource.write.table.name", "hudi_table")
  .option("hoodie.datasource.write.operation", "delete")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .mode(SaveMode.Append)
  .save(s3path)
```

  4. Bring back the Hudi writer to continue the ingestion.
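
For reference, a rough sketch of the `delete_partition` variant mentioned in step 2, run from the same spark-shell session (the partition value below is a placeholder for the corrupted partition, not a verified fix):

```scala
// Sketch only: issue a delete_partition write against the same table path.
// An empty DataFrame is enough; Hudi resolves the partitions from the option below.
import org.apache.spark.sql.SaveMode

val s3path = "s3://hudi-dev-use1/hudi_table/"

spark.emptyDataFrame.write.format("hudi")
  .option("hoodie.table.name", "hudi_table")
  .option("hoodie.datasource.write.recordkey.field", "ID")
  .option("hoodie.datasource.write.operation", "delete_partition")
  .option("hoodie.datasource.write.partitions.to.delete", "partition_name") // placeholder
  .mode(SaveMode.Append)
  .save(s3path)
```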

If we follow these steps, will we get past the corrupted parquet file issue? Please also share any other ways to completely remove a partition from a Hudi table. This is a production issue, so any quick help is highly appreciated.


PROD


Stacktrace

```
Can not read value at 0 in block -1 in file

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3655 in stage 13.0 failed 4 times, most recent failure: Lost task 3655.3 in stage 13.0 (TID 100513) (ip-100-67-247-116.8043.aws-int.thomsonreuters.com executor 24): java.lang.NegativeArraySizeException
	at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:285)
	at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
	at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPageV1(VectorizedColumnReader.java:301)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.access$000(VectorizedColumnReader.java:47)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:240)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:236)
```
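
A possible way to narrow down which parquet file is corrupted is to scan each file in the suspect partition on its own; a rough diagnostic sketch (the partition path below is a placeholder, and this is only an illustration):

```scala
// Sketch only: read every parquet file in the suspect partition individually
// and report the ones that fail to decode.
import org.apache.hadoop.fs.{FileSystem, Path}

val partitionPath = "s3://hudi-dev-use1/hudi_table/partition_name" // placeholder

val fs = FileSystem.get(new java.net.URI(partitionPath), spark.sparkContext.hadoopConfiguration)
val parquetFiles = fs.listStatus(new Path(partitionPath))
  .map(_.getPath.toString)
  .filter(_.endsWith(".parquet"))

parquetFiles.foreach { file =>
  try {
    // Force a full read so corrupt pages are actually decoded.
    spark.read.parquet(file).foreach(_ => ())
    println(s"OK      $file")
  } catch {
    case e: Throwable => println(s"FAILED  $file -> ${e.getClass.getName}: ${e.getMessage}")
  }
}
```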

koochiswathiTR commented 1 month ago

@dacort @sullis @jfrazee @kkrugler @ad1happy2go @xushiyan @soumilshah1995 @codope @calvertj @ziudu @hamadjaved @usberkeley @mattwong949 @yihua @nsivabalan Tagging you all to bring this to your attention, as we are dealing with a production issue.

ad1happy2go commented 1 month ago

@koochiswathiTR this may happen only when there are multiple writers running without concurrency control. But since you mentioned that you stopped the ingestion job, ideally you should not face this problem.

Can you reproduce this behaviour consistently? Can you also provide the timeline, please, so we can analyse the order of actions?
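
For context, multi-writer setups are expected to enable optimistic concurrency control; a rough sketch of the relevant write options (`df`, `basePath`, and the ZooKeeper lock settings below are placeholders, not values taken from this thread):

```scala
// Sketch only: optimistic concurrency control for concurrent Hudi writers.
// df is the incoming batch DataFrame and basePath the table base path (assumed).
import org.apache.spark.sql.SaveMode

df.write.format("hudi")
  .option("hoodie.table.name", "hudi_table")
  .option("hoodie.datasource.write.recordkey.field", "ID")
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .option("hoodie.write.lock.provider",
    "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
  .option("hoodie.write.lock.zookeeper.url", "zk-host")           // placeholder
  .option("hoodie.write.lock.zookeeper.port", "2181")             // placeholder
  .option("hoodie.write.lock.zookeeper.lock_key", "hudi_table")   // placeholder
  .option("hoodie.write.lock.zookeeper.base_path", "/hudi/locks") // placeholder
  .mode(SaveMode.Append)
  .save(basePath)
```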

soumilshah1995 commented 1 month ago

Yup, I agree with Aditya. Please give us a full example to reproduce this, with code and mock data.

koochiswathiTR commented 3 weeks ago

@soumilshah1995 @ad1happy2go,

We don't know how to reproduce this issue. To mitigate it, we would like to delete the partition. Below are the options we tried, but none worked.

Option 1: Delete all the IDs under the partitionName

```scala
val tniPath = "s3://s3_bucket_name/tmp/testhudi/document"

val data = spark.read.format("org.apache.hudi").load(tniPath)

val deleteKeysDF = data.filter($"partitionName" === "partition_name").select("ID")

deleteKeysDF.count()

val writeStatus = deleteKeysDF.write.format("hudi")
  .option("hoodie.table.name", "document")
  .option("hoodie.datasource.write.recordkey.field", "ID")
  .option("hoodie.datasource.write.table.name", "document")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .mode(SaveMode.Append)
  .save(tniPath)
```

Option 2: Delete the partition directory with `delete_partition`

```scala
val tniPath = "s3://s3_bucket_name/tmp/testhudi/document/"

val data1 = spark.read.format("org.apache.hudi").load(tniPath)

data1.write.format("hudi")
  .option("hoodie.table.name", "novusdoc")
  .option("hoodie.datasource.write.recordkey.field", "ID")
  .option("hoodie.datasource.write.table.name", "document")
  .option("hoodie.datasource.write.operation", "delete_partition")
  .option("hoodie.datasource.write.partitions.to.delete", "patition_name")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .mode(SaveMode.Append)
  .save(tniPath)
```

Both approaches create commits, but compaction is not happening, the files are not getting deleted, and the counts for the deleted partitions have not decreased.
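
(Note: even after a successful `delete_partition`, the replaced parquet files are only removed from storage later by the cleaner, so files remaining on S3 does not by itself mean the operation failed. A rough sketch of re-running the `delete_partition` with explicit cleaner settings; the retention value is a placeholder and this is only an illustration, not a confirmed fix:)

```scala
// Sketch only: delete_partition with automatic cleaning enabled so the replaced
// file groups are physically removed soon after the replacecommit completes.
import org.apache.spark.sql.SaveMode

spark.emptyDataFrame.write.format("hudi")
  .option("hoodie.table.name", "document")
  .option("hoodie.datasource.write.recordkey.field", "ID")
  .option("hoodie.datasource.write.operation", "delete_partition")
  .option("hoodie.datasource.write.partitions.to.delete", "partition_name") // placeholder
  .option("hoodie.clean.automatic", "true")
  .option("hoodie.cleaner.commits.retained", "1") // placeholder; tune to your retention needs
  .mode(SaveMode.Append)
  .save(tniPath)
```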

soumilshah1995 commented 3 weeks ago

You can use https://soumilshah1995.blogspot.com/2023/07/removing-duplicates-in-hudi-partitions.html to remove duplicates from partitions.

soumilshah1995 commented 3 weeks ago

To delete partitions, you can use https://github.com/soumilshah1995/code-snippets/blob/main/delete%2Bpartition_hudi.py:

```python
from pyspark.sql.types import StructType

# hudi_options should carry the table's usual write configs
# (table name, record key field, precombine field, etc.)
hudi_options = {}

try:
    # Write an empty DataFrame with the delete_partition operation;
    # the partitions to drop come from the option below.
    spark.createDataFrame([], StructType([])) \
        .write \
        .format("org.apache.hudi") \
        .options(**hudi_options) \
        .option("hoodie.datasource.hive_sync.enable", False) \
        .option("hoodie.datasource.write.operation", "delete_partition") \
        .option("hoodie.datasource.write.partitions.to.delete", "state=Connecticut") \
        .mode("append") \
        .save(path)
    print("2.. ")
except Exception as e:
    print("Error 2", e)
```