itinycheng / flink-connector-clickhouse

Flink SQL connector for ClickHouse. Supports ClickHouseCatalog and reading/writing primary data, maps, and arrays to ClickHouse.
Apache License 2.0
375 stars, 156 forks

Unexpected additional rows in ClickHouse sink table for Flink CDC procedure #50

Closed: GlockGao closed this issue 1 year ago

GlockGao commented 2 years ago

Flink version: 1.13.2; Flink CDC version: 2.2; flink-connector-clickhouse version: 1.13-release

  1. The workflow is as follows: MySQL (8.0.28) -> Flink CDC -> Flink -> ClickHouse.

  2. Configuration:

     (1) The ClickHouse sink table is a local table (engine: MergeTree), configured with these options:

     - `'connector' = 'clickhouse'`
     - `'url' = 'clickhouse://clickhouse-server:8123'`
     - `'username' = 'user'`
     - `'password' = 'pass'`
     - `'database-name' = 'database'`
     - `'table-name' = 'clickhouse_sinktable'`
     - `'sink.batch-size' = '500'`
     - `'sink.flush-interval' = '1000'`
     - `'sink.max-retries' = '3'`

     (2) Relevant Flink Java SQL code:

     `String sql = "INSERT INTO clickhouse_sink_table SELECT vrfi.*, DATE_FORMAT(vrfi.created, 'yyyy-MM') FROM mysql_source_table as vrfi\n"; statementSet.addInsertSql(sql);`

  3. The MySQL table has 539,905 rows:

     `select count(id) from sbtest1;`

     | count(id) |
     |-----------|
     | 539905    |

     1 row in set (0.05 sec)

  4. The ClickHouse sink table, however, has 556,114 rows, which is 16,209 more than the MySQL source:

     `SELECT COUNT(id) FROM clickhouse_sink_table;`

     Query id: 775dd633-9a8b-4c13-b272-3b317f0575ab

     | count(id) |
     |-----------|
     | 556114    |

  5. Counting distinct IDs, on the other hand, returns the expected result:

     `SELECT COUNT(distinct(id)) FROM clickhouse_sink_table;`

     Query id: 3e6dc971-47e0-4ca0-836b-eaabf280d2e2

     | uniqExact(id) |
     |---------------|
     | 539905        |

  6. I found a similar issue (https://stackoverflow.com/questions/53442559/how-to-avoid-duplicates-in-clickhouse-table). Would changing the engine to ReplicatedMergeTree or ReplacingMergeTree help?
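The options in item 2 end up inside a Flink SQL `CREATE TABLE ... WITH (...)` clause, where every `'key' = 'value'` pair except the last needs a trailing comma; those commas are easy to lose when the DDL is assembled by string concatenation. Below is a minimal, stdlib-only sketch that builds the clause from a map instead. The class and method names and the three-column schema are hypothetical (the issue does not show the sink schema), and the call to Flink's `TableEnvironment#executeSql` is left out so the example runs without Flink on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of flink-connector-clickhouse): builds the sink
// DDL string that would then be passed to TableEnvironment#executeSql.
final class ClickHouseSinkDdl {

    // Renders options as a Flink SQL WITH clause. StringJoiner inserts ",\n"
    // between entries, so no option can silently lose its comma.
    static String withClause(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(",\n", "WITH (\n", "\n)");
        for (Map.Entry<String, String> e : options.entrySet()) {
            // SQL string literals escape a single quote by doubling it.
            String key = e.getKey().replace("'", "''");
            String value = e.getValue().replace("'", "''");
            joiner.add("  '" + key + "' = '" + value + "'");
        }
        return joiner.toString();
    }

    // Option values copied from the issue; LinkedHashMap preserves their order.
    static Map<String, String> sinkOptions() {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("connector", "clickhouse");
        opts.put("url", "clickhouse://clickhouse-server:8123");
        opts.put("username", "user");
        opts.put("password", "pass");
        opts.put("database-name", "database");
        opts.put("table-name", "clickhouse_sinktable");
        opts.put("sink.batch-size", "500");
        opts.put("sink.flush-interval", "1000");
        opts.put("sink.max-retries", "3");
        return opts;
    }

    public static void main(String[] args) {
        // Hypothetical column list: the real sink schema is not shown in the issue.
        String ddl = "CREATE TABLE clickhouse_sink_table (\n"
                + "  id BIGINT,\n"
                + "  created TIMESTAMP(3),\n"
                + "  created_month STRING\n"
                + ") " + withClause(sinkOptions());
        System.out.println(ddl);
    }
}
```

Building the clause from a map keeps option names and values in one place and makes the separator logic impossible to get wrong per line.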

Thanks for your help.

itinycheng commented 2 years ago

@GlockGao

  1. This seems to be related to Flink CDC, which generates additional change records such as UPDATE_BEFORE / UPDATE_AFTER. flink-connector-clickhouse itself does not generate any new records.
  2. The distinct operator removes duplicate records with the same primary key in Flink state; therefore, the records written to ClickHouse have been duplicated.
  3. To remove duplicate records, use the ReplacingMergeTree engine; see https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree
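The answer above can be illustrated with a small, stdlib-only Java model. It is a sketch under stated assumptions, not the connector's actual write path: it assumes the sink appends one row per `INSERT`/`UPDATE_AFTER` record to a plain MergeTree table, and it models the eventual ReplacingMergeTree result as "last inserted row wins" per sorting key (here `id`). The changelog contents and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the behaviour described in the answer; not connector code.
final class ChangelogDuplicates {

    // Change kinds named after Flink's RowKind values.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record Change(Kind kind, long id, String value) {}

    // Hypothetical changelog: row 1 is updated once after being inserted.
    static List<Change> sampleLog() {
        return List.of(
                new Change(Kind.INSERT, 1, "a"),
                new Change(Kind.INSERT, 2, "b"),
                new Change(Kind.UPDATE_BEFORE, 1, "a"),
                new Change(Kind.UPDATE_AFTER, 1, "a2"));
    }

    // Assumption: an append-only MergeTree table gets one new row for every
    // INSERT or UPDATE_AFTER record; UPDATE_BEFORE and DELETE are not applied.
    static List<Change> appendOnly(List<Change> log) {
        List<Change> rows = new ArrayList<>();
        for (Change c : log) {
            if (c.kind() == Kind.INSERT || c.kind() == Kind.UPDATE_AFTER) {
                rows.add(c);
            }
        }
        return rows;
    }

    // Simplified ReplacingMergeTree result once all parts are merged (or when
    // queried with FINAL): one row per sorting key, the last inserted one wins.
    static Map<Long, Change> latestPerKey(List<Change> rows) {
        Map<Long, Change> latest = new LinkedHashMap<>();
        for (Change c : rows) {
            latest.put(c.id(), c);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Change> rows = appendOnly(sampleLog());
        long distinctIds = rows.stream().mapToLong(Change::id).distinct().count();
        System.out.println("count(id)          = " + rows.size());               // 3
        System.out.println("count(distinct id) = " + distinctIds);               // 2
        System.out.println("after replacing    = " + latestPerKey(rows).size()); // 2
    }
}
```

Note that ReplacingMergeTree removes duplicates only during background merges, whose timing is not guaranteed, and it deduplicates by the `ORDER BY` (sorting) key. Until parts are merged, `COUNT(id)` can still exceed the distinct count unless the query uses `FINAL`.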