itinycheng / flink-connector-clickhouse

Flink SQL connector for ClickHouse. Support ClickHouseCatalog and read/write primary data, maps, arrays to clickhouse.
Apache License 2.0
374 stars 156 forks source link

Update operation exception #19

Closed niuhuaian closed 2 years ago

niuhuaian commented 2 years ago

When I use sqlserver CDC as the source, it is normal to insert and delete records in the sqlserver table. When the update record is abnormal, I exit the job.

CREATE TABLE ch_user ( id int NOT NULL, name varchar(50), comment varchar(255), create_time timestamp, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'clickhouse', 'url' = 'clickhouse://xxxxx:8123', 'database-name' = 'default', 'table-name' = 'user', 'username' = 'default', 'password' = '123456', 'sink.batch-size' = '500', 'sink.flush-interval' = '1000', 'sink.max-retries' = '3' );

2022-03-03 15:14:48 org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:517) at akka.actor.Actor.aroundReceive$(Actor.scala:515) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.RuntimeException: Flush exception found. at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103) at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:77) at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.writeRecord(ClickHouseBatchOutputFormat.java:16) at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87) at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49) at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411) at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.emitRecordsUnderCheckpointLock(DebeziumChangeFetcher.java:252) at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.handleBatch(DebeziumChangeFetcher.java:237) at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.runFetchLoop(DebeziumChangeFetcher.java:163) at com.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:446) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) Caused by: java.lang.RuntimeException: Flush exception found. at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkFlushException(AbstractClickHouseOutputFormat.java:103) at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:72) at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchOutputFormat.flush(ClickHouseBatchOutputFormat.java:93) at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.lambda$scheduledFlush$0(AbstractClickHouseOutputFormat.java:59) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.lang.RuntimeException: Flush exception found. ... 11 more Caused by: java.io.IOException: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3 at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:76) ... 9 more Caused by: java.sql.SQLException: Attempt to execute batch failed, exhausted retry times = 3 at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseExecutor.attemptExecuteBatch(ClickHouseExecutor.java:67) at org.apache.flink.connector.clickhouse.internal.executor.ClickHouseUpsertExecutor.executeBatch(ClickHouseUpsertExecutor.java:125) at org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat.checkBeforeFlush(AbstractClickHouseOutputFormat.java:74) ... 9 more

itinycheng commented 2 years ago

This is an exception originating from flink-connector-clockhouse, but the real reason can't be found from this exception info, probably an exception thrown from the clickhouse server side. The only thing that ClickHouseBatchOutputFormat.flush() does is call ClickHousePreparedStatement.executeBatch() to send data to server side, you'd better do some breakpoint tests.