DTStack / chunjun

A data integration framework
https://dtstack.github.io/chunjun/
Apache License 2.0
3.98k stars 1.69k forks

[Bug] [clickhouse-x] Sink fails on startup and cannot work: No value present #1838

Open waryars opened 11 months ago

waryars commented 11 months ago

What happened

1 (04adf9b58997215c316ac4ca79d96627) switched from RUNNING to FAILED.

java.util.NoSuchElementException: No value present
    at java.util.Optional.get(Optional.java:135)
    at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.prepareTemplates(JdbcOutputFormat.java:308)
    at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.buildStmtProxy(JdbcOutputFormat.java:124)
    at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.openInternal(JdbcOutputFormat.java:107)
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.open(BaseRichOutputFormat.java:262)
    at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.open(DtOutputFormatSinkFunction.java:95)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:63)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:433)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
    at java.lang.Thread.run(Thread.java:748)
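For readers unfamiliar with this exception: the top frame of the trace shows that `Optional.get()` was called on an empty `Optional` inside `prepareTemplates`. The trace does not show which value was missing. The sketch below only reproduces the exception class and message in isolation. `Dialect`, `upsertStatement`, `unsafeTemplate`, and `checkedTemplate` are hypothetical names invented for illustration and are not ChunJun code; that the missing value is a statement template the dialect does not provide is an assumption, not something the trace confirms.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

public class OptionalGetSketch {

    /** Hypothetical stand-in for a dialect that may not support a statement type. */
    interface Dialect {
        Optional<String> upsertStatement(String table);
    }

    /** Unchecked get(): throws NoSuchElementException when the Optional is empty. */
    static String unsafeTemplate(Dialect dialect, String table) {
        return dialect.upsertStatement(table).get();
    }

    /** Same lookup, but failing with a message that names what is missing. */
    static String checkedTemplate(Dialect dialect, String table) {
        return dialect.upsertStatement(table)
                .orElseThrow(() -> new UnsupportedOperationException(
                        "dialect provides no upsert statement for table " + table));
    }

    public static void main(String[] args) {
        // Assumption for illustration: a dialect that returns no statement at all.
        Dialect noUpsert = table -> Optional.empty();

        try {
            unsafeTemplate(noUpsert, "test_binlog");
        } catch (NoSuchElementException e) {
            System.out.println("unsafe: " + e.getMessage());
        }

        try {
            checkedTemplate(noUpsert, "test_binlog");
        } catch (UnsupportedOperationException e) {
            System.out.println("checked: " + e.getMessage());
        }
    }
}
```

The first call fails with the same bare "No value present" message seen in the report, which is why the log gives no hint about the cause. The second variant shows one way such a lookup could report the unsupported case explicitly.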

What you expected to happen

The job runs normally.

How to reproduce

CREATE TABLE source (
    id int,
    cname varchar,
    sale_amt decimal(18,2),
    ttime timestamp
) WITH (
    'connector' = 'binlog-x'
    ,'username' = 'root'
    ,'password' = 'xxxx'
    ,'cat' = 'insert,delete,update'
    ,'url' = 'jdbc:mysql://172.16.44.83:3306/ambari?useSSL=false'
    ,'host' = '172.16.44.83'
    ,'port' = '3306'
    ,'table' = 'ambari.test_binlog'
    ,'timestamp-format.standard' = 'SQL'
);

CREATE TABLE sink (
    id int,
    cname varchar,
    sale_amt decimal(18,2),
    ttime timestamp,
    primary key (id) not enforced
) WITH (
    'connector' = 'clickhouse-x',
    'url' = 'jdbc:clickhouse://172.16.44.85:8123/ck_db1',
    'table-name' = 'test_binlog',
    'username' = 'bigdata',
    'password' = 'bigdata',
    'sink.buffer-flush.max-rows' = '1',
    'sink.all-replace' = 'true'
);

insert into sink select * from source u;

Anything else

No response

Version

master
