Open breeze1126 opened 3 years ago
clickhouse不适合更新的,一般都是全量插入代替更新。 嗯嗯。好。 现在有个需求,就是通过flink-sql入clickhouse之后的数据 就像入PG那样,实现更新。 这个目前没有要求要实时更新。可以批量更新。 另外:这个全量插入代替更新,有通过flink-connector-clickhouse实现吗?
PG我不太了解,我这边涉及到更新的主要用两种方式,但不会涉及到某一个字段的更新,一个是离线数据的输出结果每天调度都全量插入到ck中,查询会查询这份最新的数据,不考虑哪条数据去更新;另外一种比如一个userid每次来的数据都会给插到ck里,对应的时间戳不一样,跟hbase维护多个版本的数据差不多,查的时候查询这个userid对应时间戳最新的数据。 我们这边是把数据全部保留的,如果就想只留一条最新的数据可以看下VersionedCollapsingMergeTree引擎
嗯嗯。查看了CK的官方说明,VersionedCollapsingMergeTree引擎确实能够解决我现在遇到的问题。但是这个我不知道怎么样通过flink-connector-clickhouse去实现。
这跟connector没什么关系,设计好表结构,控制你插入的数据VersionedCollapsingMergeTree(sign, version)中括号里面两个参数,连接器支持insert就行了
嗯嗯 。我测试的是通过flink-cdc的方式写入到CK。 但是在sql-client里面建表的时候,发现不支持数据类型为整型的,一旦插入数据为整型数据,就被判定为布尔类型了。如果想通过merger into的方式将数据提前进行处理,这样又受源端的数据库类型限制。例如:一旦源端变成MYSQL,merger into的语法就限制了。所以不知道有没有更好的解决方案了。 下面是我测试的语句: CREATE TABLE products_VersionedCollapsingMergeTree( id String, name String, description String, sign Int8, ver UInt8 ) ENGINE = VersionedCollapsingMergeTree(sign,ver) PARTITION BY (id) ORDER BY id;
DROP TABLE IF EXISTS sink_products; CREATE TABLE sink_products ( id INT, name String, description String, sign Int8, ver UInt8 ) WITH ( 'connector' = 'clickhouse', 'url' = 'clickhouse://10.20.101.34:8123', 'table-name' = 'products_VersionedCollapsingMergeTree', 'username' = 'default', 'password' = 'Hadoo****', 'database-name' = 'default', );
你试下把flink的表字段类型都改成STRING试试,之前我遇到过数据类型转换的错误这样改的。通过flink-cdc的方式写入到CK我没用过,这种更新的数据用connector确实是有点麻烦,我这边都是从kafaka接进来的。新版的ck有个MaterializeMySQL引擎同步mysql数据你试过没有?
嗯嗯,string类型转换,我去尝试验证一下。这个新引擎我也去做下测试。多谢指导释疑了。
clickhouse不适合update的,通常都是全量insert代替update。