apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Recovering a job from a checkpoint reports NoSuchElementException and a data exception #11023

Open. jack1234smith opened this issue 7 months ago

jack1234smith commented 7 months ago

Describe the problem you faced

Error exception:

java.util.NoSuchElementException: No value present in Option
    at org.apache.hudi.common.util.Option.get(Option.java:89)
    at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.initIterator(MergeOnReadInputFormat.java:204)
    at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:189)
    at org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:169)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:745)
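For context, org.apache.hudi.common.util.Option mirrors java.util.Optional: calling get() on an empty Option throws exactly the NoSuchElementException above. The following is a minimal, self-contained Java sketch of that failure mode; the Option class below is a stand-in written for illustration, not Hudi's actual implementation, and the "split with neither a base file nor log files" scenario is an assumption (see danny0405's comment further down).

import java.util.NoSuchElementException;

// Stand-in for org.apache.hudi.common.util.Option, only to demonstrate the
// exception seen in the trace; this is NOT Hudi's real code.
final class Option<T> {
    private final T val;

    private Option(T val) {
        this.val = val;
    }

    static <T> Option<T> empty() {
        return new Option<>(null);
    }

    T get() {
        if (val == null) {
            // Same message as in the reported stack trace.
            throw new NoSuchElementException("No value present in Option");
        }
        return val;
    }
}

public class OptionFailureSketch {
    public static void main(String[] args) {
        // Hypothetical restored MOR split that references neither a base file
        // nor any log files: the base-path Option is empty, so an
        // unconditional get() blows up exactly as reported.
        Option<String> basePath = Option.empty();
        basePath.get(); // throws NoSuchElementException: No value present in Option
    }
}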

data error: 17131698404766

To Reproduce

Steps to reproduce the behavior:

  1. Kill the YARN session.
  2. Restart the job from the checkpoint (a sketch of the restore step follows this list).
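How the job is resubmitted is not stated in the report. As one possibility, here is a minimal Java sketch of step 2, assuming a programmatic resubmission that restores from a retained checkpoint; the checkpoint path, job id, and table wiring below are placeholders, not values from the report. The CLI equivalent would be "flink run -s <checkpoint-path> ...".

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RestoreFromCheckpoint {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Point the new submission at the retained checkpoint.
        // The path is a placeholder, not taken from the report.
        conf.setString("execution.savepoint.path",
            "hdfs://masters/flink/checkpoints/<job-id>/chk-<n>");

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(conf);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Re-register the two Hudi tables (DDL as in "Additional context"
        // below), then resubmit the same INSERT; operator state is restored
        // from the checkpoint before the Hudi source resumes reading splits.
        // tEnv.executeSql("CREATE TABLE if not exists ods_table (...) WITH (...)");
        // tEnv.executeSql("CREATE TABLE if not exists ads_table (...) WITH (...)");
        // tEnv.executeSql("insert into ads_table select ...");
    }
}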

Environment Description

Additional context

My tables are:

CREATE TABLE if not exists ods_table (
    id int,
    count_num double,
    write_time timestamp(0),
    _part string,
    proc_time timestamp(3),
    WATERMARK FOR write_time AS write_time
) PARTITIONED BY (_part) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://masters/test/ods_table',
    'table.type' = 'MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field' = 'id',
    'hoodie.datasource.write.precombine.field' = 'write_time',
    'write.bucket_assign.tasks' = '1',
    'write.tasks' = '1',
    'compaction.tasks' = '1',
    'compaction.async.enabled' = 'true',
    'compaction.schedule.enabled' = 'true',
    'compaction.trigger.strategy' = 'time_elapsed',
    'compaction.delta_seconds' = '600',
    'compaction.delta_commits' = '1',
    'read.streaming.enabled' = 'true',
    'read.streaming.skip_compaction' = 'true',
    'read.start-commit' = 'earliest',
    'changelog.enabled' = 'true',
    'hive_sync.enable' = 'true',
    'hive_sync.mode' = 'hms',
    'hive_sync.metastore.uris' = 'thrift://h35:9083',
    'hive_sync.db' = 'test',
    'hive_sync.table' = 'hive_ods_table'
);

CREATE TABLE if not exists ads_table (
    sta_date string,
    num double,
    proc_time as proctime()
) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://masters/test/ads_table',
    'table.type' = 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field' = 'sta_date',
    'write.bucket_assign.tasks' = '1',
    'write.tasks' = '1',
    'compaction.tasks' = '1',
    'compaction.async.enabled' = 'true',
    'compaction.schedule.enabled' = 'true',
    'compaction.trigger.strategy' = 'time_elapsed',
    'compaction.delta_seconds' = '600',
    'compaction.delta_commits' = '1',
    'read.streaming.enabled' = 'true',
    'read.streaming.skip_compaction' = 'true',
    'read.start-commit' = 'earliest',
    'changelog.enabled' = 'true',
    'hive_sync.enable' = 'true',
    'hive_sync.mode' = 'hms',
    'hive_sync.metastore.uris' = 'thrift://h35:9083',
    'hive_sync.db' = 'test',
    'hive_sync.table' = 'hive_ads_table'
);

My job is:

insert into test.ads_table
select _part, sum(count_num)
from test.ods_table
group by _part;

danny0405 commented 7 months ago

Weird. It seems this split does not contain any base or log files; it is probably caused by ingestion corruption.

ad1happy2go commented 7 months ago

@jack1234smith Were you able to figure out the issue here? Please let us know if you still need help.