apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0
8.07k stars 1.83k forks source link

[Bug] [seatunnel-flink-starter] -i params parameter problem,cannot use variable #4453

Open LYL41011 opened 1 year ago

LYL41011 commented 1 year ago

Search before asking

What happened

Documentation states: Use -i or --variable to specify the variables in the configuration file, you can configure multiple But when I use -i , like ./start-seatunnel-flink-connector-v2.sh -i ckhouse="10.185.71.36" the program throws an error I try three methods , all are useless: host = ""${ckhouse}":8123" host = "${ckhouse}:8123" host = "'${ckhouse}':8123"

SeaTunnel Version

2.3.0 release

SeaTunnel Config

env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {

   Clickhouse {
    host = ""${ckhouse}":8123"
    database = "fin_dw_local"

Running Command

./start-seatunnel-flink-connector-v2.sh -i ckhouse="10.185.71.36" -c ../config/task/flink.common.bianliang.conf

Error Exception

Execute SeaTunnel Flink Job: ${FLINK_HOME}/bin/flink run -c org.apache.seatunnel.core.starter.flink.SeatunnelFlink /home/q/dis/apache-seatunnel-incubating-2.3.0/starter/seatunnel-flink-starter.jar --config ../config/task/flink.common.bianliang.conf -Dpipeline.name=SeaTunnel -Dckhouse=10.185.71.36 -Duserindex=0 -Ddatahour=2023-03-30
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
2023-03-30 14:53:39,906 INFO  org.apache.hadoop.security.UserGroupInformation              [] - Login successful for user dis@LYCC.COM using keytab file /home/q/dis/dis.keytab

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: need to Config#resolve(), see the API docs for Config#resolve(); substitution not resolved: ConfigConcatenation(""${ckhouse}":8123")
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.seatunnel.shade.com.typesafe.config.ConfigException$NotResolved: need to Config#resolve(), see the API docs for Config#resolve(); substitution not resolved: ConfigConcatenation(""${ckhouse}":8123")
        at org.apache.seatunnel.shade.com.typesafe.config.impl.ConfigConcatenation.notResolved(ConfigConcatenation.java:51)
        at org.apache.seatunnel.shade.com.typesafe.config.impl.ConfigConcatenation.valueType(ConfigConcatenation.java:58)
        at org.apache.seatunnel.shade.com.typesafe.config.impl.SimpleConfig.hasPath(SimpleConfig.java:100)
        at org.apache.seatunnel.common.config.CheckConfigUtil.isValidParam(CheckConfigUtil.java:80)
        at org.apache.seatunnel.common.config.CheckConfigUtil.lambda$checkAllExists$0(CheckConfigUtil.java:42)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
        at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists(CheckConfigUtil.java:43)
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSource.prepare(ClickhouseSource.java:69)
        at org.apache.seatunnel.core.starter.flink.execution.SourceExecuteProcessor.initializePlugins(SourceExecuteProcessor.java:115)
        at org.apache.seatunnel.core.starter.flink.execution.AbstractPluginExecuteProcessor.<init>(AbstractPluginExecuteProcessor.java:53)
        at org.apache.seatunnel.core.starter.flink.execution.SourceExecuteProcessor.<init>(SourceExecuteProcessor.java:56)
        at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.<init>(FlinkExecution.java:77)
        at org.apache.seatunnel.core.starter.flink.command.FlinkApiTaskExecuteCommand.execute(FlinkApiTaskExecuteCommand.java:53)
        at org.apache.seatunnel.core.starter.Seatunnel.run(Seatunnel.java:39)
        at org.apache.seatunnel.core.starter.flink.SeatunnelFlink.main(SeatunnelFlink.java:34)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        ... 11 more

Flink or Spark Version

flink 1.13.6release

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

archibald-nice commented 1 year ago

I have the same problem. Has anyone solved it?

xywendz commented 1 year ago

any update?

davidzollo commented 1 year ago

seatunnel-flink not support your usage, you can use shell to replace the variable for now. BTW, the coming version 2.3.3 for seatunnel zeta will support your usage.