apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT]Fail to execute offline flink compactor in service mode. #7546

Open Leoyzen opened 1 year ago

Leoyzen commented 1 year ago

Describe the problem you faced

I'm using HoodieFlinkCompactor to run an offline compaction job, and it fails in service mode.

The failure is: Cannot have more than one execute() or executeAsync() call in a single environment.

It runs fine for a single round with service mode disabled (run once and quit).

To Reproduce

Steps to reproduce the behavior:

  1. Start a standalone Flink compactor job.
  2. Enable service mode.
  3. The job fails once the first "parallelism" jobs are done (i.e., on the next loop).
  4. The job restarts.

Expected behavior

The second loop (which starts after the first "parallelism" jobs are done) succeeds when using service mode.

Environment Description

0.12.1

Additional context

job config

--path oss://dengine-lake-zjk/cloudcode_prod/dwd_egc_adv_req_outra
--compaction-max-memory 2048
--seq LIFO
--compaction-tasks 16
--plan-select-strategy all
--min-compaction-interval-seconds 30
--service

Stacktrace

2022-12-23 23:14:05,976 [pool-17-thread-1] INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class class org.apache.hudi.common.model.CompactionOperation cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2022-12-23 23:14:05,983 [pool-17-thread-1] WARN  org.apache.flink.resourceplan.applyagent.StreamGraphModifier [] - Path of resource plan is not specified, do nothing.
2022-12-23 23:14:05,983 [pool-17-thread-1] ERROR org.apache.hudi.client.RunsTableService                      [] - Shutting down compaction service due to exception
org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
    at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:199) ~[flink-dist-1.15-vvr-6.0.2-3-SNAPSHOT.jar:1.15-vvr-6.0.2-3-SNAPSHOT]
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:187) ~[flink-dist-1.15-vvr-6.0.2-3-SNAPSHOT.jar:1.15-vvr-6.0.2-3-SNAPSHOT]
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:119) ~[flink-dist-1.15-vvr-6.0.2-3-SNAPSHOT.jar:1.15-vvr-6.0.2-3-SNAPSHOT]
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1985) ~[flink-dist-1.15-vvr-6.0.2-3-SNAPSHOT.jar:1.15-vvr-6.0.2-3-SNAPSHOT]
    at org.apache.hudi.sink.compact.HoodieFlinkCompactor$AsyncCompactionService.compact(HoodieFlinkCompactor.java:322) ~[flink-hudi-bundle-1.3-SNAPSHOT-jar-with-dependencies-20221217104900.jar:?]
    at org.apache.hudi.sink.compact.HoodieFlinkCompactor$AsyncCompactionService.lambda$startService$0(HoodieFlinkCompactor.java:204) ~[flink-hudi-bundle-1.3-SNAPSHOT-jar-with-dependencies-20221217104900.jar:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) [?:1.8.0_102]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) [?:1.8.0_102]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) [?:1.8.0_102]
    at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]

yihua commented 1 year ago

@danny0405 I think Hudi supports the Flink compactor in service mode. Is there any config to get it right?

Leoyzen commented 1 year ago

@danny0405 @yihua Is there any solution right now? We have a source that produces 100k+ TPS, and the TaskManager keeps crashing with timeouts when using online compaction.

danny0405 commented 1 year ago

@Leoyzen Thanks for the feedback. It seems that Flink does not allow multiple jobs in one execution environment in some modes (high availability (HA) or web submission), so I filed a fix here: https://github.com/apache/hudi/pull/7588.

I also notice that you may be using the Flink jar provided by Aliyun EMR, in which case a version upgrade is not that easy.
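
As a rough illustration of the restriction described above, the following Java sketch models an environment that accepts only one execute() call and compares a service loop that reuses it with one that creates a fresh environment per round. SingleUseEnv and SingleExecuteDemo are hypothetical names used only for this example. They do not use Flink's classes and do not show what the linked fix actually changes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an execution environment that accepts one job submission only.
final class SingleUseEnv {
    private boolean executed = false;

    String execute(String jobName) {
        if (executed) {
            // Same message as in the reported stack trace, raised by this toy guard.
            throw new IllegalStateException(
                "Cannot have more than one execute() or executeAsync() call in a single environment.");
        }
        executed = true;
        return "finished:" + jobName;
    }
}

final class SingleExecuteDemo {
    // Reuses one environment for every round, so the second round is rejected.
    static List<String> runWithSharedEnv(int rounds) {
        SingleUseEnv env = new SingleUseEnv();
        List<String> results = new ArrayList<>();
        for (int i = 1; i <= rounds; i++) {
            results.add(env.execute("compaction-round-" + i));
        }
        return results;
    }

    // Creates a fresh environment for each round, so every round gets its own execute() call.
    static List<String> runWithFreshEnv(int rounds) {
        List<String> results = new ArrayList<>();
        for (int i = 1; i <= rounds; i++) {
            results.add(new SingleUseEnv().execute("compaction-round-" + i));
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runWithFreshEnv(3));
        try {
            runWithSharedEnv(3);
        } catch (IllegalStateException e) {
            System.out.println("shared environment failed: " + e.getMessage());
        }
    }
}
```

The sketch only models the guard. Whether a real cluster accepts several submissions from one program depends on the deployment mode.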

Leoyzen commented 1 year ago

@danny0405 Thanks for the fix, I will give it a try. Yes, I'm using Aliyun VVP/VVR, but I compiled the bundle myself and provide it manually as an individual jar, so it does not depend on the environment.

danny0405 commented 1 year ago

#7588 has been merged; feel free to reopen this if the problem still exists.

Leoyzen commented 1 year ago

@danny0405 It seems the compaction is not committed after it finishes in service mode. A lot of rollbacks have been found. logs.zip

danny0405 commented 1 year ago

Do you compact multiple instants together in one job? And is there any error stack trace?

danny0405 commented 1 year ago

The current strategy is that the compaction job tries to roll back any inflight compaction instants that have not finished (because we have no way of knowing whether an inflight compaction has hung, crashed, or is still ongoing).

So you should ensure that the compaction job can finish during the schedule time interval.
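
The rollback strategy described above can be sketched with a toy timeline. This is a minimal model under stated assumptions: RollbackSketch, its State enum, and the choice to reset an inflight instant to REQUESTED are illustrative and are not taken from Hudi's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy timeline (hypothetical): maps an instant time to its state.
final class RollbackSketch {
    enum State { REQUESTED, INFLIGHT, COMPLETED }

    private final Map<String, State> timeline = new LinkedHashMap<>();

    void schedule(String instant) { timeline.put(instant, State.REQUESTED); }
    void start(String instant)    { timeline.put(instant, State.INFLIGHT); }
    void commit(String instant)   { timeline.put(instant, State.COMPLETED); }
    State stateOf(String instant) { return timeline.get(instant); }

    // At the start of a round, every instant still INFLIGHT is reset to REQUESTED,
    // because the new round cannot tell a hung or crashed run from one still in progress.
    // Returns the number of instants that were rolled back.
    int rollbackInflight() {
        int rolledBack = 0;
        for (Map.Entry<String, State> entry : timeline.entrySet()) {
            if (entry.getValue() == State.INFLIGHT) {
                entry.setValue(State.REQUESTED);
                rolledBack++;
            }
        }
        return rolledBack;
    }

    public static void main(String[] args) {
        RollbackSketch sketch = new RollbackSketch();
        sketch.schedule("001");
        sketch.schedule("002");
        sketch.start("001");
        sketch.start("002");
        sketch.commit("001"); // only 001 finished before the next round started
        System.out.println("rolled back: " + sketch.rollbackInflight());
        System.out.println("002 is now " + sketch.stateOf("002"));
    }
}
```

In this model, an instant whose work is done but whose commit never happens looks identical to one that crashed, so it is redone in the next round.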

Leoyzen commented 1 year ago

@danny0405 I know. There is only one job doing offline compaction, and this job uses multiple slots (parallelism) to do the compaction.

You can see there is no commit after the compaction finishes, which is abnormal compared with running with service mode disabled.

danny0405 commented 1 year ago

In service mode, a new compaction job is scheduled every schedule interval (by default 10 minutes). That means all the compaction tasks should finish within 10 minutes. Is that the case for you?

Leoyzen commented 1 year ago

@danny0405 Here is the situation I've run into:

  1. Start a job with service mode enabled. It's quite a large job (200+ file groups of 1 GB+ each, 100+ compaction tasks).
  2. The first round (which loads all instants) finishes, and the second round (newly added compaction tasks) starts rolling back the tasks that were just completed in the first round.
  3. Looking into the log, I found that nothing is committed after each compaction task. So on entering the second round, the job has to roll back all the tasks that were just done and do them again (the files have been created, but there is no instant.commit file).
  4. The dirty files keep making the second-round compaction fail (the final parquet file already exists); I had to replace CREATE with OVERWRITE in the code to avoid the failure.
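
The difference between a create-only write and an overwriting write in step 4 can be shown by analogy with java.nio open options. This is only an analogy under stated assumptions: WriteModeSketch and its methods are hypothetical, and the sketch does not use the writer API that the CREATE/OVERWRITE change was made in.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class WriteModeSketch {
    // CREATE-like mode: fails if a file from an earlier, uncommitted attempt is still there.
    static void writeCreate(Path file, String content) {
        try {
            Files.write(file, content.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // OVERWRITE-like mode: replaces whatever an earlier attempt left behind.
    static void writeOverwrite(Path file, String content) {
        try {
            Files.write(file, content.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING,
                    StandardOpenOption.WRITE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String read(Path file) {
        try {
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns a path inside a new temporary directory; the file itself is not created yet.
    static Path newTempFile() {
        try {
            return Files.createTempDirectory("compaction-sketch").resolve("base-file.parquet");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path file = newTempFile();
        writeCreate(file, "first attempt");
        try {
            writeCreate(file, "second attempt");
        } catch (UncheckedIOException e) {
            boolean exists = e.getCause() instanceof FileAlreadyExistsException;
            System.out.println("create-only write rejected, file exists: " + exists);
        }
        writeOverwrite(file, "second attempt");
        System.out.println(read(file));
    }
}
```

Overwriting hides the symptom of leftover files. It does not explain why the first round was never committed.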

danny0405 commented 1 year ago

Okay, you need to confirm two things:

  1. Offline compaction supports compacting multiple instants at the same time; you can disable this so that the compaction for one job does not take too long.
  2. Dig into the CompactionCommit sink to see why the compaction is not committed. A compaction instant is committed once all the file groups that belong to it have finished their compaction tasks. You can see some input/output metrics in the job DAG metrics, and there are some logs for the CompactionCommit operator.
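
The commit condition in point 2 (an instant is committed only after every file group in its plan has reported completion) can be sketched as a small buffer. CommitBufferSketch and its method names are hypothetical and only illustrate the counting logic. They are not the CompactionCommit sink's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class CommitBufferSketch {
    // Number of file groups each pending instant's plan contains.
    private final Map<String, Integer> expectedFileGroups = new HashMap<>();
    // File groups that have reported completion so far, per instant.
    private final Map<String, Set<String>> finishedFileGroups = new HashMap<>();
    private final List<String> committed = new ArrayList<>();

    void registerPlan(String instant, int fileGroupCount) {
        expectedFileGroups.put(instant, fileGroupCount);
    }

    // Records one completion event; returns true if this event triggered the commit.
    boolean onFileGroupFinished(String instant, String fileGroupId) {
        Integer expected = expectedFileGroups.get(instant);
        if (expected == null) {
            throw new IllegalArgumentException("Unknown or already committed instant: " + instant);
        }
        Set<String> done = finishedFileGroups.computeIfAbsent(instant, key -> new HashSet<>());
        done.add(fileGroupId); // a set, so duplicate events are not counted twice
        if (done.size() == expected) {
            committed.add(instant);
            expectedFileGroups.remove(instant);
            finishedFileGroups.remove(instant);
            return true;
        }
        return false;
    }

    List<String> committedInstants() {
        return committed;
    }

    public static void main(String[] args) {
        CommitBufferSketch buffer = new CommitBufferSketch();
        buffer.registerPlan("001", 2);
        System.out.println(buffer.onFileGroupFinished("001", "fg-1"));
        System.out.println(buffer.onFileGroupFinished("001", "fg-2"));
        System.out.println(buffer.committedInstants());
    }
}
```

If even one completion event never reaches the buffer, the instant stays uncommitted in this model, which would match the symptom of finished tasks with no commit.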

Leoyzen commented 1 year ago

@danny0405 Here is the case. The command line used to start the offline program (service mode):

--path oss://dengine-lake-zjk/cloudcode_prod/dwd_egc_adv_resp_intra 
--compaction-max-memory 3072 
--archive-min-commits 180 
--archive-max-commits 2016 
--seq LIFO 
--compaction-tasks 16 
--plan-select-strategy num_instants 
--max-num-plans 16 
--min-compaction-interval-seconds 30 
--spillable_map_path /opt/flink/flink-tmp-dir/ 
--service 
-Dhadoop.fs.AbstractFileSystem.oss.impl=com.aliyun.jindodata.oss.OSS 
-Dhadoop.fs.oss.impl=com.aliyun.jindodata.oss.JindoOssFileSystem 
-Dhadoop.fs.oss.endpoint=cn-zhangjiakou.oss.aliyuncs.com 
-Dhadoop.fs.oss.credentials.provider=com.aliyun.jindodata.oss.auth.SimpleCredentialsProvider 
-Dhadoop.fs.oss.accessKeyId=******** 
-Dhadoop.fs.oss.accessKeySecret=***********

The job runs the first round (16 instants for 120 files).

Then the TM gets stuck here while the JM keeps rolling back the compaction again and again; the finished compaction is never committed.

The issue should be reopened. tm-compaction.log jm-compaction.log

danny0405 commented 1 year ago

Yeah, I didn't see any keywords for a compaction commit, such as "Committing Compaction". I see that you set the param --compaction-tasks 16 to allow concurrent compaction of 16 plans; did you try to compact them one by one instead?

If there is any exception like the Connection Refused exception, it may be related to this patch: https://github.com/apache/hudi/pull/7214

Leoyzen commented 1 year ago

@danny0405 I've noticed a warning and a "recovered" log entry, and I don't know whether they are related.

I don't see these logs when service mode is disabled.

2023-02-02 22:05:18,868 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class org.apache.hudi.common.model.CompactionOperation does not contain a setter for field baseInstantTime
2023-02-02 22:05:18,869 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class class org.apache.hudi.common.model.CompactionOperation cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2023-02-02 22:05:18,884 WARN  org.apache.flink.resourceplan.applyagent.StreamGraphModifier [] - Path of resource plan is not specified, do nothing.
2023-02-02 22:05:18,884 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 6e03ee3092954b338d7b984d6918ce32 was recovered successfully.

Leoyzen commented 1 year ago

@danny0405 Maybe I know what's going on.

It does not work at all in an HA standalone cluster like Aliyun VVP.

After the previous patch, compact() doesn't return; it just does nothing after the task has finished. With service mode disabled, the job never reaches "done"; it hangs there doing nothing.

With service mode enabled, the job never gets to the second round; it hangs after the first round, so the timeline service rolls back again and again.

The compaction itself succeeds: there is a commit file under the .hoodie directory after the first round, although there are no logs at all.

So maybe it does not work when a new stream execution environment is used to execute the job.

Leoyzen commented 1 year ago

I think it would be better to move the compaction plan generation code into the source function.

I can make a PR if needed.

danny0405 commented 1 year ago

What we suggest is to generate the compaction plan inside the streaming ingestion job; the offline compaction then just executes these plans. Do you mean your job is stuck at the plan generation phase?

Leoyzen commented 1 year ago

@danny0405 No. I mean the job gets stuck after plan execution and can't enter the second round.

In our scenario, the compaction plan is generated by the streaming ingestion job, and the async table service in the JobManager seems to be problematic. I suggest moving the plan discovery code, along with the service mode cycle, into the source function.

danny0405 commented 1 year ago

Yeah, that's a solution. We can move the plan execution distribution into a long-running streaming source operator instead when service mode is enabled.
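
The idea of a long-running source that discovers plans itself can be sketched as a polling loop with run() and cancel() methods. This is a minimal sketch under stated assumptions: PlanPollingSource, the pendingPlans supplier, and the downstream consumer are hypothetical and plain Java, not a Flink source function or Hudi's plan discovery code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical long-running source: polls for pending compaction plans and emits new ones.
final class PlanPollingSource {
    private final Supplier<List<String>> pendingPlans; // assumed plan discovery hook
    private final long intervalMillis;
    private final Set<String> emitted = new HashSet<>();
    private volatile boolean running = true;

    PlanPollingSource(Supplier<List<String>> pendingPlans, long intervalMillis) {
        this.pendingPlans = pendingPlans;
        this.intervalMillis = intervalMillis;
    }

    // One job, one loop: each poll emits only the instants that were not emitted before.
    void run(Consumer<String> downstream) {
        while (running) {
            for (String instant : pendingPlans.get()) {
                if (emitted.add(instant)) {
                    downstream.accept(instant);
                }
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }

    void cancel() {
        running = false;
    }
}

final class PlanPollingDemo {
    // Simulates three polls against a changing set of pending plans, then cancels the source.
    static List<String> simulate() {
        List<List<String>> polls = Arrays.asList(
                Arrays.asList("001"),
                Arrays.asList("001", "002"),
                Arrays.asList("002", "003"));
        int[] pollCount = {0};
        PlanPollingSource[] holder = new PlanPollingSource[1];
        holder[0] = new PlanPollingSource(() -> {
            int i = pollCount[0]++;
            if (i >= polls.size() - 1) {
                holder[0].cancel(); // stop after the last simulated poll
            }
            return polls.get(Math.min(i, polls.size() - 1));
        }, 0L);
        List<String> out = new ArrayList<>();
        holder[0].run(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

Because the loop lives inside a single job, nothing has to be submitted to the environment a second time.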