datavane / tis

Support agile DataOps Based on Flink, DataX and Flink-CDC, Chunjun with Web-UI
https://tis.pub
Apache License 2.0
1.04k stars 221 forks source link

K8S 环境中利用Powerjob启动执行器,打开datax执行器报错 #330

Closed baisui1981 closed 2 months ago

baisui1981 commented 5 months ago

默认直接用tis安装powerjob,但是好像tis和powerjob连接ClusterIP ,连不上。所以我等他把powerjob拉起后手动导入powerjob。能正常连上下发任务之类的。数据同步的时候powerjob一直未能成功,在tis改了下定时同步,弄着弄着datax就打不开了 50011719285710_ pic

641   /**
 642    * 取得K8S dataX worker
 643    *
 644    * @param context
 645    */
 646
 647   @Func(value = PermissionConstant.DATAX_MANAGE, sideEffect = false)
 648   public void doGetDataxWorkerMeta(Context context) {
 649     getJobWoker(context, TargetResName.K8S_DATAX_INSTANCE_NAME);
 650   }
 651
 652   @Func(value = PermissionConstant.DATAX_MANAGE, sideEffect = false)
 653   public void doGetJobWorkerMeta(Context context) {
 654     final TargetResName targetName = getK8SJobWorkerTargetName();
 655     getJobWoker(context, targetName);
 656   }
 657
 658   @Func(value = PermissionConstant.DATAX_MANAGE, sideEffect = false)
 659   public void doGetFlinkSession(Context context) {
 660     final TargetResName targetName = getK8SJobWorkerTargetName(false);
 661     Optional<ServerLaunchToken> launchToken
 662       = Optional.of(ServerLaunchToken.createFlinkClusterToken().token(FlinkClusterType.K8SSession, targetName));
 663     getJobWoker(context, targetName, launchToken);
 664   }
 665
 666   private void getJobWoker(Context context, TargetResName targetName) {
 667     getJobWoker(context, targetName, DataXJobWorker.getLaunchToken(targetName));
 668   }
 669
 670   private void getJobWoker(Context context, TargetResName targetName, Optional<ServerLaunchToken> launchToken) {
 671
 672
 673     DataXJobWorkerStatus jobWorkerStatus = new DataXJobWorkerStatus();
 674     if (!launchToken.isPresent()) {
 675       jobWorkerStatus.setState(IFlinkIncrJobStatus.State.NONE);
 676       this.setBizResult(context, jobWorkerStatus);
 677       return;
 678     }
 679     DataXJobWorker jobWorker = DataXJobWorker.getJobWorker(targetName, launchToken.map((t) -> t.getWorkerCptType()));
 680     boolean disableRcdeployment = this.getBoolean("disableRcdeployment");
 681     jobWorkerStatus.setState((jobWorker != null && jobWorker.inService()) ? IFlinkIncrJobStatus.State.RUNNING : IFlinkIncrJobStatus.State.NONE);
 682     if (jobWorkerStatus.getState() == IFlinkIncrJobStatus.State.RUNNING && !disableRcdeployment) {
 683       jobWorkerStatus.setPayloads(jobWorker.getPayloadInfo());
 684  >>>> 这里抛出异常了     jobWorkerStatus.setRcDeployments(jobWorker.getRCDeployments());
 685     }
 686     this.setBizResult(context, jobWorkerStatus);
 687   }