apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] xxx.parquet is not a Parquet file (length is too low: 0) #8674

Open hbgstc123 opened 1 year ago

hbgstc123 commented 1 year ago

Describe the problem you faced

A Flink job writing to Hudi; we had HDFS jitter, which caused the Flink task to fail over, and we see this error.

To Reproduce

Steps to reproduce the behavior:

(only reproduces by chance)

1. Flink write with online clustering enabled
2. Task fails over while StreamWriteOperatorCoordinator is starting a new instant
3. Task fails over again
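For context, here is a minimal sketch of such a job in Java using Flink's Table API and the Hudi Flink connector. The clustering option keys are assumptions based on Hudi's FlinkOptions around 0.12.x and should be verified against the version you run; the table schema and path are purely illustrative.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch only: a streaming insert into a Hudi table with online clustering turned on.
public class HudiOnlineClusteringJobSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    tEnv.executeSql(
        "CREATE TABLE hudi_sink (\n"
            + "  id STRING PRIMARY KEY NOT ENFORCED,\n"
            + "  ts TIMESTAMP(3),\n"
            + "  region STRING\n"
            + ") PARTITIONED BY (region) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'hdfs://.../hudi_table',\n"          // illustrative path
            + "  'table.type' = 'COPY_ON_WRITE',\n"
            + "  'write.operation' = 'insert',\n"              // append path (AppendWriteFunction)
            + "  'clustering.schedule.enabled' = 'true',\n"    // assumed key for online clustering
            + "  'clustering.async.enabled' = 'true'\n"        // assumed key for online clustering
            + ")");

    // A real job would then run something like:
    // tEnv.executeSql("INSERT INTO hudi_sink SELECT id, ts, region FROM source_table");
  }
}
```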

Expected behavior

no error

Environment Description

Additional context

Stacktrace

2023-05-04 12:18:05,125 ERROR org.apache.hudi.sink.clustering.ClusteringOperator           [] - Executor executes action [Execute clustering for instant 20230504110903788 from task 0] error
org.apache.hudi.exception.HoodieException: unable to read next record from parquet file 
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53) ~[175020faf1b549f5886323df8573cb93:0.12.1]
    at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811) ~[?:1.8.0_252]
    at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:295) ~[?:1.8.0_252]
    at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:207) ~[?:1.8.0_252]
    at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:162) ~[?:1.8.0_252]
    at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:301) ~[?:1.8.0_252]
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) ~[?:1.8.0_252]
    at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45) ~[175020faf1b549f5886323df8573cb93:0.12.1]
    at org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:273) ~[175020faf1b549f5886323df8573cb93:0.12.1]
    at org.apache.hudi.sink.clustering.ClusteringOperator.lambda$processElement$0(ClusteringOperator.java:196) ~[175020faf1b549f5886323df8573cb93:0.12.1]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[175020faf1b549f5886323df8573cb93:0.12.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: java.lang.RuntimeException: hdfs://A1/projects/hive/dev_db/hudi_table/region=ctry_2/date=20230504/subp=subp_1/22903630-b6dc-42c1-97f5-fbd6d6e7fbff-9_1-2-1_20230504102509969.parquet is not a Parquet file (length is too low: 0)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:539) ~[175020faf1b549f5886323df8573cb93:0.12.1]
    at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:776) ~[175020faf1b549f5886323df8573cb93:0.12.1]
    at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657) ~[175020faf1b549f5886323df8573cb93:0.12.1]
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:152) ~[175020faf1b549f5886323df8573cb93:0.12.1]
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135) ~[175020faf1b549f5886323df8573cb93:0.12.1]
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48) ~[175020faf1b549f5886323df8573cb93:0.12.1]
    ... 13 more
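For anyone hitting the same stack trace: parquet-mr's ParquetFileReader.readFooter rejects any file shorter than magic + footer length + magic (4 + 4 + 4 = 12 bytes), which is why a 0-length file fails immediately with "length is too low". Below is a small diagnostic sketch, assuming plain Hadoop FileSystem access; the base path is illustrative and should be replaced with your table path.

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

// Diagnostic sketch: list parquet files that are too short to contain a valid footer.
public class FindTruncatedParquetFiles {
  private static final long MIN_PARQUET_LEN = 12L; // "PAR1" + 4-byte footer length + "PAR1"

  public static void main(String[] args) throws Exception {
    String basePath = "hdfs://.../hudi_table"; // replace with your table base path
    FileSystem fs = FileSystem.get(URI.create(basePath), new Configuration());
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(basePath), true);
    while (it.hasNext()) {
      LocatedFileStatus status = it.next();
      if (status.getPath().getName().endsWith(".parquet") && status.getLen() < MIN_PARQUET_LEN) {
        // These are the files clustering later trips over.
        System.out.println("Truncated parquet file: " + status.getPath() + " len=" + status.getLen());
      }
    }
  }
}
```
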
hbgstc123 commented 1 year ago

Here is what I found from the JM and TM logs:

trigger checkpoint 3574 -> Completed checkpoint 3574 -> trigger checkpoint 3575

-> TM: AppendWriteFunction fails due to Timeout(601000ms) while waiting for the instant to initialize

-> checkpoint 3575 fails due to the TM failure

-> TM restarts, sends a bootstrap event to StreamWriteOperatorCoordinator

-> StreamWriteOperatorCoordinator generates a new instant 20230504102505338 (triggered by checkpoint 3574 completing)

-> StreamWriteOperatorCoordinator receives the bootstrap event from the restarted TM, starts to roll back instant 20230504102505338 and start a new instant

-> AppendWriteFunction starts to write with instant 20230504102505338 (which is wrong; it should write with the new instant started after 20230504102505338 is rolled back), and these newly written data files of 20230504102505338 escape the rollback

-> rollback of instant 20230504102505338 completes, a new instant 20230504102509969 is started

-> trigger checkpoint 3576 -> Completed checkpoint 3576

-> start to commit 20230504102509969

-> AppendWriteFunction starts to write with instant 20230504102509969 (which is wrong)

-> invalid data files of 20230504102509969 are deleted; part of the wrongly written data files are deleted, causing a write conflict error: File does not exist: ...... [Lease. Holder: DFSClient_NONMAPREDUCE_-536912522_86, pending creates: 5]

-> marker dir .temp/20230504102509969 is removed, leaving some 20230504102509969 data files of length 0 on HDFS

-> clustering c1 is scheduled, including those escaped data files of 20230504102509969

-> the next time clustering executes instant c1, it reports this error.
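To make the ordering above easier to follow, here is a deliberately simplified, hypothetical Java model of the race. None of these classes are Hudi's real ones; only the interleaving matters, and the instant times are the ones from the log.

```java
// Hypothetical model: the restarted write task resolves its instant before the
// coordinator has processed its bootstrap event, so it can latch onto an instant
// the coordinator is about to roll back.
public class InstantRaceSketch {

  static class Coordinator {
    volatile String activeInstant;

    void onCheckpointComplete(String newInstant) {
      // checkpoint 3574 completes -> a new instant is started (20230504102505338 in the log)
      activeInstant = newInstant;
    }

    void onBootstrapEvent(String nextInstant) {
      // bootstrap from the restarted TM -> roll back the current instant, start a new one
      System.out.println("rolling back " + activeInstant);
      activeInstant = nextInstant; // 20230504102509969 in the log
    }
  }

  static class WriteTask {
    String instantToWrite;

    void restartAndResolveInstant(Coordinator coordinator) {
      // The problem being described: the task resolves the instant before the
      // coordinator has handled its bootstrap event, so it may pick up the doomed one.
      instantToWrite = coordinator.activeInstant;
    }
  }

  public static void main(String[] args) {
    Coordinator coordinator = new Coordinator();
    WriteTask task = new WriteTask();

    coordinator.onCheckpointComplete("20230504102505338");
    task.restartAndResolveInstant(coordinator);         // task latches onto 20230504102505338
    coordinator.onBootstrapEvent("20230504102509969");  // 20230504102505338 is rolled back

    // Files written under the stale instant escape the rollback and are later
    // picked up by clustering, which then hits the truncated/invalid files.
    System.out.println("task writes with stale instant: " + task.instantToWrite);
  }
}
```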

hbgstc123 commented 1 year ago

It seems the cause is that the TM gets the wrong instant 20230504102505338 to write with, so I submitted a PR to fix this: https://github.com/apache/hudi/pull/8673

danny0405 commented 1 year ago

From my understanding, if checkpoint 3574 had been successful (but the checkpoint success event was missed by the coordinator), then we should still re-commit the instant. Each write task should hold its write metadata and re-send the events; yeah, but it seems you are right, the coordinator does not know that. The question is, if it is a global failover for all the write tasks, the bootstrap events from the initialization of all the tasks would re-trigger the re-commit of the instant.

danny0405 commented 1 year ago

BTW, can you help to validate whether this patch can solve the problem: https://github.com/apache/hudi/pull/8594

hbgstc123 commented 1 year ago

> From my understanding, if checkpoint 3574 had been successful (but the checkpoint success event was missed by the coordinator), then we should still re-commit the instant. Each write task should hold its write metadata and re-send the events; yeah, but it seems you are right, the coordinator does not know that. The question is, if it is a global failover for all the write tasks, the bootstrap events from the initialization of all the tasks would re-trigger the re-commit of the instant.

The previous instant committed successfully after checkpoint 3574 completed. 20230504102505338 is a new instant generated right after AbstractStreamWriteFunction sent the bootstrap event but before StreamWriteOperatorCoordinator received the bootstrap events. Then 20230504102505338 was rolled back and a new instant 20230504102509969 was started, but AbstractStreamWriteFunction wouldn't know that, because it thinks 20230504102505338 is the new instant.

hbgstc123 commented 1 year ago

> BTW, can you help to validate whether this patch can solve the problem: #8594

This error is not easy to reproduce. But I think restarting the coordinator together with the operators, like that PR does, could avoid this error too, because a new instant is only started after StreamWriteOperatorCoordinator receives all bootstrap events, so AbstractStreamWriteFunction will not get the wrong instant.
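A hypothetical sketch of that gating idea (not Hudi's actual coordinator code): only publish a new instant once bootstrap events from all write subtasks have arrived, for example with a latch keyed on the sink parallelism.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: a new instant is only handed out after every write subtask
// has re-registered, so a restarted task can never latch onto an instant that is
// about to be rolled back.
public class BootstrapGatedInstantStart {
  private final CountDownLatch bootstrapLatch;

  BootstrapGatedInstantStart(int writeParallelism) {
    this.bootstrapLatch = new CountDownLatch(writeParallelism);
  }

  void onBootstrapEvent(int subtaskId) {
    System.out.println("bootstrap event from subtask " + subtaskId);
    bootstrapLatch.countDown();
  }

  String startNewInstantWhenAllBootstrapped() throws InterruptedException {
    // Block until every subtask has re-registered; only then roll back the
    // pending instant and hand out a fresh instant time.
    bootstrapLatch.await();
    return "20230504102509969"; // placeholder instant time from the log
  }

  public static void main(String[] args) throws InterruptedException {
    BootstrapGatedInstantStart coordinator = new BootstrapGatedInstantStart(2);
    coordinator.onBootstrapEvent(0);
    coordinator.onBootstrapEvent(1);
    System.out.println("new instant: " + coordinator.startNewInstantWhenAllBootstrapped());
  }
}
```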

hbgstc123 commented 1 year ago

> BTW, can you help to validate whether this patch can solve the problem: #8594

Sorry, I have an update on this: I ran a test and read about the behavior of RecreateOnResetOperatorCoordinator, and it only restarts the coordinator on a global failure, which is not the case in my issue; in my case all subtasks restart.

So https://github.com/apache/hudi/pull/8594 cannot solve this issue. @danny0405

eyjian commented 1 year ago

Is it the same problem? https://github.com/apache/hudi/issues/8686

danny0405 commented 1 year ago

Maybe; it seems the Spark data source would include some invalid files in the reader view.

danny0405 commented 11 months ago

@hbgstc123 https://github.com/apache/hudi/pull/9867 here is the fix, maybe it's helpful for you.

hbgstc123 commented 11 months ago

Tested https://github.com/apache/hudi/pull/9867 locally; it cannot fix this issue. The writer can still get the wrong instant.

And I drew a graph to describe how the writer gets the wrong instant to write with:

[diagram: how the writer gets the wrong instant to write]

And I updated my PR; it could solve this issue: https://github.com/apache/hudi/pull/8673

zhangyue19921010 commented 9 months ago

Also found this issue on the 0.13.1 branch.

zhangyue19921010 commented 9 months ago

@hbgstc123 still confused about:

> -> invalid data files of 20230504102509969 are deleted; part of the wrongly written data files are deleted, causing a write conflict error: File does not exist: ...... [Lease. Holder: DFSClient_NONMAPREDUCE_-536912522_86, pending creates: 5]

> -> marker dir .temp/20230504102509969 is removed, leaving some 20230504102509969 data files of length 0 on HDFS

Would you mind leaving your WeChat number maybe? Thanks!

nsivabalan commented 3 weeks ago

Hey folks, is it that some spurious files were added after marker reconciliation (maybe by a stray executor), and one of the spurious files is very small, so we run into the aforementioned exception?

Can someone summarize the issue for me?

nsivabalan commented 3 weeks ago

We are working on fixing this using completion markers: https://github.com/apache/hudi/pull/11593, FYI.
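For readers unfamiliar with the term, here is a hypothetical sketch of the completion-marker idea; the names and file layout are purely illustrative and not the actual implementation in that PR. The point is that a data file only becomes visible once a matching completion marker exists, so half-written or zero-length files left behind by a failed task are never picked up by clustering or readers.

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical sketch of completion markers: a marker is written only after the
// data file is fully flushed and closed, and visibility checks require the marker.
public class CompletionMarkerSketch {

  static Path completionMarkerFor(Path dataFile) {
    // Illustrative naming, e.g. ".<file>.completed" next to the data file.
    return new Path(dataFile.getParent(), "." + dataFile.getName() + ".completed");
  }

  static void markCompleted(FileSystem fs, Path dataFile) throws IOException {
    // Written only after the data file has been fully written and closed.
    fs.create(completionMarkerFor(dataFile), false).close();
  }

  static boolean isVisible(FileSystem fs, Path dataFile) throws IOException {
    // Readers and clustering would skip any data file without a completion marker.
    return fs.exists(completionMarkerFor(dataFile));
  }
}
```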