Altinity / clickhouse-sink-connector

Replicate data from MySQL, Postgres and MongoDB to ClickHouse®
https://www.altinity.com
Apache License 2.0

Data partially replicated from PostgreSQL #300

Open detygon opened 1 year ago

detygon commented 1 year ago

I'm trying to set up streaming data ingestion from Postgres to ClickHouse. The process works, but the data is only partially replicated to ClickHouse. I get this error in the logs, but I don't know whether it is related.

debezium-embedded_1 | 585341 2023-08-27 23:05:04.194 [main] ERROR io.debezium.embedded.EmbeddedEngine - Timed out waiting to flush EmbeddedEngine{id=debezium-embedded-postgres} offsets to storage

Here is my config:

name: "debezium-embedded-postgres"
database.dbname: "analytics"
database.hostname: "xxx"
database.port: "5432"
database.user: "xxx"
database.password: "xxx"
database.server.name: "eattestation.analytics"
schema.include.list: public
plugin.name: "pgoutput"
table.include.list: "public.xxx"
clickhouse.server.url: "https://xxx.clickhouse.cloud"
clickhouse.server.user: "xxx"
clickhouse.server.pass: "xxx"
clickhouse.server.port: "8443"
clickhouse.server.database: "analytics"
database.allowPublicKeyRetrieval: "true"
snapshot.mode: "initial"
offset.flush.interval.ms: 5000
connector.class: "io.debezium.connector.postgresql.PostgresConnector"
offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"
offset.storage.offset.storage.jdbc.offset.table.name: "analytics.replica_source_info"
offset.storage.jdbc.url: "jdbc:clickhouse:http://xxx.clickhouse.cloud:8443/analytics?ssl=true&sslmode=STRICT"
offset.storage.jdbc.user: "xxx"
offset.storage.jdbc.password: "xxx"
offset.storage.offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
(
    `id` String,
    `offset_key` String,
    `offset_val` String,
    `record_insert_ts` DateTime,
    `record_insert_seq` UInt64,
    `_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY id
SETTINGS index_granularity = 8198"
offset.storage.offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1"
schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"
schema.history.internal.jdbc.url: "jdbc:clickhouse:http://xxx.clickhouse.cloud:8443/analytics?ssl=true&sslmode=STRICT"
schema.history.internal.jdbc.user: "xxx"
schema.history.internal.jdbc.password: "xxx"
schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exists %s
(`id` VARCHAR(36) NOT NULL, `history_data` VARCHAR(65000), `history_data_seq` INTEGER, `record_insert_ts` TIMESTAMP NOT NULL, `record_insert_seq` INTEGER NOT NULL) ENGINE=ReplacingMergeTree(record_insert_seq) order by id"

schema.history.internal.jdbc.schema.history.table.name: "analytics.replicate_schema_history"
enable.snapshot.ddl: "true"
auto.create.tables: "true"

subkanthi commented 1 year ago

Hi, can you please share the full logs and explain what you mean by "partial"? ReplacingMergeTree merges rows based on the ORDER BY columns, so rows that share the same ORDER BY key are collapsed into a single row.
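
To illustrate the ReplacingMergeTree point, here is a minimal Python sketch of the merge behavior. It is a simulation under stated assumptions, not ClickHouse code: the `replacing_merge` function, the `(id, version, payload)` row shape, and the sample rows are all hypothetical. It only shows why a target table can end up with fewer rows than were written when several rows share the same ORDER BY key.

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical row shape: (order-by key, version column, payload).
Row = Tuple[str, int, str]


def replacing_merge(rows: Iterable[Row]) -> List[Row]:
    """Simulate a ReplacingMergeTree merge: keep one row per ORDER BY key.

    The row with the highest version wins; on a version tie the row
    that was inserted last wins.
    """
    latest: Dict[str, Row] = {}
    for row in rows:
        key, version, _ = row
        current = latest.get(key)
        # ">=" lets a later insert replace an earlier one on a version tie.
        if current is None or version >= current[1]:
            latest[key] = row
    return sorted(latest.values())


# Sample data (made up): five rows written, but only three distinct keys.
source_rows = [
    ("a", 1, "first insert"),
    ("a", 2, "update"),
    ("b", 1, "first insert"),
    ("b", 1, "duplicate key, same version"),
    ("c", 1, "first insert"),
]

merged = replacing_merge(source_rows)
print(len(source_rows), "rows written,", len(merged), "rows after merge")
```

If the ORDER BY key of the target table is not unique in the source table, this kind of collapse can look like partial replication. Comparing `SELECT count()` with and without the `FINAL` modifier on the ClickHouse table is one way to check whether merges account for the difference.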