Altinity / clickhouse-sink-connector

Replicate data from MySQL, Postgres and MongoDB to ClickHouse
https://www.altinity.com
Apache License 2.0

Error during snapshot #654

Open sarthaksingh-tomar opened 1 week ago

sarthaksingh-tomar commented 1 week ago

Hello,

I am trying clickhouse-sink-connector-lightweight to replicate data from MariaDB to ClickHouse, but it fails with the exception below during the snapshot.

I am using the default config with the following MariaDB-specific properties:

```yaml
# MariaDB specific properties
connector.adapter: "mariadb"
database.protocol: "jdbc:mariadb"
database.jdbc.driver: "org.mariadb.jdbc.Driver"
```

https://github.com/Altinity/clickhouse-sink-connector/blob/develop/sink-connector-lightweight/docker/config.yml
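
For context, a minimal sketch of how these overrides sit alongside the usual source and target connection properties, assuming the property names used in the linked config.yml; hostnames, credentials, and database names below are placeholders, not values from this report:

```yaml
# MariaDB source (placeholder values)
database.hostname: "mariadb-host"
database.port: "3306"
database.user: "repl_user"
database.password: "***"
database.include.list: "db"

# MariaDB specific properties
connector.adapter: "mariadb"
database.protocol: "jdbc:mariadb"
database.jdbc.driver: "org.mariadb.jdbc.Driver"

# ClickHouse target (placeholder values)
clickhouse.server.url: "clickhouse-host"
clickhouse.server.port: "8123"
clickhouse.server.user: "ch_user"
clickhouse.server.password: "***"
clickhouse.server.database: "db"
```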

```
[Sink Connector thread-pool-5] ERROR io.debezium.embedded.EmbeddedEngine - Timed out waiting to flush EmbeddedEngine{id=connector-shard18} offsets to storage

[debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] ERROR io.debezium.relational.RelationalSnapshotChangeEventSource - Error during snapshot
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Snapshotting of table db.table1 failed
    at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:467)
    at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:165)
    at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:92)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:253)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:237)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:189)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:137)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.ConnectException: Snapshotting of table db.table1 failed
    at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:597)
    at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createDataEventsForTableCallable$6(RelationalSnapshotChangeEventSource.java:519)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    ... 5 more
Caused by: java.sql.SQLNonTransientConnectionException: (conn=272359) Error while streaming resultSet data
    at org.mariadb.jdbc.export.ExceptionFactory.createException(ExceptionFactory.java:293)
    at org.mariadb.jdbc.export.ExceptionFactory.create(ExceptionFactory.java:359)
    at org.mariadb.jdbc.client.result.StreamingResult.addStreamingValue(StreamingResult.java:130)
    at org.mariadb.jdbc.client.result.StreamingResult.nextStreamingValue(StreamingResult.java:113)
    at org.mariadb.jdbc.client.result.StreamingResult.next(StreamingResult.java:163)
    at io.debezium.jdbc.CancellableResultSet.next(CancellableResultSet.java:52)
    at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:576)
    ... 7 more
    Suppressed: java.sql.SQLNonTransientConnectionException: (conn=272359) Error while streaming resultSet data
        at org.mariadb.jdbc.export.ExceptionFactory.createException(ExceptionFactory.java:293)
        at org.mariadb.jdbc.export.ExceptionFactory.create(ExceptionFactory.java:359)
        at org.mariadb.jdbc.client.result.Result.close(Result.java:307)
        at io.debezium.jdbc.CancellableResultSet.close(CancellableResultSet.java:65)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:545)
        ... 7 more
    Caused by: java.net.SocketException: Connection reset
        at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:325)
        at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
        at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
        at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
        at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
        at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
        at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:343)
        at org.mariadb.jdbc.client.socket.impl.PacketReader.readPacket(PacketReader.java:76)
        at org.mariadb.jdbc.client.result.Result.skipRemaining(Result.java:210)
        at org.mariadb.jdbc.client.result.Result.close(Result.java:305)
        ... 9 more
    Suppressed: java.sql.SQLNonTransientConnectionException: (conn=272359) Error while streaming resultSet data
        at org.mariadb.jdbc.export.ExceptionFactory.createException(ExceptionFactory.java:293)
        at org.mariadb.jdbc.export.ExceptionFactory.create(ExceptionFactory.java:359)
        at org.mariadb.jdbc.client.result.StreamingResult.addStreamingValue(StreamingResult.java:130)
        at org.mariadb.jdbc.client.result.StreamingResult.fetchRemaining(StreamingResult.java:145)
        at org.mariadb.jdbc.client.result.Result.closeFromStmtClose(Result.java:325)
        at org.mariadb.jdbc.Statement.close(Statement.java:175)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:545)
        ... 7 more
    Caused by: java.net.SocketException: Connection reset
        at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:325)
        at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
        at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
        at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
        at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
        at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
        at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:343)
        at org.mariadb.jdbc.client.socket.impl.PacketReader.readPacket(PacketReader.java:76)
        at org.mariadb.jdbc.client.result.Result.readNext(Result.java:155)
        at org.mariadb.jdbc.client.result.StreamingResult.addStreamingValue(StreamingResult.java:124)
        ... 11 more
```
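
The trace shows the TCP connection being reset while Debezium streams the snapshot result set from MariaDB. One common culprit for this pattern is a server-side timeout (or a proxy idle timeout) expiring during a long snapshot of a large table; as a sketch, the relevant variables can be checked and, if needed, raised as below (these are standard MariaDB/MySQL settings, not values taken from this thread):

```sql
-- Check server-side limits that can cut off a long streaming result set
SHOW GLOBAL VARIABLES LIKE 'net_write_timeout';
SHOW GLOBAL VARIABLES LIKE 'wait_timeout';
SHOW GLOBAL VARIABLES LIKE 'max_allowed_packet';

-- If the snapshot of a large table outlives these timeouts, raise them
-- (timeout values are in seconds; size them to the expected snapshot duration)
SET GLOBAL net_write_timeout = 3600;
SET GLOBAL wait_timeout = 28800;
```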
subkanthi commented 1 week ago

This could be related to acquiring table locks; you can try this Debezium configuration:

```yaml
snapshot.locking.mode: "none"
```
sarthaksingh-tomar commented 5 days ago

> This could be related to acquiring table locks; you can try this Debezium configuration:
>
> `snapshot.locking.mode: "none"`

@subkanthi I am already using that to avoid locking, but the connector is still failing. These are my connector parameter configs:


```yaml
database.allowPublicKeyRetrieval: "true"

# snapshot.mode: Debezium can use different modes when it runs a snapshot. The
# snapshot mode is determined by the snapshot.mode configuration property. The
# default value of the property is initial. You can customize the way that the
# connector creates snapshots by changing the value of the snapshot.mode property.
snapshot.mode: "when_needed"
snapshot.locking.mode: "none"
snapshot.fetch.size: "15000"

# offset.flush.interval.ms: The number of milliseconds to wait before flushing
# recent offsets to Kafka. This ensures that offsets are committed within the
# specified time interval.
offset.flush.interval.ms: 5000

# connector.class: The Java class for the connector. This must be set to
# io.debezium.connector.mysql.MySqlConnector.
connector.class: "io.debezium.connector.mysql.MySqlConnector"

# offset.storage: The Java class that implements the offset storage strategy.
# This must be set to io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore.
offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"

# offset.storage.jdbc.offset.table.name: The name of the database table where
# connector offsets are to be stored.
offset.storage.jdbc.offset.table.name: "altinity_sink_connector.replica_source_info"

# offset.storage.jdbc.url: The JDBC URL for the database where connector
# offsets are to be stored.
offset.storage.jdbc.url: "jdbc:clickhouse://#########:8123/altinity_sink_connector"

# offset.storage.jdbc.user: The name of the database user to be used when
# connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.user: "#####"

# offset.storage.jdbc.password: The password of the database user to be used
# when connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.password: "#######"

# offset.storage.jdbc.offset.table.ddl: The DDL statement used to create the
# database table where connector offsets are to be stored. (Advanced)
offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists altinity_sink_connector.replica_source_info ( id String, offset_key String, offset_val String, record_insert_ts DateTime, record_insert_seq UInt64, _version UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9)) ) ENGINE = ReplacingMergeTree(_version) ORDER BY offset_key SETTINGS index_granularity = 8192"

# offset.storage.jdbc.offset.table.delete: The DML statement used to delete the
# database table where connector offsets are to be stored. (Advanced)
offset.storage.jdbc.offset.table.delete: "select * from altinity_sink_connector.replica_source_info"

offset.storage.jdbc.offset.table.select: "SELECT id, offset_key, offset_val FROM altinity_sink_connector.replica_source_info FINAL ORDER BY record_insert_ts, record_insert_seq"
```
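
Since the offset table is a ReplacingMergeTree, duplicate offset rows are only collapsed at merge time, which is why the select above reads with FINAL to get the latest committed offset. As an illustration, using only the table and columns from the DDL above, the most recently stored offset can be inspected with:

```sql
-- Show the most recently committed offset row (table/columns per the DDL above)
SELECT offset_key, offset_val, record_insert_ts
FROM altinity_sink_connector.replica_source_info FINAL
ORDER BY record_insert_ts DESC, record_insert_seq DESC
LIMIT 1;
```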

```yaml
# schema.history.internal: The Java class that implements the schema history
# strategy. This must be set to io.debezium.storage.jdbc.history.JdbcSchemaHistory.
schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"

# schema.history.internal.jdbc.schema.history.table.name: The name of the
# database table where connector schema history is to be stored.
schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector.replicate_schema_history"
schema.history.internal.schema.history.table.name: "altinity_sink_connector.replicate_schema_history"

# schema.history.internal.jdbc.url: The JDBC URL for the database where
# connector schema history is to be stored.
schema.history.internal.jdbc.url: "jdbc:clickhouse://########:8123/altinity_sink_connector"

# schema.history.internal.jdbc.user: The name of the database user to be used when
# connecting to the database where connector schema history is to be stored.
schema.history.internal.jdbc.user: "#####"

# schema.history.internal.jdbc.password: The password of the database user to be used
# when connecting to the database where connector schema history is to be stored.
schema.history.internal.jdbc.password: "########"

# schema.history.internal.jdbc.schema.history.table.ddl: The DDL statement used to
# create the database table where connector schema history is to be stored. (Advanced)
schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exists altinity_sink_connector.replicate_schema_history (id VARCHAR(36) NOT NULL, history_data VARCHAR(65000), history_data_seq INTEGER, record_insert_ts TIMESTAMP NOT NULL, record_insert_seq INTEGER NOT NULL) ENGINE=ReplacingMergeTree(record_insert_seq) order by id"
```
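
The same pattern applies to schema history; as a hypothetical sanity check that DDL history is actually being recorded in the table created above:

```sql
-- Count recorded schema-history entries (table name per the DDL above)
SELECT count() FROM altinity_sink_connector.replicate_schema_history;
```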

```yaml
# enable.snapshot.ddl: If set to true, the connector will parse the DDL
# statements from the initial load.
enable.snapshot.ddl: "true"

# persist.raw.bytes: If set to true, the connector will persist raw bytes as
# received in a String column.
persist.raw.bytes: "false"

# auto.create.tables: If set to true, the connector will create tables in the
# target based on the schema received in the incoming message.
auto.create.tables: "true"

# auto.create.tables.replicated: If set to true, the connector will create
# tables with the Engine set to ReplicatedReplacingMergeTree.
auto.create.tables.replicated: "true"

# database.connectionTimeZone: The timezone of the MySQL database server used
# to correctly shift the commit transaction timestamp.
database.connectionTimeZone: "UTC"

# clickhouse.datetime.timezone: This timezone will override the default timezone
# of the ClickHouse server. Timezone columns will be set to this timezone.
clickhouse.datetime.timezone: "UTC"

# skip_replica_start: If set to true, the connector will skip replication on
# startup. sink-connector-client start_replica will start replication.
skip_replica_start: "false"

# binary.handling.mode: The mode for handling binary values. Possible values
# are bytes, base64, and decode. The default is bytes.
binary.handling.mode: "base64"

# ignore_delete: If set to true, the connector will ignore delete events.
# The default is false.
ignore_delete: "true"

# disable.ddl: If set to true, the connector will ignore DDL events.
# The default is false.
disable.ddl: "false"

# disable.drop.truncate: If set to true, the connector will ignore drop and
# truncate events. The default is false.
disable.drop.truncate: "false"

# restart.event.loop: This will restart the CDC event loop if no messages are
# received within the timeout specified in restart.event.loop.timeout.period.secs.
restart.event.loop: "false"

# restart.event.loop.timeout.period.secs: Defines the restart timeout period.
restart.event.loop.timeout.period.secs: "3000"

# buffer.max.records: Max number of records for the flush buffer.
buffer.max.records: "10000"
```