apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Flink BULK_INSERT write to Hudi fails: File hdfs:/apps/hive/warehouse/hudi.db/xxx/.hoodie/20240727154013121.deltacommit.inflight does not exist! #11695

Open Toroidals opened 1 month ago

Toroidals commented 1 month ago


Describe the problem you faced

Writing to Hudi from Flink with the BULK_INSERT operation fails with: File hdfs:/apps/hive/warehouse/hudi.db/xxx/.hoodie/20240727154013121.deltacommit.inflight does not exist!

The Hudi pipeline builder is constructed as follows:

public static HoodiePipeline.Builder getHoodieBuilder(HashMap<String, String> infoMap, HashMap<String, String> connectInfo) {
    HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));
    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
    options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));
    options.put(FlinkOptions.PATH.key(), infoMap.get("hudi_hdfs_path"));

    options.put("catalog.path", "hdfs:///apps/hudi/catalog/");

    // Column definitions arrive as a JSON array of [name, type] pairs.
    String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
    ArrayList<ArrayList<String>> fieldList =
            JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList<String>>>() {});
    for (ArrayList<String> columnList : fieldList) {
        builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
    }

    String[] hudiPrimaryKeys = infoMap.get("hudi_primary_key").split(",");
    builder.pk(hudiPrimaryKeys);

    // MERGE_ON_READ table with a bucket index.
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), infoMap.get("hudi_precombine_field"));
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), infoMap.get("hudi_bucket_index_num_buckets"));
    options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), infoMap.get("hudi_bucket_index_engine_type"));
    // `compactionTasks` and `writeTasks` were undefined in the original snippet;
    // they are assumed here to be read from infoMap like the other settings.
    options.put(FlinkOptions.COMPACTION_TASKS.key(), infoMap.get("hudi_compaction_tasks"));
    options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), infoMap.get("hudi_compaction_trigger_strategy"));
    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), infoMap.get("hudi_compaction_delta_commits"));
    options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), infoMap.get("hudi_compaction_delta_seconds"));
    options.put(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), "true");

    // Hive sync settings.
    options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), infoMap.get("hudi_hive_sync_enabled"));
    options.put(FlinkOptions.HIVE_SYNC_MODE.key(), infoMap.get("hudi_hive_sync_mode"));
    options.put(FlinkOptions.HIVE_SYNC_DB.key(), infoMap.get("hudi_hive_sync_db"));
    options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), infoMap.get("hudi_hive_sync_table"));
    options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
    options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), connectInfo.get("hive_metastore_url"));
    options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), connectInfo.get("conn_url"));
    options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
    options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");

    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), infoMap.get("hudi_hive_sync_partition_fields"));
    options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), infoMap.get("hudi_hive_sync_partition_fields"));
    options.put(FlinkOptions.WRITE_TASKS.key(), infoMap.get("hudi_write_tasks"));
    options.put(FlinkOptions.OPERATION.key(), WriteOperationType.BULK_INSERT.value());

    builder.options(options);
    return builder;
}
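
For context, this is roughly how the returned builder would be wired into the job. A minimal sketch, assuming a DataStream<RowData> produced upstream; buildCdcStream, infoMap, and connectInfo are placeholders for code not shown in the report, not part of Hudi's API:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.util.HoodiePipeline;

public class HudiBulkInsertJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hudi's Flink sink normally commits on checkpoints; with BULK_INSERT the
        // coordinator instead commits once on end-of-input, which is exactly the
        // handleEndInputEvent code path that fails in the stack trace below.
        env.enableCheckpointing(60_000L);

        DataStream<RowData> rowDataStream = buildCdcStream(env); // hypothetical source helper

        HoodiePipeline.Builder builder = getHoodieBuilder(infoMap, connectInfo); // maps built elsewhere
        builder.sink(rowDataStream, true); // bounded = true: bulk_insert is typically a one-shot, bounded load
        env.execute("hudi_bulk_insert");
    }
}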

ERROR info (2024-07-27 20:28:15):

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:101)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:322)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:574)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Map -> file_sorter -> bucket_bulk_insert: default_database.xxx_cdc' (operator a06944971a20d03f1b326b36e07c7ad2).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:197)
    at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [handle end input event for instant 20240727202142808] error
    ... 8 more
Caused by: java.lang.IllegalArgumentException: File hdfs:/apps/hive/warehouse/hudi.db/hudi_xxx_cdc/.hoodie/20240727202142808.deltacommit.inflight does not exist!
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:42)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:617)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:598)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:223)
    at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:295)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:239)
    at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:113)
    at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:76)
    at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:203)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doCommit(StreamWriteOperatorCoordinator.java:579)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:555)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:524)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.handleEndInputEvent(StreamWriteOperatorCoordinator.java:459)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$handleEventFromOperator$3(StreamWriteOperatorCoordinator.java:287)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
    ... 5 more

To Reproduce

Steps to reproduce the behavior:

1. Configure a Flink job writing to a MERGE_ON_READ table with write.operation=BULK_INSERT and index.type=BUCKET (see the builder code above).
2. Run the job; committing the instant fails with the error above.

Expected behavior

The BULK_INSERT job commits its instant and completes without errors.


danny0405 commented 1 month ago

Caused by: java.lang.IllegalArgumentException: File hdfs:/apps/hive/warehouse/hudi.db/hudi_xxx_cdc/.hoodie/20240727202142808.deltacommit.inflight does not exist!

Do you have multiple writers on this table? It looks like the pending delta_commit has been rolled back.
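
One way to check this hypothesis is to look for rollback instants on the table's timeline; a rollback of the pending instant would explain why saveAsComplete cannot find the .deltacommit.inflight file. A minimal sketch against the Hudi 0.x meta-client API; the base path is taken from the error message, and the bare Configuration assumes Hadoop config files are on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

public class TimelineCheck {
    public static void main(String[] args) {
        // Point the meta client at the table base path from the error message.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(new Configuration())
                .setBasePath("hdfs:///apps/hive/warehouse/hudi.db/hudi_xxx_cdc")
                .build();

        // If another writer (or a restarted job) rolled back the inflight
        // delta commit, a rollback instant will show up on the active timeline.
        HoodieTimeline rollbacks = metaClient.getActiveTimeline().getRollbackTimeline();
        System.out.println("rollback instants: " + rollbacks.countInstants());
        if (rollbacks.lastInstant().isPresent()) {
            System.out.println("latest rollback: " + rollbacks.lastInstant().get());
        }
    }
}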

ad1happy2go commented 1 month ago

@Toroidals Gentle ping here.