[X] I searched in the issues and found nothing similar.
Version
linux
lotdb: 1.3.2
Describe the bug and provide the minimal reproduce step
1、使用flink cdc sql
CREATE TABLE iotable_table3 (
`Timebigint, root.zhang.table3.idint, root.zhang.table3.namestring, root.zhang.table3.addressSTRING ) WITH ( 'connector' = 'IoTDB', 'nodeUrls'='ddp1:6667', 'mode'='cdc', 'cdc.task.name'='test1', 'cdc.pattern'='root.zhang.table3' ) 2、直接select * from iotable_table_3,此时没有数据。查看日志后发现,WebSocketConnectorServer启动失败,端口默认8080,被占用 3、切换端口: CREATE TABLE iotable_table_2 ( Time_bigint, root.zhang.table3.idint, root.zhang.table3.namestring, root.zhang.table3.address` STRING
) WITH (
'connector' = 'IoTDB',
'nodeUrls'='ddp1:6667',
'mode'='cdc',
'cdc.task.name'='test2',
'cdc.pattern'='root.zhang.table3',
'cdc.port'='18089'
)
但是发现flink 任务日志报错
org.apache.iotdb.rpc.StatementExecutionException: 1107: The websocket server has already been created with port = 8080. Please set the option cdc.port = 8080.
at org.apache.iotdb.rpc.RpcUtils.verifySuccess(RpcUtils.java:94)
at org.apache.iotdb.session.SessionConnection.executeNonQueryStatement(SessionConnection.java:474)
at org.apache.iotdb.session.Session.executeNonQueryStatement(Session.java:914)
at org.apache.iotdb.flink.sql.function.IoTDBCDCSourceFunction.open(IoTDBCDCSourceFunction.java:134)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:101)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:750)
What did you expect to see?
能够切换端口,并且通过flink-sql查出来iotdb cdc的数据
What did you see instead?
这个方法有问题。
它会被这样调用:
new WebSocketConnectorServer成功后,直接启动了线程。但是线程内部运行报错,这个实例还在private static final AtomicReference instance,因此每次获取都是以前的旧实例。我切换端口也不会管用
Search before asking
Version
linux lotdb: 1.3.2
Describe the bug and provide the minimal reproduce step
1、使用flink cdc sql CREATE TABLE iotable_table3 ( `Time
bigint,
root.zhang.table3.idint,
root.zhang.table3.namestring,
root.zhang.table3.addressSTRING ) WITH ( 'connector' = 'IoTDB', 'nodeUrls'='ddp1:6667', 'mode'='cdc', 'cdc.task.name'='test1', 'cdc.pattern'='root.zhang.table3' ) 2、直接select * from iotable_table_3,此时没有数据。查看日志后发现,WebSocketConnectorServer启动失败,端口默认8080,被占用 3、切换端口: CREATE TABLE iotable_table_2 (
Time_bigint,
root.zhang.table3.idint,
root.zhang.table3.namestring,
root.zhang.table3.address` STRING ) WITH ( 'connector' = 'IoTDB', 'nodeUrls'='ddp1:6667', 'mode'='cdc', 'cdc.task.name'='test2', 'cdc.pattern'='root.zhang.table3', 'cdc.port'='18089' ) 但是发现flink 任务日志报错 org.apache.iotdb.rpc.StatementExecutionException: 1107: The websocket server has already been created with port = 8080. Please set the option cdc.port = 8080. at org.apache.iotdb.rpc.RpcUtils.verifySuccess(RpcUtils.java:94) at org.apache.iotdb.session.SessionConnection.executeNonQueryStatement(SessionConnection.java:474) at org.apache.iotdb.session.Session.executeNonQueryStatement(Session.java:914) at org.apache.iotdb.flink.sql.function.IoTDBCDCSourceFunction.open(IoTDBCDCSourceFunction.java:134) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:101) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.lang.Thread.run(Thread.java:750)What did you expect to see?
能够切换端口,并且通过flink-sql查出来iotdb cdc的数据
What did you see instead?
这个方法有问题。
它会被这样调用:
new WebSocketConnectorServer成功后,直接启动了线程。但是线程内部运行报错,这个实例还在private static final AtomicReference instance,因此每次获取都是以前的旧实例。我切换端口也不会管用
Anything else?
No response
Are you willing to submit a PR?