apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Clustering created files with modified schema resulting in corrupted table #8259

Open kazdy opened 1 year ago

kazdy commented 1 year ago

Describe the problem you faced

I'm observing failed Spark stages "Doing partition and writing data: <table name>" in the "SparkUpsertCommitActionExecutor" job group. As a result, the job can no longer insert or update records. There were no changes to the schema or data types; I only switched from inline to async table services for a CoW table that is updated by a Spark Structured Streaming job. Only one job suffers from this issue after switching to async services. I can see that the async services, e.g. cleaning, are running fine. I'm using the in-process lock provider, and the metadata table is enabled.

After reviewing the file itself, I don't see anything obviously wrong with its columns and types. I do see that one column was relocated from the middle of the schema to the end of the schema in the parquet file itself. The file that's causing issues was created by a clustering action. I checked this by looking at the commits: the only commit related to this file is a replacecommit.

EDIT (22/03/2023): Note that the "schema" in the replacecommit file has the "year" column in the middle of the schema; for some reason it's only relocated in the file created by clustering.

To Reproduce

Steps to reproduce the behavior:

Run async clustering without changing the record schema; the partition column is relocated to the end of the schema.

EDIT (23/03/2023): Fully reproduced example with Hudi files: https://github.com/kazdy/HUDI-8259/tree/main

EDIT (30/03/2023): Updated the repo with a better example to reproduce (in the replicated branch): https://github.com/kazdy/HUDI-8259

Expected behavior

Hudi continues to write updates with no failed stages after clustering is done.

Environment Description

Additional context

The previous job run was stopped (killed) while Hudi was writing data. In the job run where we see the issue, the first action taken was a rollback. Could something have gone wrong with this rollback? I also see that async clustering was running; maybe the file was corrupted while being rewritten?

Stacktrace

Job aborted due to stage failure: Task 0 in stage 90.0 failed 4 times, most recent failure: Lost task 0.3 in stage 90.0 (TID 203) ([executorid] executor 4): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:378)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:138)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
    ... 28 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
    ... 31 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155)
    ... 32 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file 
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ... 4 more
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3://pathtoparuqetfile/somefile.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
    ... 8 more
Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary
    at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
    at org.apache.parquet.avro.AvroConverters$BinaryConverter.setDictionary(AvroConverters.java:77)
    at org.apache.parquet.column.impl.ColumnReaderBase.<init>(ColumnReaderBase.java:415)
    at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:46)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:82)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:177)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:136)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:225)
    ... 11 more

kazdy commented 1 year ago

I inspected all files that were source files for clustering, and all have the same schema. The file created as a result of clustering has a changed schema: the column named "year" was moved from the middle of the file schema to the end.

before clustering: awesome_column_1 awesome_column_1 year month header

after clustering: awesome_column_1 awesome_column_1 month header year

No other column was relocated.
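
A quick way to confirm which columns moved is to compare the column order of a source file with that of the clustered file. The sketch below is a minimal, hypothetical helper (the function name and the placeholder column names `col_a` and `col_b` are not from Hudi); it reports the columns that fall outside a longest common subsequence of the two orderings.

```python
from typing import List, Sequence


def relocated_columns(before: Sequence[str], after: Sequence[str]) -> List[str]:
    """Return the columns of `before` that are not part of a longest
    common subsequence shared with `after`, i.e. the columns that moved."""
    n, m = len(before), len(after)
    # lcs[i][j] = length of the longest common subsequence of before[i:] and after[j:]
    lcs = [[0] * (m + 1) for _ in range(n + 1)]
    for i in range(n - 1, -1, -1):
        for j in range(m - 1, -1, -1):
            if before[i] == after[j]:
                lcs[i][j] = 1 + lcs[i + 1][j + 1]
            else:
                lcs[i][j] = max(lcs[i + 1][j], lcs[i][j + 1])

    moved: List[str] = []
    i = j = 0
    while i < n and j < m:
        if before[i] == after[j]:
            # Column kept its relative position.
            i += 1
            j += 1
        elif lcs[i + 1][j] >= lcs[i][j + 1]:
            # Dropping before[i] keeps the common subsequence maximal: it moved.
            moved.append(before[i])
            i += 1
        else:
            j += 1
    moved.extend(before[i:])
    return moved


# Placeholder names standing in for the anonymized columns above.
before_clustering = ["col_a", "col_b", "year", "month", "header"]
after_clustering = ["col_a", "col_b", "month", "header", "year"]
print(relocated_columns(before_clustering, after_clustering))  # ['year']
```

The two column lists would normally come from the parquet footers of the source and clustered files; here they are typed in by hand.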

It looks like the schema was modified during clustering and now it's causing issues. Interestingly, I have tables with exactly the same schema where clustering did not break anything after enabling async clustering. The only difference is that here the year column is the only partition column, while in the other table, where everything is fine, I'm using year and month as partition columns.

UPDATE: that's no longer true. The table where year and month are used as partition columns is also broken after clustering and I cannot write to it anymore. I'm encountering the same errors. This time 3 files were clustered, one from partition year=2023/month=02 and one from year=2023/month=03.

In this case files from year=2022 and year=2023 were clustered and two files (one for 2022 and one for 2023) were created.

When it comes to "fixing" this table, I think I can just roll back to the commit before clustering (the last one before the replacecommit) and it should be fine? I guess it's also a good idea to disable clustering just to be safe.

Similar to #5262, where the user also claims no explicit changes to the record schema.

kazdy commented 1 year ago

@nsivabalan I would really appreciate any support or guidance here, I have two broken tables in prod now and need to come up with a fix or workaround. Thanks!

danny0405 commented 1 year ago

> I'm using in-process lock

The in-process lock cannot work correctly for the MDT with async table services.

kazdy commented 1 year ago

EDIT: I'm now aware there's a regression with async clustering and the MDT. I wasn't earlier, but it's disabled now, so I guess I'm fine there and should not miss any updates.

https://hudi.apache.org/docs/metadata This blog advises users to use the in-process lock when the metadata table and async services are enabled in a single-writer scenario. Is the blog incorrect?

Also, Hudi automatically uses InProcessLockProvider if an async table service is used and no other lock provider is defined: https://github.com/apache/hudi/blob/a36c8e0f9732217198b8a2b425ea1bd16287f9b5/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java#L3003-L3022 Is this Hudi behavior incorrect as well? It was specifically introduced because the MDT requires a lock provider, so why can't it be used? Is there a bug in the MDT?

~~Or what you meant is that I should not use async metadata indexing with other async table services in single writer mode and in-process lock provider? If so why not in this case? Hi @codope, afaik you implemented async metadata indexing, is it true I can't use InProcessLockProvider in single writer mode?~~ EDIT: I searched the Hudi issues and answered this myself: async metadata indexing should not be used with InProcessLockProvider, and when it is enabled users should run HoodieIndexer as a separate job.

Nevertheless, I was able to reproduce the issue without async table services. The sequence looks like this:

  1. write to hudi,
  2. run clustering using spark procedure,
  3. try to update hudi table.

It looks like the issue is in clustering itself. EDIT: I tried a write with reconcile.schema enabled, just to be sure the table schema is used instead of the incoming batch schema, and it still fails.

kazdy commented 1 year ago

Hi @nsivabalan, I see that Danny assigned you to this ticket. I was able to replicate this exact case; the previous example did not exactly expose the issue. ~I'll update the repo soon.~ Here's the repo with code that replicates this issue (I also attached all the files and the Hudi table): https://github.com/kazdy/HUDI-8259 (branch replicated)

Here's what I found out and some additional info.

First, I'm wondering why I'm getting this in the stacktrace:

java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary
    at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)

I don't have any Long/Bigint in the table schema or in the incoming schema; all numeric types are explicitly cast to INT or DECIMAL(10,0) (the default for Spark).

The schema of the incoming batch should look like this (this is also the schema of the table before clustering):
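
Read literally, the innermost frames say that a converter built for a binary (string) field was handed the dictionary of a column stored as 64-bit integers, and that the base `Dictionary.decodeToBinary` throws `UnsupportedOperationException` carrying the dictionary's class name. The Python sketch below mimics that pattern with simplified stand-in classes. It is not the parquet-mr code; the class and method names are only analogues of those in the trace.

```python
class Dictionary:
    """Simplified stand-in for a Parquet dictionary decoder (not the real API)."""

    def decode_to_binary(self, idx: int) -> bytes:
        # The base class refuses and reports the concrete dictionary type.
        raise NotImplementedError(type(self).__name__)

    def decode_to_long(self, idx: int) -> int:
        raise NotImplementedError(type(self).__name__)


class PlainLongDictionary(Dictionary):
    """Dictionary of a column physically stored as 64-bit integers."""

    def __init__(self, values):
        self._values = list(values)

    def decode_to_long(self, idx: int) -> int:
        return self._values[idx]


class BinaryConverter:
    """Reader-side converter, chosen because the expected schema says 'string'."""

    def set_dictionary(self, dictionary: Dictionary, size: int) -> None:
        # Eagerly decodes every dictionary entry as binary.
        self._decoded = [dictionary.decode_to_binary(i) for i in range(size)]


try:
    BinaryConverter().set_dictionary(PlainLongDictionary([1, 2]), 2)
except NotImplementedError as exc:
    print("cannot decode as binary:", exc)
```

In other words, the error points at a mismatch between the type the reader expects for a column and the type physically stored in the file, rather than at a Long declared in the table schema.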

col1 String
col2 String
col3 Int
col4 Int
col5 Timestamp
partitionCol1 int // used for partitioning
partitionCol2 int // partitionCol2 is not used by hudi to partition table, only partitionCol1 is used.
col6 String
col7 timestamp
col8 int
col9 string
col10 string
col11 string
col12 string
col13 decimal
col14 string
col15 string
col16 string
col17 string
col17 string
col18 string
col18 string
col19 int
col20 string
col21 string
col22 string
col23 int
col24 string
col25 string
col26 string
col27 string
col28 string
col29 string
col30 string
col31 int
col31 int
col32 string
col33 string
col34 string
col35 string
col36 string
col37 string

Schema of clustered parquet files (created by clustering):

col1 String
col2 String
col3 Int
col4 Int
col5 Timestamp
partitionCol2 int // partitionCol2 is not used by hudi to partition table, only partitionCol1 is used.
col6 String // (earlier at this position was partitionCol2 int) Tries to read Int but instead needs to read String? idk
col7 timestamp 
col8 int
col9 string
col10 string
col11 string
col12 string
col13 decimal
col14 string
col15 string
col16 string
col17 string
col17 string
col18 string
col18 string
col19 int
col20 string
col21 string
col22 string
col23 int
col24 string
col25 string
col26 string
col27 string
col28 string
col29 string
col30 string
col31 int
col31 int
col32 string
col33 string
col34 string
col35 string
col36 string
col37 string
partitionCol1 int // earlier at this position was col37 string

The schema in the replacecommit conforms to the incoming batch schema/table schema (it is correct, but at the same time it differs from the schema in the related base parquet file). I don't know whether Hudi resolves columns by position or by name, or whether it matters when reading a parquet file for merging. If it is by position, then for col6 String (earlier at this position was partitionCol2 int) Hudi would try to read the column as Int when it actually needs to read a String? And so it fails, since PlainLongDictionary has no decodeToBinary implementation? I don't know if this makes sense at all, but this is my intuition.

I also have a case with the same schema where both partitionCol1 and partitionCol2 are relocated, and I get the same exception on upsert. It looks like Hudi moves the columns specified in hoodie.datasource.write.partitionpath.field to the end during clustering.

Btw, as far as I checked, there's no issue with reading (spark.read...) two partitions where one contains the clustered parquet file and the other a new parquet file with the correct schema. The partitionCol1 column is just still at the end of the df schema.
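
To make the position-versus-name question concrete, here is a small Python sketch over abbreviated versions of the two schemas above. It only illustrates the hypothesis and does not describe how Hudi or parquet-avro actually resolves columns; the helper names are made up for this example.

```python
from typing import List, Optional, Tuple

Column = Tuple[str, str]  # (name, type)

# Abbreviated table schema (also the schema recorded in the replacecommit).
table_schema: List[Column] = [
    ("col5", "timestamp"),
    ("partitionCol1", "int"),
    ("partitionCol2", "int"),
    ("col6", "string"),
    ("col7", "timestamp"),
]

# Abbreviated schema of the file written by clustering: partitionCol1 is last.
clustered_file_schema: List[Column] = [
    ("col5", "timestamp"),
    ("partitionCol2", "int"),
    ("col6", "string"),
    ("col7", "timestamp"),
    ("partitionCol1", "int"),
]


def mismatches_by_position(expected: List[Column], actual: List[Column]):
    """Pair columns by index and report slots whose types differ."""
    return [
        (i, exp_name, exp_type, act_type)
        for i, ((exp_name, exp_type), (_, act_type)) in enumerate(zip(expected, actual))
        if exp_type != act_type
    ]


def mismatches_by_name(expected: List[Column], actual: List[Column]):
    """Pair columns by name and report names whose types differ or are missing."""
    actual_types = dict(actual)
    result: List[Tuple[str, str, Optional[str]]] = []
    for name, exp_type in expected:
        if actual_types.get(name) != exp_type:
            result.append((name, exp_type, actual_types.get(name)))
    return result


print(mismatches_by_name(table_schema, clustered_file_schema))
print(mismatches_by_position(table_schema, clustered_file_schema))
```

By name the two schemas agree. By position three of the five slots disagree: the slot where the table schema expects partitionCol2 (int) holds col6 (string), the slot for col6 (string) holds col7 (timestamp), and the slot for col7 (timestamp) holds partitionCol1 (int).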

mansipp commented 1 year ago

Hi @kazdy @nsivabalan, in my testing I observed that it works on open source release 0.13.0. One more thing I noticed is that in release 0.13.0 the schema is also changed after clustering (same as shown above in "Schema of clustered parquet files"), but there might be some logic that handles this schema change while performing the upsert.

kazdy commented 1 year ago

@mansipp I was able to replicate this case using Hudi 0.13 as well. Files created by clustering have a changed schema. Interestingly, there's no issue if the order of types (not columns) does not change, e.g. if I have only columns of type String.

mansipp commented 1 year ago

@kazdy can you please share more information on it? Are you getting the same errors as with 0.12.1?

kazdy commented 1 year ago

It was 0.12.2, not 0.13, sorry for the confusion. The issue does not occur with 0.13.

xushiyan commented 1 year ago

Tried to reproduce and found that moving the partition field to the end is still an issue in 0.13.0/0.12.3/0.12.2 when both row-writing and hive-style partitioning are enabled. Will re-open for now.

VIKASPATID commented 7 months ago

Hi, we are also facing a similar issue where enabling clustering and then running 2-3 upserts results in a column order/value mismatch that ends up corrupting the table. We are facing this issue on emr-6.15.0, Spark 3.4.1, Apache Hudi 0.14. Please look into it, as we are not able to use clustering at all.

VitoMakarevich commented 5 months ago

Can you try with hoodie.datasource.write.row.writer.enable = false? It's described here; the problem is a bit different, but for us it helped to create a file with the right format.
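
A sketch of how this workaround could be applied from PySpark, assuming the writer is configured through an options dictionary. Only `hoodie.datasource.write.row.writer.enable` comes from the suggestion above, and `hoodie.datasource.write.hive_style_partitioning` is the key for the hive-style partitioning mentioned earlier in the thread. The table name, the helper names and the defaults assumed for missing keys are placeholders to check against your Hudi version.

```python
from typing import Dict

ROW_WRITER_KEY = "hoodie.datasource.write.row.writer.enable"
HIVE_STYLE_KEY = "hoodie.datasource.write.hive_style_partitioning"


def _is_true(options: Dict[str, str], key: str, default: str) -> bool:
    """Interpret a Hudi-style string option as a boolean."""
    return options.get(key, default).strip().lower() == "true"


def uses_reported_combination(options: Dict[str, str]) -> bool:
    """True if both row-writing and hive-style partitioning are enabled.

    Assumed defaults when a key is missing (verify for your version):
    row writer "true", hive-style partitioning "false".
    """
    return _is_true(options, ROW_WRITER_KEY, "true") and _is_true(
        options, HIVE_STYLE_KEY, "false"
    )


def with_row_writer_disabled(options: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of the options with the row writer switched off."""
    patched = dict(options)
    patched[ROW_WRITER_KEY] = "false"
    return patched


write_options = {
    "hoodie.table.name": "my_table",  # hypothetical table name
    HIVE_STYLE_KEY: "true",
}
safe_options = with_row_writer_disabled(write_options)

# With a real DataFrame `df` and table path `base_path` (both hypothetical):
# df.write.format("hudi").options(**safe_options).mode("append").save(base_path)
```

Whether this avoids the relocated partition column in a given setup still needs to be verified by inspecting the schema of the files that clustering writes.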