apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [connector v2 sink clickhouse] mysql cdc -> clickhouse #6014

spihiker closed this issue 8 months ago

spihiker commented 9 months ago

What happened

The service crashes.

SeaTunnel Version

2.3.3

SeaTunnel Config

env {
  # You can set SeaTunnel environment configuration here
  execution.parallelism = 2
  job.mode = "STREAMING"

  checkpoint.interval = 10000
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  MySQL-CDC {
    username = "cdc"
    password = "cdc"

    table-names = ["mydb.t1_202310","mydb.t1_202311"]
    base-url = "jdbc:mysql://172.20.*.*:3307/mydb"
  }
}

sink {
  Clickhouse {
    host = "172.20.*.*:8123"
    database = "mydb"
    table = "t1"
    username = "dev"
    password = "dev"

    # cdc options
    primary_key = "c1"
    support_upsert = true
    allow_experimental_lightweight_delete = true
  }
}

Running Command

./bin/seatunnel.sh --config ./config/mycdc-2-ch.conf -e local

Error Exception

java.lang.RuntimeException: org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed] - Clickhouse execute batch statement error
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:232) ~[seatunnel-starter.jar:2.3.3]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:61) ~[seatunnel-starter.jar:2.3.3]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[seatunnel-starter.jar:2.3.3]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[seatunnel-starter.jar:2.3.3]
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:71) ~[seatunnel-starter.jar:2.3.3]
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:51) ~[seatunnel-starter.jar:2.3.3]
        at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:52) ~[seatunnel-starter.jar:2.3.3]
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) ~[seatunnel-starter.jar:2.3.3]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) ~[seatunnel-starter.jar:2.3.3]
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) ~[seatunnel-starter.jar:2.3.3]
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:613) [seatunnel-starter.jar:2.3.3]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_391]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_391]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_391]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_391]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_391]
Caused by: org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed] - Clickhouse execute batch statement error
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:119) ~[connector-clickhouse-2.3.3.jar:2.3.3]
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:134) ~[connector-clickhouse-2.3.3.jar:2.3.3]
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.prepareCommit(ClickhouseSinkWriter.java:93) ~[connector-clickhouse-2.3.3.jar:2.3.3]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:166) ~[seatunnel-starter.jar:2.3.3]
        ... 15 more
        Suppressed: java.sql.SQLException: Cannot set null to non-nullable column #3 [c3 String]
                at com.clickhouse.jdbc.SqlExceptionUtils.clientError(SqlExceptionUtils.java:73) ~[connector-clickhouse-2.3.3.jar:2.3.3]
                at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.addBatch(InputBasedPreparedStatement.java:328) ~[connector-clickhouse-2.3.3.jar:2.3.3]
                at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.FieldNamedPreparedStatement.addBatch(FieldNamedPreparedStatement.java:371) ~[connector-clickhouse-2.3.3.jar:2.3.3]
                at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.InsertOrUpdateBatchStatementExecutor.addToBatch(InsertOrUpdateBatchStatementExecutor.java:90) ~[connector-clickhouse-2.3.3.jar:2.3.3]
                at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.ReduceBufferedBatchStatementExecutor.executeBatch(ReduceBufferedBatchStatementExecutor.java:76) ~[connector-clickhouse-2.3.3.jar:2.3.3]
                at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.ReduceBufferedBatchStatementExecutor.closeStatements(ReduceBufferedBatchStatementExecutor.java:99) ~[connector-clickhouse-2.3.3.jar:2.3.3]
                at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutor.close(JdbcBatchStatementExecutor.java:37) ~[connector-clickhouse-2.3.3.jar:2.3.3]
                at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:137) ~[connector-clickhouse-2.3.3.jar:2.3.3]
                at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.prepareCommit(ClickhouseSinkWriter.java:93) ~[connector-clickhouse-2.3.3.jar:2.3.3]
                at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:166) ~[seatunnel-starter.jar:2.3.3]
                at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:61) ~[seatunnel-starter.jar:2.3.3]
                at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[seatunnel-starter.jar:2.3.3]
                at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[seatunnel-starter.jar:2.3.3]
                at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:71) ~[seatunnel-starter.jar:2.3.3]
                at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:51) ~[seatunnel-starter.jar:2.3.3]
                at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:52) ~[seatunnel-starter.jar:2.3.3]
                at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) ~[seatunnel-starter.jar:2.3.3]
                at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) ~[seatunnel-starter.jar:2.3.3]
                at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) ~[seatunnel-starter.jar:2.3.3]
                at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:613) [seatunnel-starter.jar:2.3.3]
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_391]
                at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_391]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_391]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_391]
                at java.lang.Thread.run(Thread.java:750) [?:1.8.0_391]
Caused by: java.sql.SQLException: Cannot set null to non-nullable column #3 [c3 String]
        at com.clickhouse.jdbc.SqlExceptionUtils.clientError(SqlExceptionUtils.java:73) ~[connector-clickhouse-2.3.3.jar:2.3.3]
        at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.addBatch(InputBasedPreparedStatement.java:328) ~[connector-clickhouse-2.3.3.jar:2.3.3]
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.FieldNamedPreparedStatement.addBatch(FieldNamedPreparedStatement.java:371) ~[connector-clickhouse-2.3.3.jar:2.3.3]
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.InsertOrUpdateBatchStatementExecutor.addToBatch(InsertOrUpdateBatchStatementExecutor.java:90) ~[connector-clickhouse-2.3.3.jar:2.3.3]
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.ReduceBufferedBatchStatementExecutor.executeBatch(ReduceBufferedBatchStatementExecutor.java:76) ~[connector-clickhouse-2.3.3.jar:2.3.3]
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:117) ~[connector-clickhouse-2.3.3.jar:2.3.3]
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:134) ~[connector-clickhouse-2.3.3.jar:2.3.3]
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.prepareCommit(ClickhouseSinkWriter.java:93) ~[connector-clickhouse-2.3.3.jar:2.3.3]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:166) ~[seatunnel-starter.jar:2.3.3]
        ... 15 more
2023-12-15 18:11:51,799 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-943069] [5.1] taskDone, taskId = 50000, taskGroup = TaskGroupLocation{jobId=787984932684169217, pipelineId=1, taskGroupId=30000}
2023-12-15 18:11:51,799 WARN  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-943069] [5.1] Exception in org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@6951fe5c
org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.InterruptedException: sleep interrupted
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:166) ~[connector-cdc-mysql-2.3.3.jar:2.3.3]
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:92) ~[connector-cdc-mysql-2.3.3.jar:2.3.3]
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:98) ~[connector-cdc-mysql-2.3.3.jar:2.3.3]

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

spihiker commented 9 months ago

So many bugs. What should I do?

Carl-Zhou-CN commented 9 months ago

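Hi @spihiker, this seems to be caused by the c3 column in the ClickHouse table being non-nullable (the trace reports `Cannot set null to non-nullable column #3 [c3 String]`) while the incoming row carries a NULL for it. Can you check the DDL for this table?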
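For illustration, the mismatch can be sketched as a small check of a row against the sink column types. In ClickHouse, a column declared as `String` rejects NULL, while `Nullable(String)` accepts it. This is only a rough sketch: the helper names `is_nullable` and `null_violations` are hypothetical, the schema is made up except for `c3 String` (taken from the error message), and the type parsing only handles top-level `Nullable(...)` and `LowCardinality(Nullable(...))`.

```python
from typing import Dict, List, Optional


def is_nullable(ch_type: str) -> bool:
    """Return True if a ClickHouse type string accepts NULL at the top level."""
    t = ch_type.strip()
    # LowCardinality(Nullable(T)) is nullable too, so unwrap that one layer.
    if t.startswith("LowCardinality(") and t.endswith(")"):
        t = t[len("LowCardinality("):-1].strip()
    return t.startswith("Nullable(")


def null_violations(schema: Dict[str, str],
                    row: Dict[str, Optional[object]]) -> List[str]:
    """List columns whose value is None although the sink type is non-nullable."""
    return [col for col, ch_type in schema.items()
            if row.get(col) is None and not is_nullable(ch_type)]


# Hypothetical sink schema: only `c3 String` is known from the error message.
schema = {"c1": "Int64", "c2": "Nullable(String)", "c3": "String"}
# Hypothetical CDC row in which the MySQL side holds NULL for c2 and c3.
row = {"c1": 1, "c2": None, "c3": None}

print(null_violations(schema, row))  # prints ['c3']
```

If the check flags a column like this, the usual options are to declare it as `Nullable(String)` in the ClickHouse DDL or to make sure the MySQL side never emits NULL for it.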

spihiker commented 8 months ago

Thank you very much.