DataLinkDC / dinky

Dinky is a real-time data development platform based on Apache Flink, enabling agile data development, deployment and operation.
http://www.dinky.org.cn
Apache License 2.0
3.13k stars 1.15k forks source link

[Bug] [client] CDCSOURCE 任务 kafka Sink的 properties配置无效 #1083

Closed boolean-dev closed 2 years ago

boolean-dev commented 2 years ago

Search before asking

What happened

背景

我目前在执行 postgres-cdc 整库数据输出到 Kafka,任务的 sql 如下

EXECUTE CDCSOURCE jobname WITH (
'connector' = 'postgres-cdc',
'hostname' = '***.rds.amazonaws.com',
'port' = '5432',
'username' = '***',
'password'='***',
'checkpoint'='1000',
'scan.startup.mode'='initial',
'parallelism'='1',
'database-name'='gateway',
'schema-name'='public',
'table-name'='public\.account,public\.account_social',
'source.server-time-zone' = 'UTC',
'source.decoding.plugin.name' = 'wal2json',
'source.slot.name' = 'slot_cdc_flink',
'sink.connector'='datastream-kafka',
'sink.topic'='dlinkcdc',
'sink.brokers'='kafka-0.**:19092,kafka-1.**:19092,**:19092'
)

任务执行过程中,flink报如下的错误: image 因此我尝试将sql修改为如下参数:

EXECUTE CDCSOURCE jobname WITH (
'connector' = 'postgres-cdc',
'hostname' = '***.rds.amazonaws.com',
'port' = '5432',
'username' = '***',
'password'='***',
'checkpoint'='1000',
'scan.startup.mode'='initial',
'parallelism'='1',
'database-name'='gateway',
'schema-name'='public',
'table-name'='public\.account,public\.account_social',
'source.server-time-zone' = 'UTC',
'source.decoding.plugin.name' = 'wal2json',
'source.slot.name' = 'slot_cdc_flink',
'sink.connector'='datastream-kafka',
'sink.topic'='dlinkcdc',
'sink.brokers'='kafka-0.**:19092,kafka-1.**:19092,kafka-2.**:19092',
'sink.properties.transaction.timeout.ms'='300000'

增加了 'sink.properties.transaction.timeout.ms'='300000' 这个参数,但是错误依旧复现,此配置并未加入 KafkaSink

What you expected to happen

CDCSOURCE 的 KafkaSink 适配 sink.properties.*

How to reproduce

执行 CDCSOURCE 的 datastream-kafka Sink 即可复现

Anything else

bug 定位

KafkaSink 如何添加 transaction.timeout.ms

目前我查看了 Dinky Client 相关的源码,在 com.dlink.cdc.kafka.KafkaSinkBuilder 类中,可以添加如下代码 image

此处,我发现 KafkaSinkBuilder 的父类 AbstractSinkBuilder 已经有了 getProperties() 方法,因此就直接调用了父类的方法,但是日志打印结果显示并未生效。 因此,我再次调用 config.getSink() 获取 sink配置,获取到了对应的配置。 目前因为我处于调试阶段,因此就直接 put ·transaction.timeout.ms 参数了,后续再次调用 .setKafkaProducerConfig(kafkaProducerConfig) 将 KafkaSink 配置写入,我在后续测试的过程中,此方法设置配置有效。

AbstractSinkBuilder.getProperties() 方法bug

以下是此方法的源码

    protected Properties getProperties() {
        Properties properties = new Properties();
        /*
        Sink 目前的数据为
        brokers=kafka-0.**:19092,kafka-1.**:19092,kafka-2.**:19092
        connector=datastream-kafka
        properties.transaction.timeout.ms=300000
         */
        Map<String, String> sink = config.getSink();
        for (Map.Entry<String, String> entry : sink.entrySet()) {
            // 此处获取的 properties 的要求是 sink.properties 开头,但是在写入 sink 时,`sink.`前缀已经去除了,
            // 因此,此参数获取不到外层 `'sink.properties.transaction.timeout.ms'='300000'` 的配置
            // 需要改成如下 `entry.getKey().startsWith("properties")` 
            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
                properties.setProperty(entry.getKey().replace("sink.properties.", ""), entry.getValue());
            }
        }
        return properties;
    }

sql 的sink配置:

'sink.connector'='datastream-kafka',
'sink.topic'='dlinkcdc',
'sink.brokers'='kafka-0.**:19092,kafka-1.**:19092,kafka-2.**:19092',
'sink.properties.transaction.timeout.ms'='300000'

config.getSink() 中的数据

        brokers=kafka-0.**:19092,kafka-1.**:19092,kafka-2.**:19092
        connector=datastream-kafka
        properties.transaction.timeout.ms=300000

此处获取的 properties 的要求是 sink.properties 开头,但是在写入 sink 时,sink.前缀已经去除了,因此,此参数获取不到外层 'sink.properties.transaction.timeout.ms'='300000' 的配置。需要改成如下 entry.getKey().startsWith("properties")

后续

我后续会将对应的修改 PR 上去,但是有一点我不敢确认。目前AbstractSinkBuilder.getProperties() 方法有一个地方已经被调用过。 com/dlink/cdc/doris/DorisSinkBuilder.java:151 此代码调用了,我不清楚我修改是否会影响到 DorisSinkBuilder 的业务

Version

dev

Are you willing to submit PR?

Code of Conduct

aiwenmo commented 2 years ago

非常感谢您指出的问题,这个问题在之前优化 DorisSinkBuilder 的时候进行了改进,存在不兼容的情况,以下有修复两种方式:

aiwenmo commented 2 years ago

1.抽象类用 .startsWith("properties"),doris 重写这个 getProperties() ,doris的配置是 sink.sink.properties; 2.kafka 重写这个 getProperties(),kafka 的配置是 sink.properties。 两种方式都可,具体取决于 sink.sink.properties 和 sink.properties 哪种更普及。

boolean-dev commented 2 years ago

好的,我后续有时间,修复好这个bug,然后PR

tyzheng114 commented 1 year ago

大佬好,我有个想法,就是可以考虑重载AbstractSinkBuilder.getProperties(), 比如getProperties(String propPrefix),propPrefix根据不同的sink选择properties或者sink.properties, 这样的话,后续扩展其它sink的properties的时候会不会方便点

boolean-dev commented 1 year ago

其实关键还是配置一致性的问题,加入我要配置 kafka 的 properties 那我填写 sink.properties. 就好了 假如我要配置 hive 的配置信息,那就填写 sink.properties. 就好了,这样的话来说,对用户是最友好的

假如后续有人需要扩展新的 Sink,那它获取单独的配置信息也从 sink.properties.* 里面拿取就好了 我的感觉是把方便的配置交给用户,把一些后端的转换交给各个 Sink