datavane / tis

Support agile DataOps Based on Flink, DataX and Flink-CDC, Chunjun with Web-UI
https://tis.pub
Apache License 2.0

Support distributed execution of batch tasks #157

Closed baisui1981 closed 8 months ago

baisui1981 commented 2 years ago

97 has been referenced

Related execution flow diagram

baisui1981 commented 1 year ago

PowerJob references

https://github.com/PowerJob/PowerJob

Defining DAG tasks with PowerJob: https://github.com/PowerJob/PowerJob/blob/805046dccbb9dbeb6a59b0fe12a3fe9f1beb9ae7/powerjob-client/src/test/java/tech/powerjob/client/test/TestWorkflow.java

https://github.com/PowerJob/PowerJob/blob/805046dccbb9dbeb6a59b0fe12a3fe9f1beb9ae7/powerjob-client/src/test/java/tech/powerjob/client/test/TestWorkflow.java#L93-L103

        // DAG graph: nodes and the edges that connect them
        List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
        List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();

        nodes.add(new PEWorkflowDAG.Node(nodeList.get(0).getId()));
        nodes.add(new PEWorkflowDAG.Node(nodeList.get(1).getId()));
        nodes.add(new PEWorkflowDAG.Node(nodeList.get(2).getId()));

        edges.add(new PEWorkflowDAG.Edge(nodeList.get(0).getId(), nodeList.get(1).getId()));
        edges.add(new PEWorkflowDAG.Edge(nodeList.get(1).getId(), nodeList.get(2).getId()));
        PEWorkflowDAG peWorkflowDAG = new PEWorkflowDAG(nodes, edges);
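
The three-node chain in the snippet above generalizes to any ordered list of node IDs. The following is a minimal, self-contained sketch of that idea. `Node`, `Edge`, `Dag` and `chain` are hypothetical stand-ins that only mirror the shape of `PEWorkflowDAG.Node`, `PEWorkflowDAG.Edge` and `PEWorkflowDAG` as they are used in the snippet; they are not the PowerJob classes themselves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: build a linear DAG (n0 -> n1 -> n2 ...) from ordered node IDs. */
final class LinearDagSketch {

    /** Hypothetical stand-in for PEWorkflowDAG.Node. */
    static final class Node {
        final long nodeId;
        Node(long nodeId) { this.nodeId = nodeId; }
    }

    /** Hypothetical stand-in for PEWorkflowDAG.Edge. */
    static final class Edge {
        final long from;
        final long to;
        Edge(long from, long to) { this.from = from; this.to = to; }
    }

    /** Hypothetical stand-in for PEWorkflowDAG. */
    static final class Dag {
        final List<Node> nodes;
        final List<Edge> edges;
        Dag(List<Node> nodes, List<Edge> edges) { this.nodes = nodes; this.edges = edges; }
    }

    /** Adds one node per ID and one edge between each pair of consecutive IDs. */
    static Dag chain(List<Long> orderedNodeIds) {
        List<Node> nodes = new ArrayList<>();
        List<Edge> edges = new ArrayList<>();
        for (int i = 0; i < orderedNodeIds.size(); i++) {
            nodes.add(new Node(orderedNodeIds.get(i)));
            if (i > 0) {
                edges.add(new Edge(orderedNodeIds.get(i - 1), orderedNodeIds.get(i)));
            }
        }
        return new Dag(nodes, edges);
    }

    public static void main(String[] args) {
        Dag dag = chain(Arrays.asList(101L, 102L, 103L));
        System.out.println(dag.nodes.size() + " nodes, " + dag.edges.size() + " edges");
    }
}
```

With the real PowerJob client, the two lists built this way would presumably be passed to `new PEWorkflowDAG(nodes, edges)` exactly as in the snippet above.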

Required changes

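  1. Currently, DataXAction.launchDataXWorker() starts the environment required for distributed DataX execution. Entry point: https://github.com/qlangtech/plugins/blob/2a36da841abb279ee0eec3054c1b3662e6bdccbf/tis-k8s-plugin/src/main/java/com/qlangtech/tis/plugin/datax/K8SDataXJobWorker.java
  2. When a DataX task is created, add a script that creates the corresponding PowerJob workflow directly (the workflow must also be updated whenever the task is updated).
  3. DataXAction.triggerFullbuildTask(), which triggers a full build, needs extra code to check whether the workflow exists. If it does not, the user must first fill in a workflow form, and only then is the workflow task triggered.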
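
The control flow described in items 2 and 3 could look roughly like the sketch below. Everything in it is an assumption made for illustration: `WorkflowClient`, `TriggerResult`, `InMemoryWorkflowClient` and `triggerFullBuild` are hypothetical names, not TIS or PowerJob APIs, and the in-memory client only stands in for a real scheduler so that the example runs on its own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Sketch: check that a workflow exists before triggering a full build. */
final class WorkflowTriggerSketch {

    /** Hypothetical abstraction over a workflow scheduler such as PowerJob. */
    interface WorkflowClient {
        Optional<Long> findWorkflowId(String pipelineName);
        long saveWorkflow(String pipelineName); // create, or update if it already exists
        long runWorkflow(long workflowId);      // returns the new instance ID
    }

    /** Outcome of a trigger attempt: either a form is needed or an instance started. */
    static final class TriggerResult {
        final boolean needsWorkflowForm;
        final Long instanceId; // null when the form is still required

        private TriggerResult(boolean needsWorkflowForm, Long instanceId) {
            this.needsWorkflowForm = needsWorkflowForm;
            this.instanceId = instanceId;
        }
        static TriggerResult formRequired() { return new TriggerResult(true, null); }
        static TriggerResult started(long instanceId) { return new TriggerResult(false, instanceId); }
    }

    /** Item 3: run the workflow only if it exists; otherwise ask for the form first. */
    static TriggerResult triggerFullBuild(WorkflowClient client, String pipelineName) {
        Optional<Long> workflowId = client.findWorkflowId(pipelineName);
        if (!workflowId.isPresent()) {
            return TriggerResult.formRequired();
        }
        return TriggerResult.started(client.runWorkflow(workflowId.get()));
    }

    /** In-memory stand-in for a real scheduler, used only to make the sketch runnable. */
    static final class InMemoryWorkflowClient implements WorkflowClient {
        private final Map<String, Long> idsByPipeline = new HashMap<>();
        private long nextWorkflowId = 1;
        private long nextInstanceId = 1000;

        @Override
        public Optional<Long> findWorkflowId(String pipelineName) {
            return Optional.ofNullable(idsByPipeline.get(pipelineName));
        }

        @Override
        public long saveWorkflow(String pipelineName) {
            // Item 2: creating and updating share one entry point; an update keeps the ID.
            return idsByPipeline.computeIfAbsent(pipelineName, name -> nextWorkflowId++);
        }

        @Override
        public long runWorkflow(long workflowId) {
            return nextInstanceId++;
        }
    }

    public static void main(String[] args) {
        WorkflowClient client = new InMemoryWorkflowClient();
        TriggerResult first = triggerFullBuild(client, "demo_pipeline");
        System.out.println("form required: " + first.needsWorkflowForm);

        client.saveWorkflow("demo_pipeline"); // the user submitted the workflow form
        TriggerResult second = triggerFullBuild(client, "demo_pipeline");
        System.out.println("instance started: " + (second.instanceId != null));
    }
}
```

Returning a result object instead of throwing lets the caller (for example a web action) decide how to present the workflow form to the user; that choice is a design assumption of this sketch, not something stated in the issue.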