ClickHouse / clickhouse-java

ClickHouse Java Clients & JDBC Driver
https://clickhouse.com
Apache License 2.0

Flink reads 10,000 records from Kafka at a time and writes them to MySQL (in batches of 10,000) and ClickHouse (in batches of 5,000). Occasionally an InterruptedException is reported. #1377

Open DemoMoon opened 1 year ago

DemoMoon commented 1 year ago

Describe the bug

java.sql.SQLException: java.lang.InterruptedException

Steps to reproduce

Flink reads 10,000 records from Kafka at a time and writes them to MySQL (in batches of 10,000) and ClickHouse (in batches of 5,000). Occasionally an InterruptedException is reported.
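With the batch sizes above, each 10,000-record read is written to ClickHouse as two batches of 5,000. A minimal sketch of that split, assuming a plain `List` of records; `BatchChunking` and `chunk` are hypothetical names, not code from the reporter's project:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchChunking {
    /** Splits records into consecutive sublists of at most batchSize elements. */
    static <T> List<List<T>> chunk(List<T> records, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < records.size(); start += batchSize) {
            // end is exclusive, matching List.subList
            int end = Math.min(start + batchSize, records.size());
            // subList is a view; copy it so each batch is independent of the source list
            batches.add(new ArrayList<>(records.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            records.add(i);
        }
        List<List<Integer>> batches = chunk(records, 5_000);
        System.out.println(batches.size() + " batches of " + batches.get(0).size()); // prints "2 batches of 5000"
    }
}
```

One detail that matters when reading the later log lines: `batchInsert` in the code example passes `end` to the `batchSize {}` placeholder, so a logged "batchSize 10000" is the exclusive end index of the failing chunk. If the configured ClickHouse batch size is 5,000, that points at the second chunk of a 10,000-record list.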

Expected behaviour

Code example

```java
public abstract class BaseClickhouse extends RichSinkFunction<List> {
    private static final Map<String, DataSource> map = new HashMap<>();

    void batchInsert(List list, int batchSize) throws Exception {
        BatchInsertMeta meta = BatchInsertHelper.getBatchInsertMeta(list.get(0).getClass(), "clickhouse");
        int start = 0;
        int end;
        while (start < list.size()) {
            end = Math.min(start + batchSize, list.size());
            List subList = list.subList(start, end);
            String sql = "";
            try {
                sql = meta.getPlaceHolderSql(1, "clickhouse");
                batchInsertClick(getDataSource(), sql, subList);
            } catch (Exception e) {
                log.error("batchInsert is error list size {} batchSize {} ex {}", list.size(), end, e.getMessage(), e);
                throw new Exception(e.getMessage(), e);
            }
            start = end;
        }
    }

    abstract void batchInsertClick(DataSource dataSource, String sql, List subList) throws SQLException;

    @Override
    public void open(Configuration parameters) {
        log.warn("open...{}", map.size());
        try {
            if (map.size() == 0) {
                ClickhouseHouseConfig ck = SpringContextHolder.getBean(ClickhouseHouseConfig.class);
                DruidDataSource dataSource = new DruidDataSource();
                dataSource.setDbType(DbType.clickhouse);
                dataSource.setInitialSize(ck.getInitialSize());
                dataSource.setMinIdle(ck.getMinIdle());
                dataSource.setMaxActive(ck.getMaxActive());
                dataSource.setMaxWait(ck.getMaxWait());
                dataSource.setUrl(ck.getUrl());
                dataSource.setDriverClassName(ck.getDriverName());
                dataSource.setUsername(ck.getUser());
                dataSource.setPassword(ck.getPwd());
                // Whether idle connections are validated by the idle-connection evictor (if any);
                // connections that fail validation are removed from the pool.
                dataSource.setTestWhileIdle(true);
                // SQL statement used to check whether a connection is usable
                dataSource.setValidationQuery("SELECT 1");
                // Run the idle-connection evictor every 30 seconds
                dataSource.setTimeBetweenEvictionRunsMillis(30000);
                map.put("ch_db", dataSource);
            }
        } catch (Exception e) {
            log.error("clickhouse open is error {}", e.getMessage(), e);
        }
    }

    DataSource getDataSource() {
        return map.get("ch_db");
    }
}
```

Error log

```
java.sql.SQLException: interrupt
	at com.alibaba.druid.pool.DruidDataSource.getConnectionInternal(DruidDataSource.java:1726)
	at com.alibaba.druid.pool.DruidDataSource.getConnectionDirect(DruidDataSource.java:1494)
	at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:1474)
	at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:1459)
	at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:83)
	at com.xxx.kafka.sink.clickhouse.ClickHouseCommitSink.batchInsertClick(ClickHouseCommitSink.java:124)
	at com.xx.kafka.sink.clickhouse.BaseClickhouse.batchInsert(BaseClickhouse.java:44)
	at com.xxx.kafka.sink.clickhouse.ClickHouseCommitSink.invoke(ClickHouseCommitSink.java:52)
	at com.xxx.kafka.sink.clickhouse.ClickHouseCommitSink.invoke(ClickHouseCommitSink.java:29)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at com.xxx.core.kafka.flink.AhbdTcpUdpLogFlink$3.apply(AhbdTcpUdpLogFlink.java:130)
	at com.xxx.kafka.flink.AhbdTcpUdpLogFlink$3.apply(AhbdTcpUdpLogFlink.java:125)
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:50)
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:32)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:425)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.InterruptedException: null
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
	at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
	at com.alibaba.druid.pool.DruidDataSource.getConnectionInternal(DruidDataSource.java:1723)
	... 36 common frames omitted

java.sql.SQLException: java.lang.InterruptedException
	at com.clickhouse.jdbc.SqlExceptionUtils.create(SqlExceptionUtils.java:45)
	at com.clickhouse.jdbc.SqlExceptionUtils.handle(SqlExceptionUtils.java:90)
	at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.addBatch(InputBasedPreparedStatement.java:348)
	at com.alibaba.druid.pool.DruidPooledPreparedStatement.addBatch(DruidPooledPreparedStatement.java:537)
	at com.xxx.kafka.sink.clickhouse.ClickHouseCommitSink.batchInsertClick(ClickHouseCommitSink.java:206)
	at com.xx.kafka.sink.clickhouse.BaseClickhouse.batchInsert(BaseClickhouse.java:44)
	at com.xx.kafka.sink.clickhouse.ClickHouseCommitSink.invoke(ClickHouseCommitSink.java:52)
	at com.xxx.kafka.sink.clickhouse.ClickHouseCommitSink.invoke(ClickHouseCommitSink.java:29)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at com.xxx.xx.xx.core.kafka.flinkxxFlink$3.apply(AhbdTcpUdpLogFlink.java:130)
	at com.xx.xx.xx.core.kafka.flink.xxFlink$3.apply(AhbdTcpUdpLogFlink.java:125)
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:50)
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:32)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:425)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.InterruptedException: null
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
	at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:380)
	at com.clickhouse.data.stream.BlockingPipedOutputStream.updateBuffer(BlockingPipedOutputStream.java:88)
	at com.clickhouse.data.stream.BlockingPipedOutputStream.updateBuffer(BlockingPipedOutputStream.java:78)
	at com.clickhouse.data.stream.BlockingPipedOutputStream.writeByte(BlockingPipedOutputStream.java:168)
	at com.clickhouse.data.ClickHouseOutputStream.writeBoolean(ClickHouseOutputStream.java:289)
	at com.clickhouse.data.format.BinaryDataProcessor$NullableSerializer.serialize(BinaryDataProcessor.java:134)
	at com.clickhouse.data.ClickHouseDataProcessor.write(ClickHouseDataProcessor.java:593)
	at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.addBatch(InputBasedPreparedStatement.java:345)
	... 33 common frames omitted
```

Configuration

Environment

* Spring Boot 2.5.14 + Flink 1.14.4 + clickhouse-jdbc 0.4.6 + Druid 1.2.8
* Language version:
* 4.19.90-23.21.v2101.ky10.x86_64

ClickHouse server

* ClickHouse Server version: 23.4.2.11
* ClickHouse Server non-default settings, if any:
* `CREATE TABLE` statements for tables involved:

```sql
CREATE TABLE default.xx_log
(
    `id` Int64,
    `create_time_` Nullable(DateTime64(3)),
    `update_time_` Nullable(DateTime64(3)),
    `d_city` Nullable(String),
    `d_country` Nullable(String),
    `d_district` Nullable(String),
    `d_ip` Array(String),
    `d_ipv6` Nullable(String),
    `d_lati` Nullable(Float64),
    `d_long` Nullable(Float64),
    `d_mac` Nullable(String),
    `d_natip` Nullable(String),
    `d_natp` Nullable(String),
    `d_port` Array(Int64),
    `d_province` Nullable(String),
    `desc_` Nullable(String),
    `duration` Nullable(Int64),
    `dvc_a` Nullable(String),
    `e_id` Nullable(String),
    `evt_t` Nullable(String),
    `first` Nullable(String),
    `first_p` Nullable(String),
    `fle_n` Nullable(String),
    `fle_p` Nullable(String),
    `fle_s` Nullable(Int64),
    `head_hg` Nullable(String),
    `history` Nullable(String),
    `i_bytes` Nullable(Int64),
    `i_local` Nullable(String),
    `i_pkts` Nullable(Int64),
    `interface` Nullable(String),
    `last` Nullable(String),
    `last_p` Nullable(String),
    `missed_bytes` Nullable(String),
    `name` Nullable(String),
    `o_bytes` Nullable(Int64),
    `o_local` Nullable(String),
    `o_pkts` Nullable(Int64),
    `odf_tolen` Nullable(Int64),
    `org_location` Nullable(String),
    `org_name` Nullable(String),
    `payload_hg` Nullable(String),
    `pkg_t` Nullable(Int64),
    `pkts_dir` Nullable(String),
    `pkts_size` Nullable(Int64),
    `pkts_ts` Nullable(String),
    `pro_a` Nullable(String),
    `pro_t` Nullable(String),
    `record` Nullable(String),
    `s_city` Nullable(String),
    `s_country` Nullable(String),
    `s_district` Nullable(String),
    `s_ip` Array(String),
    `s_ipv6` Nullable(String),
    `s_lati` Nullable(Float64),
    `s_long` Nullable(Float64),
    `s_mac` Nullable(String),
    `s_natip` Nullable(String),
    `s_natp` Nullable(String),
    `s_port` Array(Int64),
    `s_province` Nullable(String),
    `session_id` Nullable(String),
    `severity` Nullable(String),
    `state` Nullable(Int64),
    `t_flags` Nullable(String),
    `tags` Nullable(String),
    `tcp_end` Nullable(String),
    `tcpflag_hg` Nullable(String),
    `tcpkt_len` Nullable(Int64),
    `tcpkt_time` Nullable(String),
    `tcpl_bd` Nullable(String),
    `time` Nullable(String),
    `tunnel_id` Nullable(String),
    `vendor_id` Nullable(String),
    `vl_id` Nullable(Int64),
    `vl_inner_id` Nullable(String),
    `vxlan_i_mac` Nullable(String),
    `vxlan_o_mac` Nullable(String),
    `vxlan_vni` Nullable(String)
)
ENGINE = Distributed('xx_3s2r_cluster', '', 'xx', rand());
```
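In both traces, the `Caused by` section is an `InterruptedException` thrown from `ReentrantLock.lockInterruptibly`: once while Druid waits for a pooled connection (`getConnectionInternal`), and once while the driver's `BlockingPipedOutputStream` offers a buffer to a `LinkedBlockingQueue` during `addBatch`. These JDK methods throw when the calling thread's interrupt status has been set by some other code; they report an interrupt rather than cause one. A minimal sketch of that pattern using only JDK classes; the writer thread is a hypothetical stand-in for the thread calling `addBatch`, and nothing here models the ClickHouse driver or Flink:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptDemo {
    /**
     * Starts a writer thread that blocks on a full bounded queue, interrupts it from
     * the calling thread, and returns the exception the writer observed (or null).
     */
    static Throwable interruptBlockedWriter() throws InterruptedException {
        // Capacity 1 and pre-filled, so the writer's put() has to wait for space.
        LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<>(1);
        queue.add(new byte[0]);
        AtomicReference<Throwable> observed = new AtomicReference<>();
        Thread writer = new Thread(() -> {
            try {
                // put() takes the queue's lock with lockInterruptibly(), like the timed offer in the trace
                queue.put(new byte[0]); // blocks: nothing ever drains the queue
            } catch (InterruptedException e) {
                observed.set(e);
                // Throwing InterruptedException cleared the flag; restore it for any caller
                Thread.currentThread().interrupt();
            }
        });
        writer.start();
        writer.interrupt(); // "another thread" interrupts the writer
        writer.join();
        return observed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(interruptBlockedWriter());
    }
}
```

Whether the interrupt arrives before `put()` starts waiting or while it waits, the writer sees an `InterruptedException`, which is why the trace alone does not reveal which thread sent the interrupt.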
DemoMoon commented 1 year ago

ClickHouseCommitSink.java:124:

```java
try (Connection con = dataSource.getConnection(); PreparedStatement ps = con.prepareStatement(sql)) {
```

ClickHouseCommitSink.java:206:

```java
ps.addBatch();
```

DemoMoon commented 1 year ago

```
c.u.a.b.c.k.s.clickhouse.BaseClickhouse : batchInsert is error list size 10000 batchSize 10000 ex Unknown error 1002, server ClickHouseNode [uri=http://xxx:8123/default]@855626085

java.sql.BatchUpdateException: Unknown error 1002, server ClickHouseNode [uri=http://xx:8123/default]@855626085
	at com.clickhouse.jdbc.SqlExceptionUtils.batchUpdateError(SqlExceptionUtils.java:107)
	at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.executeAny(InputBasedPreparedStatement.java:154)
```

After I moved the data source initialization into the Spring Boot startup process, the earlier InterruptedException disappeared, but now this exception appears instead. What could be causing it?


This exception is reported only occasionally.
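One possible reason the placement of the initialization mattered, offered as an assumption rather than a confirmed cause: in the code example, `open()` runs once per parallel sink instance, and it checks `map.size() == 0` and then calls `map.put` on a static, unsynchronized `HashMap`. Sink instances opening concurrently in the same JVM can therefore each build their own `DruidDataSource`, and `HashMap` is not safe for concurrent writes. A minimal sketch of race-free lazy initialization with `ConcurrentHashMap.computeIfAbsent`; `SharedPoolRegistry` and `getOrCreate` are hypothetical names, and a plain `Object` stands in for the pool:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class SharedPoolRegistry<T> {
    // One entry per key, shared by every sink instance that uses this registry.
    private final ConcurrentMap<String, T> pools = new ConcurrentHashMap<>();

    /** Returns the pool for key, running factory at most once per key. */
    public T getOrCreate(String key, Supplier<T> factory) {
        // Atomic per key: unlike "if (map.size() == 0) map.put(...)" on a HashMap,
        // two threads cannot both decide the pool is missing and create one each.
        return pools.computeIfAbsent(key, k -> factory.get());
    }

    public static void main(String[] args) throws InterruptedException {
        SharedPoolRegistry<Object> registry = new SharedPoolRegistry<>();
        AtomicInteger created = new AtomicInteger();
        Thread[] openers = new Thread[8];
        for (int i = 0; i < openers.length; i++) {
            // Each thread stands in for one parallel sink instance calling open().
            openers[i] = new Thread(() -> registry.getOrCreate("ch_db", () -> {
                created.incrementAndGet();
                return new Object(); // placeholder for a DruidDataSource
            }));
            openers[i].start();
        }
        for (Thread t : openers) {
            t.join();
        }
        System.out.println("pools created: " + created.get()); // prints "pools created: 1"
    }
}
```

`computeIfAbsent` makes other callers for the same key wait while the factory runs, so the factory should only build the pool and must not modify the map itself.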

zhicwu commented 1 year ago

Hi @DemoMoon, how long did the insertion take? The InterruptedException was raised because the insertion was interrupted by another thread, but it's hard to tell why from the stack trace. "Unknown error 1002" means the JDBC driver was not able to extract the error from the server response, so you'll have to look at the system.query_log table or the ClickHouse error log to investigate.
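Following that suggestion, a minimal sketch that lists recently failed queries and their durations from `system.query_log` over JDBC. The URL, credentials, six-hour window, and the `QueryLogCheck`/`failedQueriesSql` names are placeholders, and the ClickHouse JDBC driver must be on the classpath when running `main`. Each server keeps its own `system.query_log`, so query the node named in the error message (the `ClickHouseNode` URI):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class QueryLogCheck {
    /** Builds a query for queries that failed on this server during the last hours hours. */
    static String failedQueriesSql(int hours) {
        if (hours <= 0) {
            throw new IllegalArgumentException("hours must be positive");
        }
        // hours is an int, so formatting it into the SQL cannot inject anything.
        return String.format(
                "SELECT event_time, query_duration_ms, exception_code, exception, query"
                        + " FROM system.query_log"
                        + " WHERE type IN ('ExceptionBeforeStart', 'ExceptionWhileProcessing')"
                        + " AND event_time > now() - INTERVAL %d HOUR"
                        + " ORDER BY event_time DESC LIMIT 20",
                hours);
    }

    public static void main(String[] args) throws SQLException {
        // Placeholder URL and credentials: point them at the node from the error message.
        String url = args.length > 0 ? args[0] : "jdbc:clickhouse://localhost:8123/default";
        try (Connection con = DriverManager.getConnection(url, "default", "");
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery(failedQueriesSql(6))) {
            while (rs.next()) {
                System.out.printf("%s %d ms code=%d %s%n",
                        rs.getString("event_time"), rs.getLong("query_duration_ms"),
                        rs.getInt("exception_code"), rs.getString("exception"));
            }
        }
    }
}
```

The `query` column is selected as well, so adding a filter on it for the target table name is a natural next step when other queries also fail on the same node.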