itinycheng / flink-connector-clickhouse

Flink SQL connector for ClickHouse. Support ClickHouseCatalog and read/write primary data, maps, arrays to clickhouse.
Apache License 2.0
347 stars 149 forks source link

flink插入集群数据异常问题 #73

Closed jieshier closed 8 months ago

jieshier commented 1 year ago

hi,我想请教几个问题 1)、flink同步到ck集群是插本地表还是分布式表合适,我这里是选择的插入本地表,配置的hash算法分发数据,但是这导致了flink全量同步mysql表数据到ck集群过程中,对mysql表做insert操作,会在ck中执行两次;例如在mysql表中插入了’aaaaa‘这条数据,ck中会存在两条‘aaaaa’(sink.batch-size已经设置为1)。如果是ck单点同步则没有出现这样的问题 请问flink连接ck集群配置有问题吗,配置如下: CREATE TABLE supplier_order_out_info_sink ( ......... ) with ( 'connector' = 'clickhouse', 'url' = 'clickhouse://111:8123,222:8123,333:8123', 'database-name' = 'db', 'table-name' = 'local_tb', 'use-local' = 'true', 'sink.flush-interval' = '5s', 'sink.batch-size' = '1', 'sink.max-retries' = '20', 'sink.partition-strategy' = 'hash', 'sink.partition-key' = 'id', 'sink.ignore-delete' = 'false', 'sink.parallelism' = '1', 'sink.update-strategy' = 'insert', 'catalog.ignore-primary-key' = 'false', 'username' = 'xxx', 'password' = 'xxxx' ); 2)、flink插入ck单点和插入ck集群有什么需要注意的地方吗? 期待您的解答🙏

itinycheng commented 1 year ago

@jieshier 'sink.update-strategy' = 'insert'的作用是将update_after数据解析为insert语句,这个配置一般配合MergeTree使用;你的问题可以和这个有关系; ck单点同步不太清楚指什么; 不太清楚你的使用场景,connector只是接收上游数据,然后写clickhouse,并没有重复插入的代码逻辑,可以先确认下source端到底下发了几条数据;

jieshier commented 1 year ago

@itinycheng 您好,我了解了并测试了'sink.update-strategy' = 'insert'这个参数的作用,加与不加这个参数我都存在上面👆说的问题;所以应该跟这个参数没多大关系。 sink.batch-size已经设置为1前提下: 我现在的使用场景是 mysql -- flink -- ck集群,在往ck集群全量同步数据过程中如果有insert语句、则会在ck集群里执行两遍(就是说在ck中会存在两条一摸一样的数据)。 还有,请问一下: 'table-name' = 'local_tb' 这个配置我填分布式表,'use-local' = 'true' 这个配置我填true;是不是实际上还是写的本地表。

itinycheng commented 1 year ago

@itinycheng 您好,我了解了并测试了'sink.update-strategy' = 'insert'这个参数的作用,加与不加这个参数我都存在上面👆说的问题;所以应该跟这个参数没多大关系。 sink.batch-size已经设置为1前提下: 我现在的使用场景是 mysql -- flink -- ck集群,在往ck集群全量同步数据过程中如果有insert语句、则会在ck集群里执行两遍(就是说在ck中会存在两条一摸一样的数据)。 还有,请问一下: 'table-name' = 'local_tb' 这个配置我填分布式表,'use-local' = 'true' 这个配置我填true;是不是实际上还是写的本地表。

  1. 将FlinkSQL改为Select语句,打印查询结果,看是否有重复数据问题,clickhouse connector不应该有你说的将数据写两遍的问题;
  2. 正确的使用方法就是 table-name 配置为分布式表,use-local = true会将分布式表转为本地表,后续操作都是针对本地表进行的;
  3. README.md里有ClickHouseCatalog的使用说明,可以直接create catalog,然后FlinkSQL直接用catalog.database.table即可;