Closed trtuancs closed 1 year ago
Info env: python version: 3.8.13 apache-flink version: 1.16.0 clickhouse version: 21.2.5.5
My Code:
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcExecutionOptions, JdbcConnectionOptions import json import logging import sys from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment JDBC_JAR_PATH = "file:///Users/admin/Documents/jdbc-0.3.2-patch11-all.jar" env = StreamExecutionEnvironment.get_execution_environment() env.add_jars(JDBC_JAR_PATH) type_info = Types.ROW([Types.INT(), Types.STRING(), Types.STRING(), Types.INT()]) env.from_collection( [(101, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019), (102, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018), (103, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017), (104, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017) ], type_info=type_info) \ .add_sink( JdbcSink.sink( "insert into books (id, title, authors, year) values (?, ?, ?, ?)", type_info, JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .with_url('jdbc:clickhouse://localhost:8123/public') .with_driver_name('com.clickhouse.jdbc.ClickHouseDriver') .with_user_name('user') .with_password('password') .build(), JdbcExecutionOptions.builder() .with_batch_interval_ms(1000) .with_batch_size(200) .with_max_retries(5) .build() )) env.execute() ```python ### Error Caused by: java.lang.NoSuchMethodError: com.clickhouse.client.ClickHouseConfig.getMaxResultRows()I at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.<init>(ClickHouseStatementImpl.java:243) at com.clickhouse.jdbc.internal.SqlBasedPreparedStatement.<init>(SqlBasedPreparedStatement.java:62) at com.clickhouse.jdbc.internal.ClickHouseConnectionImpl.prepareStatement(ClickHouseConnectionImpl.java:594) at com.clickhouse.jdbc.ClickHouseConnection.prepareStatement(ClickHouseConnection.java:102) at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.prepareStatements(SimpleBatchStatementExecutor.java:58) at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.createAndOpenStatementExecutor(JdbcOutputFormat.java:172) at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.open(JdbcOutputFormat.java:144) at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:52) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100) at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) at java.lang.Thread.run(Thread.java:750)
Have you solved this problem, if so, I need your help!
Info env: python version: 3.8.13 apache-flink version: 1.16.0 clickhouse version: 21.2.5.5
My Code: