apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0
8.01k stars 1.82k forks source link

[Bug] [seatunnel-flink-starter-common] flink parameters parse error of job env #5888

Open mengyueyue opened 11 months ago

mengyueyue commented 11 months ago

Search before asking

What happened

i‘m run flink example on my idea,and i add a flink parameters on job env,and found a error,so debug code found if you parameter type is string,you parameter will become ”"XXX"“ that get quotes

SeaTunnel Version

sorry,i'm not use any release version of seatunnel , i'm use git commit is e26006d2aa47df0f92740cf9843dca862db82232, this is front of 2.3.3,but i‘m found lasted version has being this problem

SeaTunnel Config

env {
  # You can set SeaTunnel environment configuration here
  execution.parallelism = 2
  flink.execution.batch-shuffle-mode = ALL_EXCHANGES_PIPELINED
  job.mode = "BATCH"
  checkpoint.interval = 10000000000
  checkpoint.timeout = 99999999999
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
    Jdbc {
        url = ""
        driver = "org.postgresql.Driver"
        connection_check_timeout_sec = 100
        user = ""
        password = ""
        query = "SELECT UUID AS identifier,price,DATE ,postcode,TYPE AS property_type,is_new,duration,addr1 AS paon,addr2 AS saon,street,locality,town AS city,district,county,d AS category_type,e  FROM uk_price_paid3 where uuid in ('{0DCDB921-568A-42AF-A223-C878EAD9BA46}','{0DCDBE42-060E-4FF0-BBE0-46E96D53C176}')"
    }
}

sink {
  paimon {
    warehouse = ""
    database = ""
    table = ""
    s3.access-key = ""
    s3.secret-key = ""
    s3.endpoint = ""
    s3.path.style.access = true
  }
}

Running Command

/Library/Java/JavaVirtualMachines/jdk-11.0.13.jdk/Contents/Home/bin/java -Dfile.encoding=UTF-8 -classpath /Users/my/myself/hzgosun/seatunnel/seatunnel-examples/seatunnel-flink-connector-v2-example/target/classes:/Users/my/.m2/repository/org/scala-lang/scala-reflect/2.12.8/scala-reflect-2.12.8.jar:/Users/my/.m2/repository/org/scala-lang/scala-library/2.12.8/scala-library-2.12.8.jar:/Users/my/.m2/repository/com/typesafe/config/1.3.3/config-1.3.3.jar:/Users/my/myself/hzgosun/seatunnel/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/target/classes:/Users/my/myself/hzgosun/seatunnel/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/target/classes:/Users/my/myself/hzgosun/seatunnel/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/target/classes:/Users/my/myself/hzgosun/seatunnel/seatunnel-translation/seatunnel-translation-base/target/classes:/Users/my/myself/hzgosun/seatunnel/seatunnel-core/seatunnel-core-starter/target/classes:/Users/my/myself/hzgosun/seatunnel/seatunnel-api/target/classes:/Users/my/myself/hzgosun/seatunnel/seatunnel-common/target/classes:/Users/my/myself/hzgosun/seatunnel/seatunnel-config/seatunnel-config-shade/target/classes:/Users/my/myself/hzgosun/seatunnel/seatunnel-config/seatunnel-config-shade/target/seatunnel-config-shade-2.3.4-SNAPSHOT.jar:/Users/my/myself/hzgosun/seatunnel/seatunnel-config/seatunnel-config-base/target/classes:/Users/my/myself/hzgosun/seatunnel/seatunnel-config/seatunnel-config-base/target/seatunnel-config-base-2.3.4-SNAPSHOT.jar:/Users/my/.m2/repository/org/apache/commons/commons-collections4/4.4/commons-collections4-4.4.jar:/Users/my/myself/hzgosun/seatunnel/seatunnel-shade/seatunnel-guava/target/classes:/Users/my/myself/hzgosun/seatunnel/seatunnel-shade/seatunnel-guava/target/seatunnel-guava.jar:/Users/my/.m2/repository/commons-codec/commons-codec/1.13/commons-codec-1.13.jar:/Users/my/myself/hzgosun/seatunnel/seatunnel-shade/seatunnel-jackson/target/classes:/Users/my/myself/hzgosun/seatunnel/seatunnel-shade/seatunnel-jackson/target/seatunnel-jackson.jar:/Users/my/.m2/repository/com/fasterxml/jackson/dataformat/jackson-dataformat-properties/2.12.6/jackson-dataformat-properties-2.12.6.jar:/Users/my/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.12.6/jackson-databind-2.12.6.jar:/Users/my/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.12.6/jackson-annotations-2.12.6.jar:/Users/my/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.12.6/jackson-core-2.12.6.jar:/Users/my/myself/hzgosun/seatunnel/seatunnel-plugin-discovery/target/classes:/Users/my/.m2/repository/com/beust/jcommander/1.81/jcommander-1.81.jar:/Users/my/myself/hzgosun/seatunnel/seatunnel-transforms-v2/target/classes:/Users/my/.m2/repository/com/github/jsqlparser/jsqlparser/4.5/jsqlparser-4.5.jar:/Users/my/myself/hzgosun/seatunnel/seatunnel-connectors-v2/connector-fake/target/classes:/Users/my/myself/hzgosun/seatunnel/seatunnel-connectors-v2/connector-common/target/classes:/Users/my/myself/hzgosun/seatunnel/seatunnel-formats/seatunnel-format-json/target/classes:/Users/my/myself/hzgosun/seatunnel/seatunnel-connectors-v2/connector-console/target/classes:/Users/my/.m2/repository/org/apache/flink/flink-core/1.15.3/flink-core-1.15.3.jar:/Users/my/.m2/repository/org/apache/flink/flink-annotations/1.15.3/flink-annotations-1.15.3.jar:/Users/my/.m2/repository/org/apache/flink/flink-metrics-core/1.15.3/flink-metrics-core-1.15.3.jar:/Users/my/.m2/repository/org/apache/flink/flink-shaded-asm-9/9.2-15.0/flink-shaded-asm-9-9.2-15.0.jar:/Users/my/.m2/repository/org/apache/commons/commons-lang3/3.4/commons-lang3-3.4.jar:/Users/my/.m2/repository/com/esotericsoftware/kryo/kryo/2.24.0/kryo-2.24.0.jar:/Users/my/.m2/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/Users/my/.m2/repository/org/objenesis/objenesis/2.1/objenesis-2.1.jar:/Users/my/.m2/repository/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar:/Users/my/.m2/repository/org/apache/commons/commons-compress/1.20/commons-compress-1.20.jar:/Users/my/.m2/repository/org/apache/flink/flink-shaded-guava/30.1.1-jre-15.0/flink-shaded-guava-30.1.1-jre-15.0.jar:/Users/my/.m2/repository/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/Users/my/.m2/repository/org/apache/flink/flink-shaded-force-shading/15.0/flink-shaded-force-shading-15.0.jar:/Users/my/.m2/repository/org/apache/flink/flink-clients/1.15.3/flink-clients-1.15.3.jar:/Users/my/.m2/repository/org/apache/flink/flink-optimizer/1.15.3/flink-optimizer-1.15.3.jar:/Users/my/.m2/repository/commons-cli/commons-cli/1.5.0/commons-cli-1.5.0.jar:/Users/my/.m2/repository/org/apache/flink/flink-streaming-java/1.15.3/flink-streaming-java-1.15.3.jar:/Users/my/.m2/repository/org/apache/flink/flink-file-sink-common/1.15.3/flink-file-sink-common-1.15.3.jar:/Users/my/.m2/repository/org/apache/flink/flink-java/1.15.3/flink-java-1.15.3.jar:/Users/my/.m2/repository/org/apache/commons/commons-math3/3.6.1/commons-math3-3.6.1.jar:/Users/my/.m2/repository/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar:/Users/my/.m2/repository/org/apache/flink/flink-table-common/1.15.3/flink-table-common-1.15.3.jar:/Users/my/.m2/repository/com/ibm/icu/icu4j/67.1/icu4j-67.1.jar:/Users/my/.m2/repository/org/apache/flink/flink-runtime/1.15.3/flink-runtime-1.15.3.jar:/Users/my/.m2/repository/org/apache/flink/flink-rpc-core/1.15.3/flink-rpc-core-1.15.3.jar:/Users/my/.m2/repository/org/apache/flink/flink-rpc-akka-loader/1.15.3/flink-rpc-akka-loader-1.15.3.jar:/Users/my/.m2/repository/org/apache/flink/flink-queryable-state-client-java/1.15.3/flink-queryable-state-client-java-1.15.3.jar:/Users/my/.m2/repository/org/apache/flink/flink-hadoop-fs/1.15.3/flink-hadoop-fs-1.15.3.jar:/Users/my/.m2/repository/commons-io/commons-io/2.11.0/commons-io-2.11.0.jar:/Users/my/.m2/repository/org/apache/flink/flink-shaded-netty/4.1.70.Final-15.0/flink-shaded-netty-4.1.70.Final-15.0.jar:/Users/my/.m2/repository/org/apache/flink/flink-shaded-jackson/2.12.4-15.0/flink-shaded-jackson-2.12.4-15.0.jar:/Users/my/.m2/repository/org/apache/flink/flink-shaded-zookeeper-3/3.5.9-15.0/flink-shaded-zookeeper-3-3.5.9-15.0.jar:/Users/my/.m2/repository/org/javassist/javassist/3.24.0-GA/javassist-3.24.0-GA.jar:/Users/my/.m2/repository/org/xerial/snappy/snappy-java/1.1.8.3/snappy-java-1.1.8.3.jar:/Users/my/.m2/repository/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar:/Users/my/.m2/repository/org/apache/flink/flink-table-planner_2.12/1.15.3/flink-table-planner_2.12-1.15.3.jar:/Users/my/.m2/repository/org/codehaus/janino/commons-compiler/3.0.11/commons-compiler-3.0.11.jar:/Users/my/.m2/repository/org/codehaus/janino/janino/3.0.11/janino-3.0.11.jar:/Users/my/.m2/repository/org/apache/flink/flink-scala_2.12/1.15.3/flink-scala_2.12-1.15.3.jar:/Users/my/.m2/repository/org/scala-lang/scala-reflect/2.12.7/scala-reflect-2.12.7.jar:/Users/my/.m2/repository/org/scala-lang/scala-compiler/2.12.7/scala-compiler-2.12.7.jar:/Users/my/.m2/repository/org/scala-lang/modules/scala-xml_2.12/1.0.6/scala-xml_2.12-1.0.6.jar:/Users/my/.m2/repository/com/twitter/chill_2.12/0.7.6/chill_2.12-0.7.6.jar:/Users/my/.m2/repository/org/apache/flink/flink-table-runtime/1.15.3/flink-table-runtime-1.15.3.jar:/Users/my/.m2/repository/org/apache/flink/flink-cep/1.15.3/flink-cep-1.15.3.jar:/Users/my/.m2/repository/org/apache/flink/flink-table-api-java-bridge/1.15.3/flink-table-api-java-bridge-1.15.3.jar:/Users/my/.m2/repository/org/apache/flink/flink-table-api-bridge-base/1.15.3/flink-table-api-bridge-base-1.15.3.jar:/Users/my/.m2/repository/org/apache/flink/flink-table-api-java/1.15.3/flink-table-api-java-1.15.3.jar:/Users/my/.m2/repository/org/apache/flink/flink-runtime-web/1.15.3/flink-runtime-web-1.15.3.jar:/Users/my/.m2/repository/com/squareup/okhttp3/mockwebserver/3.6.0/mockwebserver-3.6.0.jar:/Users/my/.m2/repository/com/squareup/okhttp3/okhttp/3.6.0/okhttp-3.6.0.jar:/Users/my/.m2/repository/com/squareup/okio/okio/1.11.0/okio-1.11.0.jar:/Users/my/.m2/repository/org/bouncycastle/bcprov-jdk15on/1.50/bcprov-jdk15on-1.50.jar:/Users/my/.m2/repository/junit/junit/4.13.2/junit-4.13.2.jar:/Users/my/.m2/repository/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar:/Users/my/.m2/repository/com/google/guava/guava/27.0-jre/guava-27.0-jre.jar:/Users/my/.m2/repository/com/google/guava/failureaccess/1.0/failureaccess-1.0.jar:/Users/my/.m2/repository/com/google/guava/listenablefuture/9999.0-empty-to-avoid-conflict-with-guava/listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar:/Users/my/.m2/repository/org/checkerframework/checker-qual/3.10.0/checker-qual-3.10.0.jar:/Users/my/.m2/repository/com/google/errorprone/error_prone_annotations/2.2.0/error_prone_annotations-2.2.0.jar:/Users/my/.m2/repository/com/google/j2objc/j2objc-annotations/1.1/j2objc-annotations-1.1.jar:/Users/my/.m2/repository/org/codehaus/mojo/animal-sniffer-annotations/1.17/animal-sniffer-annotations-1.17.jar:/Users/my/.m2/repository/org/slf4j/slf4j-api/1.7.25/slf4j-api-1.7.25.jar:/Users/my/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.17.1/log4j-slf4j-impl-2.17.1.jar:/Users/my/.m2/repository/org/apache/logging/log4j/log4j-api/2.17.1/log4j-api-2.17.1.jar:/Users/my/.m2/repository/org/apache/logging/log4j/log4j-core/2.17.1/log4j-core-2.17.1.jar:/Users/my/.m2/repository/org/slf4j/jcl-over-slf4j/1.7.25/jcl-over-slf4j-1.7.25.jar:/Users/my/.m2/repository/org/apache/logging/log4j/log4j-1.2-api/2.17.1/log4j-1.2-api-2.17.1.jar:/Users/my/myself/hzgosun/seatunnel/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar:/Users/my/.m2/repository/org/apache/paimon/paimon-bundle/0.4.0-incubating/paimon-bundle-0.4.0-incubating.jar:/Users/my/.m2/repository/org/apache/paimon/paimon-s3/0.4.0-incubating/paimon-s3-0.4.0-incubating.jar:/Users/my/.m2/repository/org/postgresql/postgresql/42.4.3/postgresql-42.4.3.jar:/Users/my/myself/hzgosun/seatunnel/seatunnel-connectors-v2/connector-jdbc/target/classes:/Users/my/myself/hzgosun/seatunnel/seatunnel-connectors-v2/connector-paimon/target/classes org.apache.seatunnel.example.flink.v2.SeaTunnelApiExample

Error Exception

Exception in thread "main" java.lang.IllegalArgumentException: Could not parse value '"ALL_EXCHANGES_PIPELINED"' for key 'execution.batch-shuffle-mode'.
    at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:996)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:277)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:246)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:234)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.<init>(LocalStreamEnvironment.java:51)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:2320)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$15(StreamExecutionEnvironment.java:2262)
    at java.base/java.util.Optional.orElseGet(Optional.java:369)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2262)
    at org.apache.seatunnel.core.starter.flink.execution.FlinkRuntimeEnvironment.createStreamEnvironment(FlinkRuntimeEnvironment.java:189)
    at org.apache.seatunnel.core.starter.flink.execution.FlinkRuntimeEnvironment.prepare(FlinkRuntimeEnvironment.java:94)
    at org.apache.seatunnel.core.starter.flink.execution.FlinkRuntimeEnvironment.prepare(FlinkRuntimeEnvironment.java:58)
    at org.apache.seatunnel.core.starter.execution.RuntimeEnvironment.initialize(RuntimeEnvironment.java:48)
    at org.apache.seatunnel.core.starter.flink.execution.FlinkRuntimeEnvironment.<init>(FlinkRuntimeEnvironment.java:73)
    at org.apache.seatunnel.core.starter.flink.execution.FlinkRuntimeEnvironment.getInstance(FlinkRuntimeEnvironment.java:341)
    at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.<init>(FlinkExecution.java:96)
    at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:59)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.example.flink.v2.SeaTunnelApiExample.main(SeaTunnelApiExample.java:43)
Caused by: java.lang.IllegalArgumentException: Could not parse value for enum class org.apache.flink.api.common.BatchShuffleMode. Expected one of: [[ALL_EXCHANGES_PIPELINED, ALL_EXCHANGES_BLOCKING]]
    at org.apache.flink.configuration.ConfigurationUtils.lambda$convertToEnum$10(ConfigurationUtils.java:389)
    at java.base/java.util.Optional.orElseThrow(Optional.java:408)
    at org.apache.flink.configuration.ConfigurationUtils.convertToEnum(ConfigurationUtils.java:386)
    at org.apache.flink.configuration.ConfigurationUtils.convertValue(ConfigurationUtils.java:329)
    at org.apache.flink.configuration.Configuration.lambda$getOptional$3(Configuration.java:716)
    at java.base/java.util.Optional.map(Optional.java:265)
    at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:716)
    ... 19 more

Zeta or Flink or Spark Version

flink.1.15.3

Java or Scala Version

java11

Screenshots

image

Are you willing to submit PR?

Code of Conduct

mengyueyue commented 11 months ago

if we modify this code to

if (confKey.startsWith(prefixConf)) { configuration.setString( confKey.replaceFirst(prefixConf, ""), String.valueOf(entryConfKey.getValue().unwrapped())); }

this is result that is correct

image

Carl-Zhou-CN commented 11 months ago

hi, @zhilinli123 Could you please help confirm this problem?

github-actions[bot] commented 9 months ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.