apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [SeaTunnel-flink/spark-starter-common] The full command to submit flink/spark job is not cross-platform compatible #6386

Open GangLiCN opened 8 months ago

GangLiCN commented 8 months ago

Search before asking

What happened

The full command used to submit a Flink or Spark job is not cross-platform compatible; for example, it cannot run successfully on Windows.

Reproduce steps:

1. Submit a Spark job:

```shell
java -Dlog4j2.configurationFile=g:\apache\apache-seatunnel-2.3.3\config\log4j2_spark.properties -Dseatunnel.logs.path=g:\apache\apache-seatunnel-2.3.3\logs -Dseatunnel.logs.file_name=seatunnel-spark-starter -cp g:\apache\apache-seatunnel-2.3.3/lib/* org.apache.seatunnel.core.starter.spark.SparkStarter --config config/v2.batch.config.template -m local -e client
```

   or submit a Flink job:

```shell
java -Dlog4j2.configurationFile=g:\apache/apache-seatunnel-2.3.3/config/log4j2_flink.properties -Dseatunnel.logs.path=g:\apache/apache-seatunnel-2.3.3/logs -Dseatunnel.logs.file_name=seatunnel-flink-fake-job -cp g:\apache/apache-seatunnel-2.3.3/lib/* org.apache.seatunnel.core.starter.flink.FlinkStarter --config ../config/fake.flink.job.yaml
```

2. The commands above are converted into the following Flink/Spark-specific job submission commands:

   Flink:

```shell
${FLINK_HOME}/bin/flink run -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink G:\Apache\apache-seatunnel-2.3.3\starter\seatunnel-flink-15-starter.jar --config ../config/fake.flink.job.yaml --name SeaTunnel
```

   Spark:

```shell
${SPARK_HOME}/bin/spark-submit --class "org.apache.seatunnel.core.starter.spark.SeaTunnelSpark" --name "SeaTunnel" --master "local" --deploy-mode "client" --jars "G:\Apache\apache-seatunnel-2.3.3\lib\connector-cdc-mysql-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-clickhouse-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-fake-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-jdbc-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-kafka-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\jcl-over-slf4j-1.7.25.jar,G:\Apache\apache-seatunnel-2.3.3\lib\log4j-api-2.17.1.jar,G:\Apache\apache-seatunnel-2.3.3\lib\log4j-core-2.17.1.jar,G:\Apache\apache-seatunnel-2.3.3\lib\log4j-slf4j-impl-2.17.1.jar,G:\Apache\apache-seatunnel-2.3.3\lib\mysql-connector-j-8.0.33.jar,G:\Apache\apache-seatunnel-2.3.3\lib\postgresql-42.2.16.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-api-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-flink-15-starter.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-hadoop3-3.1.4-uber-2.3.3-optional.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-spark-3-starter.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-starter.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-transforms-v2.jar,G:\Apache\apache-seatunnel-2.3.3\lib\slf4j-api-1.7.25.jar,G:\Apache\apache-seatunnel-2.3.3\connectors\seatunnel\connector-console-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\connectors\seatunnel\connector-fake-2.3.3.jar" --conf "job.mode=BATCH" --conf "execution.parallelism=2" --conf "checkpoint.interval=10000" G:\Apache\apache-seatunnel-2.3.3\starter\seatunnel-spark-3-starter.jar --config "config/v2.batch.config.template" --master "local" --deploy-mode "client" --name "SeaTunnel"
```

How to fix: The root cause of this issue is that the command string used to submit the Flink or Spark job assumes a POSIX shell. For example, in "SparkStarter.java" (lines 196-221), the problematic line is: `commands.add("${SPARK_HOME}/bin/spark-submit");`

Full code snippet:

```java
protected List<String> buildFinal() {
    List<String> commands = new ArrayList<>();
    commands.add("${SPARK_HOME}/bin/spark-submit");
    appendOption(commands, "--class", SeaTunnelSpark.class.getName());
    appendOption(commands, "--name", this.commandArgs.getJobName());
    appendOption(commands, "--master", this.commandArgs.getMaster());
    appendOption(commands, "--deploy-mode", this.commandArgs.getDeployMode().getDeployMode());
    appendJars(commands, this.jars);
    appendFiles(commands, this.files);
    appendSparkConf(commands, this.sparkConf);
    appendAppJar(commands);
    appendOption(commands, "--config", this.commandArgs.getConfigFile());
    appendOption(commands, "--master", this.commandArgs.getMaster());
    appendOption(commands, "--deploy-mode", this.commandArgs.getDeployMode().getDeployMode());
    appendOption(commands, "--name", this.commandArgs.getJobName());
    if (commandArgs.isEncrypt()) {
        commands.add("--encrypt");
    }
    if (commandArgs.isDecrypt()) {
        commands.add("--decrypt");
    }
    if (this.commandArgs.isCheckConfig()) {
        commands.add("--check");
    }
    return commands;
}
```

The same issue exists in "FlinkStarter.java" (lines 53-95); the problematic line is: `command.add("${FLINK_HOME}/bin/flink");`

Full code snippet:

```java
public List<String> buildCommands() {
    List<String> command = new ArrayList<>();
    // set start command
    command.add("${FLINK_HOME}/bin/flink");
    // set deploy mode, run or run-application
    command.add(flinkCommandArgs.getDeployMode().getDeployMode());
    // set submitted target master
    if (flinkCommandArgs.getMasterType() != null) {
        command.add("--target");
        command.add(flinkCommandArgs.getMasterType().getMaster());
    }
    // set flink original parameters
    command.addAll(flinkCommandArgs.getOriginalParameters());
    // set main class name
    command.add("-c");
    command.add(APP_NAME);
    // set main jar name
    command.add(appJar);
    // set config file path
    command.add("--config");
    command.add(flinkCommandArgs.getConfigFile());
    // set check config flag
    if (flinkCommandArgs.isCheckConfig()) {
        command.add("--check");
    }
    // set job name
    command.add("--name");
    command.add(flinkCommandArgs.getJobName());
    // set encryption
    if (flinkCommandArgs.isEncrypt()) {
        command.add("--encrypt");
    }
    // set decryption
    if (flinkCommandArgs.isDecrypt()) {
        command.add("--decrypt");
    }
    // set extra system properties
    flinkCommandArgs.getVariables().stream()
            .filter(Objects::nonNull)
            .map(String::trim)
            .forEach(variable -> command.add("-D" + variable));
    return command;
}
```
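One possible shape of the fix, as a minimal sketch: choose the platform-appropriate launcher reference when building the command, since cmd.exe expands variables as `%VAR%` and uses a `.cmd` launcher, while POSIX shells expand `${VAR}`. The class and method names below are illustrative only, not SeaTunnel's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class SubmitCommandSketch {

    // Returns the platform-appropriate reference to the Flink launcher.
    // cmd.exe expands %FLINK_HOME% and ships flink.cmd; POSIX shells
    // expand ${FLINK_HOME} and invoke the plain flink script.
    static String flinkExecutable(boolean isWindows) {
        return isWindows
                ? "%FLINK_HOME%\\bin\\flink.cmd"
                : "${FLINK_HOME}/bin/flink";
    }

    // Simplified command builder: only the first element changes per OS.
    static List<String> buildCommands(boolean isWindows) {
        List<String> command = new ArrayList<>();
        command.add(flinkExecutable(isWindows));
        command.add("run");
        return command;
    }

    public static void main(String[] args) {
        // Detect the host OS the same way many JVM tools do.
        boolean windows =
                System.getProperty("os.name").toLowerCase().contains("win");
        System.out.println(String.join(" ", buildCommands(windows)));
    }
}
```

The same branch would apply to `spark-submit` vs `spark-submit.cmd` in the Spark starter.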

SeaTunnel Version

All released versions (e.g., 2.3.4, 2.3.3, and earlier).

SeaTunnel Config

```
env {
  # You can set SeaTunnel environment configuration here
  parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  # This is an example source plugin **only for testing and demonstrating the feature source plugin**
  FakeSource {
    parallelism = 2
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }

  # If you would like to get more information about how to configure SeaTunnel and see the full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/category/source-v2
}

sink {
  Console {
  }

  # If you would like to get more information about how to configure SeaTunnel and see the full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/category/sink-v2
}
```

Running Command

Flink:

```shell
${FLINK_HOME}/bin/flink run -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink G:\Apache\apache-seatunnel-2.3.3\starter\seatunnel-flink-15-starter.jar --config ../config/fake.flink.job.yaml --name SeaTunnel
```

Spark:

```shell
${SPARK_HOME}/bin/spark-submit --class "org.apache.seatunnel.core.starter.spark.SeaTunnelSpark" --name "SeaTunnel" --master "local" --deploy-mode "client" --jars "G:\Apache\apache-seatunnel-2.3.3\lib\connector-cdc-mysql-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-clickhouse-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-fake-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-jdbc-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-kafka-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\jcl-over-slf4j-1.7.25.jar,G:\Apache\apache-seatunnel-2.3.3\lib\log4j-api-2.17.1.jar,G:\Apache\apache-seatunnel-2.3.3\lib\log4j-core-2.17.1.jar,G:\Apache\apache-seatunnel-2.3.3\lib\log4j-slf4j-impl-2.17.1.jar,G:\Apache\apache-seatunnel-2.3.3\lib\mysql-connector-j-8.0.33.jar,G:\Apache\apache-seatunnel-2.3.3\lib\postgresql-42.2.16.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-api-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-flink-15-starter.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-hadoop3-3.1.4-uber-2.3.3-optional.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-spark-3-starter.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-starter.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-transforms-v2.jar,G:\Apache\apache-seatunnel-2.3.3\lib\slf4j-api-1.7.25.jar,G:\Apache\apache-seatunnel-2.3.3\connectors\seatunnel\connector-console-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\connectors\seatunnel\connector-fake-2.3.3.jar" --conf "job.mode=BATCH" --conf "execution.parallelism=2" --conf "checkpoint.interval=10000" G:\Apache\apache-seatunnel-2.3.3\starter\seatunnel-spark-3-starter.jar --config "config/v2.batch.config.template" --master "local" --deploy-mode "client" --name "SeaTunnel"
```

Error Exception

The above commands cannot run on Windows because `${FLINK_HOME}` and `${SPARK_HOME}` are POSIX shell syntax and are not recognized by the Windows shell; on Windows, `%FLINK_HOME%` and `%SPARK_HOME%` are the accepted forms.
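An alternative sketch that sidesteps shell-specific expansion entirely (the class and method names here are hypothetical): resolve the home directory inside the JVM with `System.getenv` and emit an absolute path, so the generated command contains no `${VAR}`/`%VAR%` syntax at all and means the same thing under cmd.exe and POSIX shells.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class LauncherPath {

    // Builds an absolute launcher path from an already-resolved home
    // directory, so the final command string is shell-agnostic.
    static String launcher(String home, boolean isWindows) {
        Path bin = Paths.get(home, "bin", isWindows ? "flink.cmd" : "flink");
        return bin.toString();
    }

    public static void main(String[] args) {
        String home = System.getenv("FLINK_HOME"); // null if unset
        boolean windows =
                System.getProperty("os.name").toLowerCase().contains("win");
        if (home != null) {
            System.out.println(launcher(home, windows));
        }
    }
}
```

This also fails fast with a clear error when the environment variable is unset, instead of producing a command the shell cannot interpret.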

Zeta or Flink or Spark Version

Spark: 3.x; Flink: 1.15 or later

Java or Scala Version

```
java version "1.8.0_401"
Java(TM) SE Runtime Environment (build 1.8.0_401-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.401-b10, mixed mode)
```

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

GangLiCN commented 8 months ago

I've fixed this issue in my local Windows environment and tested it successfully. Please tell me how to submit the code change, since I'm new to this.

liunaijie commented 8 months ago

> I've fixed this issue on my local windows environment and tested successfully. Please tell me how to pull code change since I'm fresh on doing this.

Create a new branch based on the dev branch, commit your change, push it to your GitHub repo, and create a pull request.
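The steps above can be sketched as commands. The branch and commit names are placeholders, and the demo below runs in a throwaway local repository rather than a real fork; against the actual project you would clone your fork of apache/seatunnel and push the branch there before opening the PR on GitHub.

```shell
set -e
# Work in a throwaway repo so the demo is self-contained.
tmp=$(mktemp -d) && cd "$tmp"
git init -q repo && cd repo
git checkout -q -b dev                        # stand-in for SeaTunnel's dev branch
git -c user.email=you@example.com -c user.name=you \
    commit -q --allow-empty -m "dev baseline"
git checkout -q -b fix-windows-submit-command # 1. branch off dev
echo "patched" > note.txt                     #    (edit SparkStarter.java / FlinkStarter.java here)
git add note.txt
git -c user.email=you@example.com -c user.name=you \
    commit -q -m "[Fix] cross-platform submit command"  # 2. commit the change
git log --oneline -1                          # 3. push to your fork and open a PR
```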

github-actions[bot] commented 7 months ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.