DTStack / chunjun

A data integration framework
https://dtstack.github.io/chunjun/
Apache License 2.0
4.01k stars 1.69k forks source link

自定义flink jobId,如果传了jobId 就用指定的jobId,如果没传就使用flink自动生成的jobId #758

Open shaolei7788 opened 2 years ago

shaolei7788 commented 2 years ago

自定义flink jobId,如果传了jobId 就用指定的jobId,如果没传就使用flink自动生成的jobId

shaolei7788 commented 2 years ago

首先在Options类里加入jobId属性,然后在JobGraphUtil类的buildJobGraph方法修改调用 createJobGraph的方式

JobGraph jobGraph = null; if (StringUtils.isNullOrWhitespaceOnly(launcherOptions.getJobId())){ // use the jobId generated by flink when jobId is empty jobGraph = PackagedProgramUtils.createJobGraph( program, launcherOptions.loadFlinkConfiguration(), flinkConf.getInteger(DEFAULT_PARALLELISM), false); }else{ // use the customize jobId generated by flink when jobId is not empty jobGraph = PackagedProgramUtils.createJobGraph( program, launcherOptions.loadFlinkConfiguration(), flinkConf.getInteger(DEFAULT_PARALLELISM), new JobID(StringUtils.hexStringToByte(launcherOptions.getJobId())), false); }