itinycheng / flink-connector-clickhouse

Flink SQL connector for ClickHouse. Support ClickHouseCatalog and read/write primary data, maps, arrays to clickhouse.
Apache License 2.0
363 stars 154 forks source link

mysql--> flink-1.13.5 bin/sql-client.sh ->clickhouse #15

Closed ADingDing closed 2 years ago

ADingDing commented 2 years ago

mysql: connector' = 'mysql-cdc' clickhouse: your package

CREATE TABLE orders ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '192.168.20.187', 'port' = '3309', 'username' = 'root', 'password' = '123456', 'database-name' = 'mydb', 'table-name' = 'orders' );

2. CREATE TABLE clickhouseorders ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'clickhouse', 'url' = 'clickhouse://192.168.20.187:8123', 'database-name' = 'test', 'table-name' = 'clickhouseorders', 'sink.batch-size' = '500', 'sink.flush-interval' = '1000', 'sink.max-retries' = '3' ); 3. INSERT INTO clickhouseorders SELECT o.order_id,o.order_date, o.customer_name, o.price FROM orders AS o;

result: 2022-02-14 14:42:44 java.lang.NoClassDefFoundError: org/apache/flink/util/concurrent/ExecutorThreadFactory at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.scheduledFlush(AbstractClickHouseOutputFormat.java:52) at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.open(ClickHouseBatchOutputFormat.java:69) at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.lang.Thread.run(Thread.java:748)

what should I do next? thanks

ADingDing commented 2 years ago

版本导致 关闭