apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0
8.02k stars 1.82k forks source link

[Bug] [Connector-V2] Bug clickhouse sink does not support composite primary key in CDC mode #7948

Open ensean opened 1 week ago

ensean commented 1 week ago

Search before asking

What happened

Sync data from MySQL to clickhouse in CDC mode can not set composite primary key.

2024-10-31 02:30:04,365 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Fatal Error,

2024-10-31 02:30:04,366 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues

2024-10-31 02:30:04,366 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Reason:SeaTunnel job executed failed

2024-10-31 02:30:04,367 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: CheckpointCoordinator inside have error.
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:281)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:277)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:390)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:184)
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48)
    at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:44)
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 4
    at org.apache.seatunnel.api.table.type.SeaTunnelRowType.getFieldType(SeaTunnelRowType.java:73)
    at java.base/java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180)
    at java.base/java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032)
    at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
    at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutorBuilder.getKeyTypes(JdbcBatchStatementExecutorBuilder.java:248)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutorBuilder.build(JdbcBatchStatementExecutorBuilder.java:82)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.lambda$initStatementMap$2(ClickhouseSinkWriter.java:188)
    at java.base/java.util.TreeMap.forEach(TreeMap.java:1002)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.initStatementMap(ClickhouseSinkWriter.java:159)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.<init>(ClickhouseSinkWriter.java:64)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSink.createWriter(ClickhouseSink.java:229)
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:293)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$16(SeaTunnelTask.java:426)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:423)
    at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$runInternal$0(NotifyTaskRestoreOperation.java:107)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
    at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

    ... 12 more

    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:205)
    ... 2 more

2024-10-31 02:30:04,370 ERROR [o.a.s.c.s.SeaTunnel           ] [main] -

SeaTunnel Version

2.3.8

SeaTunnel Config

env {
  parallelism = 3
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://database-1.xxxxxxx.ap-northeast-1.rds.amazonaws.com:3306/employees"
    username = "admin"
    password = "xxxxxx"
    table-names = ["employees.salaries"]
    startup.mode = "initial"
  }
}

sink {
  Clickhouse {
    host = "172.31.42.x:8123"
    database = "default"
    table = "salaries4"
    username = "default"
    password = ""
    primary_key = "emp_no, from_date"
    support_upsert = true
  }
}

Running Command

./bin/seatunnel.sh --config ./config/mysqlcdc2ck.template -m local

Error Exception

2024-10-31 02:30:04,365 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Fatal Error,

2024-10-31 02:30:04,366 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues

2024-10-31 02:30:04,366 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Reason:SeaTunnel job executed failed

2024-10-31 02:30:04,367 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: CheckpointCoordinator inside have error.
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:281)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:277)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:390)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:184)
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48)
    at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:44)
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 4
    at org.apache.seatunnel.api.table.type.SeaTunnelRowType.getFieldType(SeaTunnelRowType.java:73)
    at java.base/java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180)
    at java.base/java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032)
    at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
    at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutorBuilder.getKeyTypes(JdbcBatchStatementExecutorBuilder.java:248)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutorBuilder.build(JdbcBatchStatementExecutorBuilder.java:82)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.lambda$initStatementMap$2(ClickhouseSinkWriter.java:188)
    at java.base/java.util.TreeMap.forEach(TreeMap.java:1002)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.initStatementMap(ClickhouseSinkWriter.java:159)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.<init>(ClickhouseSinkWriter.java:64)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSink.createWriter(ClickhouseSink.java:229)
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:293)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$16(SeaTunnelTask.java:426)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:423)
    at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$runInternal$0(NotifyTaskRestoreOperation.java:107)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
    at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

    ... 12 more

    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:205)
    ... 2 more

2024-10-31 02:30:04,370 ERROR [o.a.s.c.s.SeaTunnel           ] [main] -


### Zeta or Flink or Spark Version

zeta

### Java or Scala Version

Java 11

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
ensean commented 1 week ago

MySQL table DDL

CREATE TABLE salaries (
    emp_no      INT             NOT NULL,
    salary      INT             NOT NULL,
    from_date   DATE            NOT NULL,
    to_date     DATE            NOT NULL,
    FOREIGN KEY (emp_no) REFERENCES employees (emp_no) ON DELETE CASCADE,
    PRIMARY KEY (emp_no, from_date)
)

Clickhouse table DDL

CREATE TABLE default.salaries4
(
    `emp_no` UInt64,
    `salary` UInt64,
    `from_date` String,
    `to_date` String
)
ENGINE = MergeTree
PRIMARY KEY (emp_no, from_date)
ORDER BY (emp_no, from_date)
eyys commented 1 week ago

Can I claim this bug?

davidzollo commented 1 week ago

Can I claim this bug?

good job, has assigned to you