apache / incubator-streampark

Make stream processing easier! Easy-to-use streaming application development framework and operation platform.
https://streampark.apache.org/
Apache License 2.0

[improve] Support third parties to quickly initiate flink tasks #2678

Open zhaoyang20201221 opened 1 year ago

zhaoyang20201221 commented 1 year ago

Description

During data synchronization, I want to use the StreamPark service to quickly start a Flink task. I found that the following interfaces have to be called every time:

Interface calls before the change (screenshot "stream-优化", i.e. "stream-optimization"):

Interface calls after the change (screenshot "stream-优化2", i.e. "stream-optimization 2"):

Usage Scenario

When a third-party service uses StreamPark, it wants to start a Flink task quickly and does not care how StreamPark implements this internally. Combining the interfaces above into a single interface would give StreamPark better scalability and applicability.

Related issues

No response

wolfboys commented 1 year ago

Great proposal. This feature is very useful for third-party platform integration. Are there any more details about the design and implementation?

zhaoyang20201221 commented 1 year ago

Here is some pseudocode that briefly describes my implementation logic:

/**
 * Quickly start a Flink task by copying an existing task, then building and starting the copy.
 *
 * @param copyId  ID of the task to copy
 * @param jobName name of the new task
 * @param args    program arguments for the new task
 * @param isDel   whether to delete an existing task with the same name (default: false)
 * @return true if the task was copied, built, and started
 */
public Boolean quickStartFlinkTask(Long copyId, String jobName, String args, Boolean isDel) {
  try {
      // Check whether a task with this name already exists
      boolean existsByJobName = applicationService.existsByJobName(jobName);

      // Boolean.TRUE.equals treats a null isDel as false (the default)
      if (existsByJobName && Boolean.TRUE.equals(isDel)) {
          // Look up the existing task by job name
          Application app = applicationService.getByJobName(jobName);
          // Delete the existing task
          applicationService.delete(app);
      }

      // Copy the configuration template and get the new task ID
      Long id = applicationService.copy(copyId, jobName, args);

      // Load the details of the new task
      Application application = applicationService.getById(id);

      // Build the Flink task
      appBuildPipeService.buildApplication(id, false);

      // Start the Flink task
      applicationService.start(application);
      return true;
  } catch (Exception e) {
      // Keep the original exception as the cause so the stack trace is preserved
      throw new RuntimeException("Failed to quick-start Flink task: " + jobName, e);
  }
}
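
For illustration only, the copy, build, and start sequence described above can be sketched as a self-contained Java example that runs without StreamPark. Everything named here (`QuickStartSketch`, `TaskBackend`, `InMemoryBackend`, `quickStart`) is a hypothetical stand-in and not part of the StreamPark API. The sketch also assumes that a duplicate job name should be rejected when `isDel` is false, which the pseudocode does not specify.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class QuickStartSketch {

    /** Hypothetical stand-in for the services used in the pseudocode. */
    interface TaskBackend {
        boolean existsByJobName(String jobName);
        void deleteByJobName(String jobName);
        long copy(long copyId, String jobName, String args);
        void build(long id);
        void start(long id);
    }

    /** Runs delete (optional), copy, build, start in order and returns the new task id. */
    static long quickStart(TaskBackend backend, long copyId, String jobName,
                           String args, boolean isDel) {
        if (backend.existsByJobName(jobName)) {
            if (!isDel) {
                // Assumption: a name clash without isDel is an error, not a silent overwrite
                throw new IllegalStateException("Job name already exists: " + jobName);
            }
            backend.deleteByJobName(jobName);
        }
        long id = backend.copy(copyId, jobName, args);
        backend.build(id);
        backend.start(id);
        return id;
    }

    /** In-memory fake that records each call so the ordering can be inspected. */
    static class InMemoryBackend implements TaskBackend {
        final Map<String, Long> idsByName = new HashMap<>();
        final List<String> calls = new ArrayList<>();
        private long nextId = 100;

        public boolean existsByJobName(String jobName) {
            return idsByName.containsKey(jobName);
        }

        public void deleteByJobName(String jobName) {
            idsByName.remove(jobName);
            calls.add("delete");
        }

        public long copy(long copyId, String jobName, String args) {
            long id = nextId++;
            idsByName.put(jobName, id);
            calls.add("copy");
            return id;
        }

        public void build(long id) {
            calls.add("build");
        }

        public void start(long id) {
            calls.add("start");
        }
    }

    public static void main(String[] args) {
        InMemoryBackend backend = new InMemoryBackend();
        // First call creates the task; second call replaces it because isDel is true
        long first = quickStart(backend, 1L, "sync-orders", "--parallelism 2", false);
        long second = quickStart(backend, 1L, "sync-orders", "--parallelism 2", true);
        System.out.println(first + " " + second + " " + backend.calls);
    }
}
```

Putting the sequence behind one method means a third-party caller only supplies the template ID, the job name, and the arguments; whether a real endpoint should fail or overwrite on a name clash is a design decision left open here.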