Altinity / clickhouse-sink-connector

Replicate data from MySQL, Postgres and MongoDB to ClickHouse
https://www.altinity.com
Apache License 2.0

Data Not Replicating from PostgreSQL Partitioned Tables to ClickHouse After Initial Load #513

Closed PraveenBalaji001 closed 5 months ago

PraveenBalaji001 commented 5 months ago

Hi,

While using the sink connector to replicate data from PostgreSQL to ClickHouse, existing records are replicated successfully during the initial load. However, after the initial load, new rows inserted into the PostgreSQL table while the application is running are not inserted into ClickHouse. This only happens for partitioned tables; for non-partitioned tables, inserts made after the initial replication show up in ClickHouse immediately. Configuration details and the table schemas are provided below for reference.

config.yaml -

database.hostname: "XXXX"
database.port: "XXXX"
database.user: "XXXX"
database.password: "XXXX"

schema.include.list: public
plugin.name: "pgoutput"
table.include.list: "public.psp_transactions"

clickhouse.server.url: "XXXX"
clickhouse.server.user: "XXXX"
clickhouse.server.password: "XXXX"
clickhouse.server.port: "XXXX"
clickhouse.server.database: "upiswitch"

database.allowPublicKeyRetrieval: "true"
snapshot.mode: "initial"
offset.flush.interval.ms: 5000

connector.class: "io.debezium.connector.postgresql.PostgresConnector"
offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"
offset.storage.jdbc.offset.table.name: "altinity_sink_connector.replica_source_info"
offset.storage.jdbc.url: "XXXX"
offset.storage.jdbc.user: "adminupi"
offset.storage.jdbc.password: "XXXX"
offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s ( id String, offset_key String, offset_val String, record_insert_ts DateTime, record_insert_seq UInt64, _version UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9)) ) ENGINE = ReplacingMergeTree(_version) ORDER BY offset_key SETTINGS index_granularity = 8198"
offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1"
schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"
schema.history.internal.jdbc.url: "XXXX"
schema.history.internal.jdbc.user: "XXXX"
offset.storage.jdbc.password: "XXXX"
schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exists %s (id VARCHAR(36) NOT NULL, history_data VARCHAR(65000), history_data_seq INTEGER, record_insert_ts TIMESTAMP NOT NULL, record_insert_seq INTEGER NOT NULL) ENGINE=ReplacingMergeTree(record_insert_seq) order by id"
schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector.replicate_schema_history"
enable.snapshot.ddl: "true"
database.dbname: "upiswitch"

Table Schema in PostgreSQL -

CREATE TABLE IF NOT EXISTS psp_transactions (
    id BIGINT NOT NULL,
    institution_id VARCHAR(16) NOT NULL,
    upi_transaction_id TEXT NOT NULL,
    msg_id TEXT NOT NULL,
    platform_id TEXT NOT NULL,
    operation CHAR(2), -- DR/CR
    currency_code VARCHAR(4),
    amount NUMERIC(18,2),
    vpa TEXT,
    "type" TEXT,
    "code" TEXT,
    account_number TEXT NOT NULL,
    account_type TEXT,
    mobile_number TEXT,
    a_mobile_number TEXT,
    account_ifsc TEXT,
    "name" TEXT,
    identity_details TEXT,
    approval_number TEXT,
    other_vpa TEXT,
    other_type TEXT,
    other_code TEXT,
    other_account_number TEXT,
    other_account_type TEXT,
    other_mobile_number TEXT,
    other_a_mobile_number TEXT,
    other_account_ifsc TEXT,
    other_name TEXT,
    other_identity_details TEXT,
    other_approval_number TEXT,
    original_upi_transaction_id TEXT,
    transaction_date DATE NOT NULL,
    request_time TIMESTAMP WITH TIME ZONE,
    response_time TIMESTAMP WITH TIME ZONE,
    cbs_response_code VARCHAR(18),
    cbs_response_msg TEXT,
    cbs_reference_number TEXT,
    cbs_connect_reference_number TEXT,
    upi_response_code VARCHAR(8),
    upi_response_message TEXT,
    status TEXT NOT NULL,
    upi_confirmation_status TEXT,
    cbs_extract TEXT,
    is_reversal BOOLEAN DEFAULT FALSE,
    is_pre_approved BOOLEAN DEFAULT FALSE,
    transaction_notes TEXT,
    merchant_category_code TEXT,
    reference_id TEXT,
    customer_reference_id TEXT,
    transaction_type VARCHAR(16),
    transaction_sub_type VARCHAR(16),
    initiation_mode VARCHAR(3),
    purpose VARCHAR(3),
    ref_url TEXT,
    ref_category VARCHAR(3),
    upi_error_code TEXT,
    udir_complaints_id BIGINT,
    complaints_reference_number TEXT,
    merchant_details TEXT,
    device_details TEXT,
    other_device_details TEXT,
    lrn TEXT,
    version SMALLINT DEFAULT 1,
    created_time TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_modified_time TIMESTAMP WITH TIME ZONE
) PARTITION BY RANGE(transaction_date);
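
For context on why partitioned tables behave differently here: with RANGE partitioning, rows are physically stored in child partitions, and it is those child tables that produce the change events Debezium captures. A minimal sketch of such a partition (the partition name and bounds below are illustrative, not taken from this report):

-- Hypothetical monthly partition of psp_transactions; the actual names and
-- ranges depend on how the partitions were created. Rows inserted through
-- the parent table are stored in (and emit WAL changes for) this child table.
CREATE TABLE IF NOT EXISTS psp_transactions_2024_06
    PARTITION OF psp_transactions
    FOR VALUES FROM ('2024-06-01') TO ('2024-07-01');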

Table Schema in ClickHouse -

CREATE TABLE upiswitch.psp_transactions
(
    id Int64,
    institution_id String,
    upi_transaction_id String,
    msg_id String,
    platform_id String,
    operation Nullable(String),
    currency_code Nullable(String),
    amount Nullable(Decimal(18, 2)),
    vpa String,
    type Nullable(String),
    code Nullable(String),
    account_number String,
    account_type Nullable(String),
    mobile_number String,
    a_mobile_number String,
    account_ifsc Nullable(String),
    name Nullable(String),
    identity_details Nullable(String),
    approval_number Nullable(String),
    other_vpa Nullable(String),
    other_type Nullable(String),
    other_code Nullable(String),
    other_account_number Nullable(String),
    other_account_type Nullable(String),
    other_mobile_number Nullable(String),
    other_a_mobile_number Nullable(String),
    other_account_ifsc Nullable(String),
    other_name Nullable(String),
    other_identity_details Nullable(String),
    other_approval_number Nullable(String),
    original_upi_transaction_id Nullable(String),
    transaction_date Date,
    request_time Nullable(DateTime64(6)),
    response_time Nullable(DateTime64(6)),
    cbs_response_code Nullable(String),
    cbs_response_msg Nullable(String),
    cbs_reference_number Nullable(String),
    cbs_connect_reference_number Nullable(String),
    upi_response_code Nullable(String),
    upi_response_message Nullable(String),
    status String,
    upi_confirmation_status Nullable(String),
    cbs_extract Nullable(String),
    is_reversal Nullable(Bool),
    is_pre_approved Nullable(Bool),
    transaction_notes Nullable(String),
    merchant_category_code Nullable(String),
    reference_id Nullable(String),
    customer_reference_id String,
    transaction_type LowCardinality(String),
    transaction_sub_type Nullable(String),
    initiation_mode LowCardinality(String),
    ref_url Nullable(String),
    ref_category Nullable(String),
    upi_error_code Nullable(String),
    udir_complaints_id Nullable(Int64),
    complaints_reference_number Nullable(String),
    merchant_details Nullable(String),
    device_details Nullable(String),
    other_device_details Nullable(String),
    version Int16,
    created_time DateTime64(3),
    last_modified_time Nullable(DateTime64(3)),
    purpose Nullable(String),
    lrn Nullable(String),
    original_customer_reference_id Nullable(String)
)
ENGINE = ReplacingMergeTree(version)
PARTITION BY toYYYYMM(transaction_date)
ORDER BY (a_mobile_number, account_number, vpa, transaction_type, transaction_date, id)
SETTINGS index_granularity = 8192
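
A quick way to confirm the symptom on the ClickHouse side is to compare row counts per partition before and after inserting new rows in PostgreSQL. A sketch, assuming the table above (FINAL collapses ReplacingMergeTree versions so duplicates are not counted):

-- Rerun after inserting into PostgreSQL; if streaming works, the counts grow.
SELECT
    toYYYYMM(transaction_date) AS month,
    count() AS rows
FROM upiswitch.psp_transactions FINAL
GROUP BY month
ORDER BY month;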

aadant commented 5 months ago

@subkanthi this is not the first time we have had issues with partitioned tables and Postgres. Anything to document? See also https://github.com/Altinity/clickhouse-sink-connector/issues/458

subkanthi commented 5 months ago

@PraveenBalaji001, this is similar to https://issues.redhat.com/browse/DBZ-4433: table.include.list does not cover the partitions of a partitioned table, so you should use a regex that includes the child partitions:

table.include.list=<schema_name>.<table_name>_.*

You will see in the logs if the table is included in the list of captured tables.
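
Applied to the schema in this report, the setting would look something like the line below; the pattern is an assumption based on PostgreSQL's usual <parent>_<suffix> naming for child partitions, so adjust it to however the partitions were actually named:

table.include.list: "public.psp_transactions_.*"

A broader pattern such as "public.psp_transactions.*" would match both the parent table and its partitions.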

PraveenBalaji001 commented 5 months ago

Thanks for the reply @subkanthi, it's working now.

subkanthi commented 5 months ago

Closing this for now. Please feel free to re-open if you face additional issues.