apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonParseException: Unexpected character ('$' (code 36)): was expecting comma to separate Object entries #6370

Closed czlh closed 5 months ago

czlh commented 6 months ago

What happened

I used a variable in the config, and the job fails with an error.

SeaTunnel Version

V2.3.3

SeaTunnel Config

env {
  execution.parallelism = 1
  job.mode = "BATCH"
  job.name = "sync_orig_mqtt_doris_to_hive"
}

source {
  "Jdbc":{
            "result_table_name": "orig_mqtt_iot_device_property_post",
            "url":"jdbc:mysql://192.168.9.3:9030/suwen_platform?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowMultiQueries=true&rewriteBatchedStatements=true",
            "driver":"com.mysql.cj.jdbc.Driver",
            "user":"root",
            "password":"",
            "query":"select * from orig_mqtt_iot_device_property_post where ts>'"${startTs}"'"
        }
}

transform { 
}

sink {
    Console {
        source_table_name = "orig_mqtt_iot_device_property_post"
    }

}

Running Command

bin/start-seatunnel-flink-15-connector-v2.sh  --config config/sync_orig_mqtt_doris_to_hive.conf -i startTs=2024-02-21

Error Exception

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: String json deserialization exception.{"env":{"execution.parallelism":1,"job.mode":"BATCH","job.name":"sync_orig_mqtt_doris_to_hive"},"source":[{"password":"","driver":"com.mysql.cj.jdbc.Driver","query":"select * from orig_mqtt_iot_device_property_post where ts>'"${startTs}"'","result_table_name":"orig_mqtt_iot_device_property_post","plugin_name":"Jdbc","user":"root","url":"jdbc:mysql://192.168.9.3:9030/suwen_platform?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowMultiQueries=true&rewriteBatchedStatements=true"}],"transform":[],"sink":[{"source_table_name":"orig_mqtt_iot_device_property_post","metastore_uri":"thrift://192.168.9.3:9083","plugin_name":"Hive","table_name":"saas.orig_mqtt_iot_device_property_post"}]}
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.RuntimeException: String json deserialization exception.{"env":{"execution.parallelism":1,"job.mode":"BATCH","job.name":"sync_orig_mqtt_doris_to_hive"},"source":[{"password":"","driver":"com.mysql.cj.jdbc.Driver","query":"select * from orig_mqtt_iot_device_property_post where ts>'"${startTs}"'","result_table_name":"orig_mqtt_iot_device_property_post","plugin_name":"Jdbc","user":"root","url":"jdbc:mysql://192.168.9.3:9030/suwen_platform?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowMultiQueries=true&rewriteBatchedStatements=true"}],"transform":[],"sink":[{"source_table_name":"orig_mqtt_iot_device_property_post","metastore_uri":"thrift://192.168.9.3:9083","plugin_name":"Hive","table_name":"saas.orig_mqtt_iot_device_property_post"}]}
        at org.apache.seatunnel.common.utils.JsonUtils.parseObject(JsonUtils.java:243)
        at org.apache.seatunnel.common.utils.JsonUtils.parseObject(JsonUtils.java:236)
        at org.apache.seatunnel.core.starter.utils.ConfigShadeUtils.processConfig(ConfigShadeUtils.java:140)
        at org.apache.seatunnel.core.starter.utils.ConfigShadeUtils.decryptConfig(ConfigShadeUtils.java:119)
        at org.apache.seatunnel.core.starter.utils.ConfigShadeUtils.decryptConfig(ConfigShadeUtils.java:104)
        at org.apache.seatunnel.core.starter.utils.ConfigBuilder.ofInner(ConfigBuilder.java:53)
        at org.apache.seatunnel.core.starter.utils.ConfigBuilder.lambda$of$1(ConfigBuilder.java:67)
        at java.util.Optional.orElseGet(Optional.java:267)
        at org.apache.seatunnel.core.starter.utils.ConfigBuilder.of(ConfigBuilder.java:67)
        at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:51)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.flink.SeaTunnelFlink.main(SeaTunnelFlink.java:34)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        ... 12 more
Caused by: org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonParseException: Unexpected character ('$' (code 36)): was expecting comma to separate Object entries
 at [Source: (byte[])"{"env":{"execution.parallelism":1,"job.mode":"BATCH","job.name":"sync_orig_mqtt_doris_to_hive"},"source":[{"password":"","driver":"com.mysql.cj.jdbc.Driver","query":"select * from orig_mqtt_iot_device_property_post where ts>'"${startTs}"'","result_table_name":"orig_mqtt_iot_device_property_post","plugin_name":"Jdbc","user":"root","url":"jdbc:mysql://192.168.9.3:9030/suwen_platform?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCat"[truncated 281 bytes]; line: 1, column: 228]
        at org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
        at org.apache.seatunnel.shade.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
        at org.apache.seatunnel.shade.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:635)
        at org.apache.seatunnel.shade.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1024)
        at org.apache.seatunnel.shade.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:269)
        at org.apache.seatunnel.shade.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeArray(JsonNodeDeserializer.java:469)
        at org.apache.seatunnel.shade.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:280)
        at org.apache.seatunnel.shade.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69)
        at org.apache.seatunnel.shade.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:16)
        at org.apache.seatunnel.shade.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
        at org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4635)
        at org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
        at org.apache.seatunnel.common.utils.JsonUtils.parseObject(JsonUtils.java:241)
        ... 28 more

Zeta or Flink or Spark Version

No response

Java or Scala Version

1.18

Screenshots

No response

liunaijie commented 6 months ago

Variable replacement is only available on the Zeta engine.

czlh commented 6 months ago

Switching to the command ./bin/seatunnel.sh --config config/saas/sync_orig_mqtt_doris_to_hive.conf produces the same error.

liunaijie commented 6 months ago

Did you pass the -i startTs=xxx parameter? I tried with the Zeta engine and it works well; if I omit -i, I get this same error.
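
To illustrate why a missing -i value surfaces as a JSON parse error, here is a minimal Python sketch. It does not use SeaTunnel or Jackson; it only feeds the query fragment from the stack trace to Python's standard json module. The render helper is a hypothetical stand-in for variable substitution, assuming the quoted pieces around ${startTs} are concatenated with the supplied value into a single string.

```python
import json

# Fragment of the rendered config as it appears in the stack trace, with the
# placeholder left unsubstituted. The double quote right after ts>' ends the
# JSON string, so the parser next sees a bare '$' where it expects ',' or '}'.
unsubstituted = (
    '{"query":"select * from orig_mqtt_iot_device_property_post '
    'where ts>\'"${startTs}"\'"}'
)


def try_parse(text):
    """Return (True, parsed object) or (False, parser error message)."""
    try:
        return True, json.loads(text)
    except json.JSONDecodeError as exc:
        return False, f"{exc.msg} at column {exc.colno}"


def render(start_ts):
    """Hypothetical stand-in for substitution (an assumption, not SeaTunnel code):
    the pieces around the variable are joined into one string, then serialized."""
    query = (
        "select * from orig_mqtt_iot_device_property_post where ts>'"
        + start_ts
        + "'"
    )
    return json.dumps({"query": query})


bad_ok, bad_detail = try_parse(unsubstituted)
good_ok, good_detail = try_parse(render("2024-02-21"))

print(bad_ok, bad_detail)    # parse fails on the unsubstituted text
print(good_ok, good_detail)  # parse succeeds once the value is filled in
```

If this reading is right, the reported exception is a symptom of the placeholder reaching the JSON step unreplaced, which matches the observation that omitting -i reproduces it.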

github-actions[bot] commented 5 months ago

This issue has been automatically marked as stale because it has had no activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

github-actions[bot] commented 5 months ago

This issue has been closed because it has not received a response for too long. You can reopen it if you encounter similar problems in the future.