Altinity / clickhouse-sink-connector

Replicate data from MySQL, Postgres and MongoDB to ClickHouse®
https://www.altinity.com
Apache License 2.0

MySQL db not able to sync fully, it fails snapshotting after 25M, syncs the table over and over again #722

Open Lokesh14120 opened 3 months ago

Lokesh14120 commented 3 months ago

The Debezium connector fails periodically and then starts syncing everything from the beginning again.

This is the config file we set up: https://gist.github.com/Lokesh14120/dd38d7cc7d07d61576eff318c2d77384

System specifications: CPU: 8 cores, RAM: 16 GB

This is the execution entry point: "sh", "-c", "java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005 -XX:ParallelGCThreads=4 -Xms6g -Xmx6g -Dlog4j2.configurationFile=log4j2.xml -jar /app.jar /config.yml com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication"

These are the errors we encountered.


```[debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Context created
[debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.connector.mysql.MySqlSnapshotChangeEventSource - No previous offset has been found
[debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.connector.mysql.MySqlSnapshotChangeEventSource - According to the connector configuration both schema and data will be snapshotted
[debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 1 - Preparing
[debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Previous snapshot was cancelled before completion; a new snapshot will be taken.
[debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 2 - Determining captured tables
[debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.connector.mysql.MySqlSnapshotChangeEventSource - Read list of available databases
[debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.connector.mysql.MySqlSnapshotChangeEventSource -    list of available databases is: [dnsbl_checks, experimental, information_schema, mysql, performance_schema, ri_mailer, ri_mailers, sys]
[debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.connector.mysql.MySqlSnapshotChangeEventSource - Read list of available tables in each database
[debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.connector.mysql.MySqlSnapshotChangeEventSource -   snapshot continuing with database(s): [ri_mailer]
[debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Adding table ri_mailer.custom_domain to the list of capture schema tables
[debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Adding table ri_mailer.webhook_logs to the list of capture schema tables```

```[debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] ERROR io.debezium.relational.RelationalSnapshotChangeEventSource - Error during snapshot
java.lang.InterruptedException
        at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1638)
        at java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:435)
        at java.base/java.util.concurrent.ExecutorCompletionService.take(ExecutorCompletionService.java:200)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:467)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:165)
        at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:92)
        at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:253)
        at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:237)
        at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:189)
        at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:137)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
[Sink connector Debezium Event Thread] INFO io.debezium.pipeline.signal.SignalProcessor - SignalProcessor stopped```


Is there something we are missing in the config?

subkanthi commented 3 months ago

It looks like this could be related to acquiring locks. You can try this configuration to disable snapshot locking:

snapshot.locking.mode=none

Reference: https://stackoverflow.com/questions/75077687/debezium-mysql-connector-timing-out-during-initial-snapshot
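
To confirm which snapshot settings a config file actually carries, a small check along these lines may help. This is only a sketch: it assumes the flat `key: "value"` layout that appears later in this thread (`snapshot.mode: "schema_only"`), the line-based parser is a naive stand-in for a real YAML library, and the sample text and the `snapshot_settings` helper are hypothetical, not taken from the linked gist.

```python
def snapshot_settings(config_text: str) -> dict:
    """Collect every 'snapshot.*' key from a flat 'key: value' config text."""
    settings = {}
    for raw in config_text.splitlines():
        line = raw.strip()
        # Skip blank lines, comments, and anything that is not 'key: value'.
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, _, value = line.partition(":")
        key = key.strip()
        if key.startswith("snapshot."):
            # Drop surrounding whitespace and quotes from the value.
            settings[key] = value.strip().strip('"').strip("'")
    return settings


# Hypothetical sample; the hostname and values are placeholders.
sample = '''
database.hostname: "mysql-host"
snapshot.mode: "initial"
snapshot.locking.mode: "none"
'''

print(snapshot_settings(sample))
```

If `snapshot.locking.mode` is missing from the result, the connector falls back to its default locking behaviour.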

Lokesh14120 commented 3 months ago

We had actually already tried it with snapshot.locking.mode=none.

Lokesh14120 commented 3 months ago

This was the config file https://gist.github.com/Lokesh14120/dd38d7cc7d07d61576eff318c2d77384

aadant commented 2 months ago

@Lokesh14120 : see the above issue. It is a side effect of automatic restart.

Mitrajit commented 2 months ago

It should have resumed processing data from the saved offset. If this feature isn't available yet, it should be considered for future implementation.

We ended up abandoning snapshotting entirely and relied on snapshot.mode: "schema_only" to update data from the binlogs. Additionally, we wrote a script/SQL to backfill data into the ReplacingMergeTree from the MySQL engine.
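
A chunked backfill of that kind might look like the sketch below. It is not the script used here: it assumes the source is read through ClickHouse's `mysql()` table function, that the table has an integer key column (called `id` purely for illustration), and that the column list is supplied explicitly so any extra columns on the ClickHouse table keep their defaults. The `backfill_statements` helper, host, credentials and column names are all hypothetical placeholders.

```python
from typing import Iterator, List


def backfill_statements(
    database: str,
    table: str,
    columns: List[str],
    key: str,
    min_key: int,
    max_key: int,
    chunk_size: int,
    mysql_addr: str = "mysql-host:3306",  # placeholder address
    mysql_user: str = "user",             # placeholder credentials
    mysql_password: str = "password",
) -> Iterator[str]:
    """Yield one INSERT ... SELECT per key range [lo, hi) covering min_key..max_key."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    cols = ", ".join(columns)
    lo = min_key
    while lo <= max_key:
        # Upper bound is exclusive; the last chunk stops just past max_key.
        hi = min(lo + chunk_size, max_key + 1)
        yield (
            f"INSERT INTO {database}.{table} ({cols}) "
            f"SELECT {cols} FROM mysql('{mysql_addr}', '{database}', '{table}', "
            f"'{mysql_user}', '{mysql_password}') "
            f"WHERE {key} >= {lo} AND {key} < {hi}"
        )
        lo = hi


if __name__ == "__main__":
    # Table name taken from the log above; the columns are assumed.
    for stmt in backfill_statements(
        "ri_mailer", "webhook_logs", ["id", "payload"], "id", 1, 25, 10
    ):
        print(stmt)
```

Splitting the copy into key ranges keeps each statement small, so a failed range can be retried on its own. Re-inserting a range is tolerable with ReplacingMergeTree, since rows with the same sorting key are collapsed during merges.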

aadant commented 2 months ago

@Mitrajit For MySQL, you can also use this tool: https://github.com/Altinity/clickhouse-sink-connector/blob/develop/sink-connector/python/README.md. It works well with snapshot.mode: "schema_only".