Altinity / clickhouse-sink-connector

Replicate data from MySQL, Postgres and MongoDB to ClickHouse®
https://www.altinity.com
Apache License 2.0
234 stars 54 forks source link

Make Connector ready to switch to another Replica #354

Open BorisTyshkevich opened 1 year ago

BorisTyshkevich commented 1 year ago

Sink Connector connects to only one node at a time even for Clickhouse Cluster. In case of Replica fail we couldn't continue consuming events from the OLTP database as position and history stay on the unavailable server. We need to move internal connector tables to be cluster-wide.

For Offset I suggest KeeperMap for History - ReplicatedMergeTree. Here is the example:

offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"
offset.storage.jdbc.offset.table.name: "system.asc_offsets"
offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse/?ssl=true"
offset.storage.jdbc.user: "asc"
offset.storage.jdbc.password: "***"
offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
(
    `id` String,
    `offset_key` String,
    `offset_val` String,
    `record_insert_ts` DateTime,
    `record_insert_seq` UInt64,
) ENGINE =  KeeperMap('/asc_offsets4', 4)
PRIMARY KEY offset_key"
offset.storage.jdbc.offset.table.delete: "select 1"
schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"
schema.history.internal.jdbc.url: "jdbc:clickhouse://clickhouse/?ssl=true"
schema.history.internal.jdbc.user: "asc"
schema.history.internal.jdbc.password: "***"
schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exists %s
(
   `id` FixedString(36),
   `history_data` String,
   `history_data_seq` UInt32,
   `record_insert_ts` DateTime,
   `record_insert_seq` UInt32
) ENGINE=ReplacingMergeTree(record_insert_seq)
order by id"
schema.history.internal.jdbc.schema.history.table.name: "system.asc_schema_history"
aadant commented 1 year ago

@BorisTyshkevich why not ReplicatedMergeTree the offset table ?

BorisTyshkevich commented 1 year ago

Replication could be lagged and it produce more duplicates. KeeperMap was introduced specially for such data as binlog position and works quite good.

It's even possible to make exactly one delivery by doing a two-stage commit with the next and last position.

aadant commented 1 year ago

Please note that it does not matter if the position is lagging, re-inserting to a replacing merge tree does not matter (few seconds lag is not a big deal)

BorisTyshkevich commented 1 year ago

RMT is not the best instrument to deal with duplicates. Not only FINAL speed is an issue. You can't set up aggregating MV to the table with duplicates. That's why exactly-once-delivery is better. The right thing is to send to Clickhouse the very same block after the connector restart to make checksum block deduplication work. That requires having both start and end positions for every insert block.

aadant commented 1 year ago

I am open to discussion but that's a design choice to use RMT. Final is not an issue for this use case, we should document the recommended settings.

aadant commented 1 year ago

@BorisTyshkevich please note that sink != sync.