DTStack / chunjun

A data integration framework
https://dtstack.github.io/chunjun/
Apache License 2.0

[Bug] [chunjun-connector-jdbc-base] ClickHouse does not support transactions #1564

Closed chenxu8989 closed 1 year ago

chenxu8989 commented 1 year ago

Search before asking

What happened

Syncing MySQL to ClickHouse.

Job configuration:

-- DROP TABLE IF EXISTS t_device_info;
CREATE TABLE IF NOT EXISTS t_device_info_source (
    ......
    ,PRIMARY KEY ( `id` ) NOT ENFORCED
) COMMENT '...'
WITH (
    'connector' = 'mysql-x',
    'url' = '',
    'schema' = 'vrc',
    'table-name' = 't_device_info',
    'username' = 'root',
    'password' = 'xxx'

    ,'scan.polling-interval' = '3000' -- Polling interval in ms. Optional (omit it for a batch job); no default

    ,'scan.parallelism' = '1' -- Parallelism
    ,'scan.fetch-size' = '100' -- Number of rows fetched from the database per round trip. Default: 1024
    ,'scan.query-timeout' = '10' -- Database connection timeout. Default: no timeout

    -- ,'scan.partition.column' = 'id' -- Split column for reading with parallelism > 1; must be a column of the table. No default
    -- ,'scan.partition.strategy' = 'range' -- Data sharding strategy. Default: range; with parallelism > 1 and an incremental or polling job, mod sharding is used instead

    ,'scan.increment.column' = 'update_time' -- Incremental column name; must be a column of the table. Optional, no default
    ,'scan.increment.column-type' = 'TIMESTAMP'  -- Incremental column type. Optional, no default
    -- ,'scan.start-location' = '10' -- Start position of the incremental column; if unset, everything is synced first and then incrementally. Optional, no default. Has no effect unless scan.increment.column is configured

    ,'scan.restore.columnname' = 'update_time' -- With checkpointing enabled, the column used to resume the job from a savepoint/checkpoint. On resume it overrides scan.start-location and continues from the resume point. Optional, no default
    ,'scan.restore.columntype' = 'TIMESTAMP' -- With checkpointing enabled, the type of the resume column. Optional, no default
);

-- DROP TABLE IF EXISTS t_device_info;
CREATE TABLE IF NOT EXISTS t_device_info_sink (
    ...
    ,PRIMARY KEY ( `id` ) NOT ENFORCED
) WITH (
  'connector' = 'clickhouse-x',
  'url' = 'jdbc:clickhouse://.../default',
  'schema' = 'default',
  'table-name' = 't_device_info',
  'username' = 'default',
  'password' = '...',
  'sink.buffer-flush.max-rows' = '10000',
  'sink.all-replace' = 'true'
);

INSERT INTO t_device_info_sink
SELECT * FROM t_device_info_source;

Exception stack trace:

2023-03-23 17:17:50
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: open() failed.
    at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.openInternal(JdbcOutputFormat.java:104)
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.open(BaseRichOutputFormat.java:262)
    at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.open(DtOutputFormatSinkFunction.java:95)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:63)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:433)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.sql.SQLFeatureNotSupportedException: Transactions are not supported
    at ru.yandex.clickhouse.ClickHouseConnectionImpl.setAutoCommit(ClickHouseConnectionImpl.java:219)
    at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.openInternal(JdbcOutputFormat.java:89)
    ... 13 more

What you expected to happen

MySQL should sync to ClickHouse successfully.

How to reproduce

Run a MySQL-to-ClickHouse sync job.

Anything else

No response

Version

1.12_release

Are you willing to submit PR?

Code of Conduct

chestnutqiang commented 1 year ago

Hi, I looked at the exception stack and reproduced the problem locally.

1. When the connection is obtained from the BalancedClickhouseDataSource data source, auto-commit cannot be turned off.

Internally it throws an exception:

// ru.yandex.clickhouse.ClickHouseConnectionImpl (connections handed out by BalancedClickhouseDataSource)
public void setAutoCommit(boolean autoCommit) throws SQLException {
    if (autoCommit) {
        return;
    }
    throw new SQLFeatureNotSupportedException("Transactions are not supported");
}

2. When the connection is obtained from the ClickHouseDataSource data source, auto-commit can be turned off.

conn = new ClickHouseDataSource(url, properties).getConnection();

// setAutoCommit as implemented by the connection that ClickHouseDataSource returns:
public void setAutoCommit(boolean autoCommit) throws SQLException {
    ensureOpen();

    if (this.autoCommit == autoCommit) {
        return;
    }

    ensureTransactionSupport();
    if (this.autoCommit = autoCommit) { // commit
        FakeTransaction tx = fakeTransaction.getAndSet(null);
        if (tx != null) {
            tx.logTransactionDetails(log, FakeTransaction.ACTION_COMMITTED);
            tx.clear();
        }
    } else { // start new transaction
        if (!fakeTransaction.compareAndSet(null, new FakeTransaction())) {
            log.warn("[JDBC Compliant Mode] not able to start a new transaction, reuse the exist one");
        }
    }
}

However, the ClickHouse connector reuses the JDBC base-class logic, so the simplest fix is to call jdbcConfig.setAutoCommit(true) in ClickhouseOutputFormat before obtaining the connection. The code is as follows:

@Override
protected Connection getConnection() throws SQLException {
    // Force auto-commit so the JDBC base class never calls setAutoCommit(false) on the ClickHouse connection
    jdbcConfig.setAutoCommit(true);
    return ClickhouseUtil.getConnection(
            jdbcConfig.getJdbcUrl(), jdbcConfig.getUsername(), jdbcConfig.getPassword());
}

3. Also, ClickHouse's single-row insert performance is not great.

One feasible approach is to aggregate rows in memory and spill them to a file once a threshold is exceeded; using NIO for the spill is more efficient. A sketch of the idea follows.
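A minimal, purely illustrative sketch of that buffering idea (the class name and threshold below are placeholders, not chunjun code), assuming rows are already serialized to strings:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

/** Illustrative spill buffer: keep rows in memory, append them to a file via NIO once a threshold is hit. */
public class SpillingRowBuffer {
    private final List<String> rows = new ArrayList<>();
    private final int spillThreshold;
    private final Path spillFile;

    public SpillingRowBuffer(int spillThreshold, Path spillFile) {
        this.spillThreshold = spillThreshold;
        this.spillFile = spillFile;
    }

    public void add(String row) throws IOException {
        rows.add(row);
        if (rows.size() >= spillThreshold) {
            spill();
        }
    }

    /** Append the buffered rows to the spill file and clear the in-memory buffer. */
    private void spill() throws IOException {
        try (FileChannel channel = FileChannel.open(
                spillFile, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            StringBuilder sb = new StringBuilder();
            for (String row : rows) {
                sb.append(row).append('\n');
            }
            channel.write(ByteBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8)));
        }
        rows.clear();
    }
}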

4. What is the difference between ClickHouseDataSource and BalancedClickhouseDataSource?

ClickHouseDataSource and BalancedClickhouseDataSource are two different data sources for connecting to ClickHouse. The main difference between them is support for load balancing and failover.

ClickHouseDataSource: a basic data source that provides a plain ClickHouse connection. You point it at a single specific ClickHouse server; if you have multiple instances, you have to handle load balancing and failover yourself. For single-instance deployments it is very convenient.

BalancedClickhouseDataSource: a more advanced data source that supports load balancing and failover. You give it a set of ClickHouse server instances, and it handles load balancing and failover automatically, redirecting requests to the remaining instances when one fails. This is useful for high availability and larger deployments.

In short, use ClickHouseDataSource for a simple single-instance deployment, or if you are willing to handle load balancing and failover yourself; if you want automatic load balancing and failover, BalancedClickhouseDataSource is the better choice. A construction sketch for both follows.
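A minimal sketch of constructing each, assuming the legacy ru.yandex.clickhouse driver that appears in the stack trace above; host names and credentials are placeholders:

import java.sql.Connection;
import java.sql.SQLException;

import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

public class DataSourceComparison {
    public static void main(String[] args) throws SQLException {
        ClickHouseProperties props = new ClickHouseProperties();
        props.setUser("default");
        props.setPassword("...");

        // Single-instance data source: exactly one server in the URL, no built-in failover.
        Connection single = new ClickHouseDataSource(
                "jdbc:clickhouse://ch-node1:8123/default", props).getConnection();

        // Balanced data source: several servers in the URL; the driver load-balances
        // across them and skips instances that are unreachable.
        BalancedClickhouseDataSource balanced = new BalancedClickhouseDataSource(
                "jdbc:clickhouse://ch-node1:8123,ch-node2:8123/default", props);
        balanced.actualize(); // ping the listed servers and keep only the healthy ones
        Connection multi = balanced.getConnection();
    }
}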

5. What is ClickHouse's JDBC Compliant Mode?

The ClickHouse JDBC driver provides a JDBC Compliant Mode that lets ClickHouse align more closely with the JDBC specification. Its purpose is to improve compatibility with applications that access databases through the JDBC interface.

In JDBC Compliant Mode the driver follows the JDBC specification as far as possible. However, because ClickHouse differs significantly from traditional relational databases, some features may still not fully conform; for example, transactions and stored procedures may not be available in ClickHouse.

JDBC Compliant Mode is usually enabled by setting the clickHouseClient.jdbcCompliant property: set it when creating the ClickHouseProperties object, then pass the properties to ClickHouseDataSource or BalancedClickhouseDataSource. A hedged sketch follows.
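A minimal sketch of what that configuration might look like, following the property name described above; the exact key and whether your driver honors it depend on the driver version, so treat both as assumptions and check the driver documentation:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

public class JdbcCompliantExample {
    public static void main(String[] args) throws SQLException {
        Properties props = new Properties();
        // Property key taken from the description above; verify it against your driver version.
        props.setProperty("jdbcCompliant", "true");

        // Hypothetical host/database; unknown keys are typically ignored by JDBC drivers.
        Connection conn = DriverManager.getConnection(
                "jdbc:clickhouse://ch-node1:8123/default", props);
    }
}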

chestnutqiang commented 1 year ago

For 1.12, first configure semantic as at-least-once.

pengzichen commented 1 year ago

For 1.12, first configure semantic as at-least-once.

How exactly do you configure that? I'm running into the same problem.

sid659 commented 1 year ago

I configured semantic as at-least-once on 1.12 as suggested, but I still get the same error.

fightwithyou commented 1 year ago

(screenshot) In my case, changing this setting to true was enough; nothing else needed to change.

fightwithyou commented 1 year ago

Regarding point 2, "when the connection is obtained from ClickHouseDataSource, auto-commit can be turned off": the ensureOpen method called inside setAutoCommit looks like a custom method, and the same seems true of ensureTransactionSupport() and the FakeTransaction class. Are those your own code? Could you post those custom pieces? @chenxu8989