confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases
Other
19 stars 955 forks source link

Jdbc-sink connector schema-changes topic capturing #1328

Open harshvardhany opened 1 year ago

harshvardhany commented 1 year ago

I'm trying to create a pipeline from one MySQL database to another using the Debezium source connector and the JDBC sink connector.

My debezium source connector configuration:

{ "name":"debezium_mysql", "config" : {

    "connector.class":"io.debezium.connector.mysql.MySqlConnector",

    "database.hostname":"mysql",
    "database.port":"3306",
    "database.user":"root",
    "database.password":"debezium",
    "database.server.id":"1167",
    "database.server.name":"source-mysql",
    "topic.prefix":"cdc",

    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.mysql",

    "tombstones.on.delete":"true",
    "connect.keep.alive":"true",

    "database.include.list":"source",         
    "message.key.columns":"source.student:id",

    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter"

}

}

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":true,"field":"databaseName"},{"type":"string","optional":true,"field":"schemaName"},{"type":"string","optional":true,"field":"ddl"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"field":"id"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"defaultCharsetName"},{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"primaryKeyColumnNames"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int32","optional":false,"field":"jdbcType"},{"type":"int32","optional":true,"field":"nativeType"},{"type":"string","optional":false,"field":"typeName"},{"type":"string","optional":true,"field":"typeExpression"},{"type":"string","optional":true,"field":"charsetName"},{"type":"int32","optional":true,"field":"length"},{"type":"int32","optional":true,"field":"scal
e"},{"type":"int32","optional":false,"field":"position"},{"type":"boolean","optional":true,"field":"optional"},{"type":"boolean","optional":true,"field":"autoIncremented"},{"type":"boolean","optional":true,"field":"generated"},{"type":"string","optional":true,"field":"comment"}],"optional":false,"name":"io.debezium.connector.schema.Column"},"optional":false,"field":"columns"},{"type":"string","optional":true,"field":"comment"}],"optional":false,"name":"io.debezium.connector.schema.Table","field":"table"}],"optional":false,"name":"io.debezium.connector.schema.Change"},"optional":false,"field":"tableChanges"}],"optional":false,"name":"io.debezium.connector.mysql.SchemaChangeValue"},"payload":{"source":{"version":"1.8.1.Final","connector":"mysql","name":"source-mysql","ts_ms":1680764865377,"snapshot":"true","db":"source","sequence":null,"table":null,"server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":871,"row":0,"thread":null,"query":null},"databaseName":"source","schemaName":null,"ddl":"USE source","tableChanges":[]}} 
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":true,"field":"databaseName"},{"type":"string","optional":true,"field":"schemaName"},{"type":"string","optional":true,"field":"ddl"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"field":"id"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"defaultCharsetName"},{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"primaryKeyColumnNames"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int32","optional":false,"field":"jdbcType"},{"type":"int32","optional":true,"field":"nativeType"},{"type":"string","optional":false,"field":"typeName"},{"type":"string","optional":true,"field":"typeExpression"},{"type":"string","optional":true,"field":"charsetName"},{"type":"int32","optional":true,"field":"length"},{"type":"int32","optional":true,"field":"scal
e"},{"type":"int32","optional":false,"field":"position"},{"type":"boolean","optional":true,"field":"optional"},{"type":"boolean","optional":true,"field":"autoIncremented"},{"type":"boolean","optional":true,"field":"generated"},{"type":"string","optional":true,"field":"comment"}],"optional":false,"name":"io.debezium.connector.schema.Column"},"optional":false,"field":"columns"},{"type":"string","optional":true,"field":"comment"}],"optional":false,"name":"io.debezium.connector.schema.Table","field":"table"}],"optional":false,"name":"io.debezium.connector.schema.Change"},"optional":false,"field":"tableChanges"}],"optional":false,"name":"io.debezium.connector.mysql.SchemaChangeValue"},"payload":{"source":{"version":"1.8.1.Final","connector":"mysql","name":"source-mysql","ts_ms":1680764865380,"snapshot":"true","db":"source","sequence":null,"table":"student","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":871,"row":0,"thread":null,"query":null},"databaseName":"source","schemaName":null,"ddl":"CREATE TABLE student (\n id int NOT NULL,\n name varchar(26) DEFAULT NULL,\n PRIMARY KEY (id)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci","tableChanges":[{"type":"CREATE","id":"\"source\".\"student\"","table":{"defaultCharsetName":"utf8mb4","primaryKeyColumnNames":["id"],"columns":[{"name":"id","jdbcType":4,"nativeType":null,"typeName":"INT","typeExpression":"INT","charsetName":null,"length":null,"scale":null,"position":1,"optional":false,"autoIncremented":false,"generated":false,"comment":null},{"name":"name","jdbcType":12,"nativeType":null,"typeName":"VARCHAR","typeExpression":"VARCHAR","charsetName":"utf8mb4","length":26,"scale":null,"position":2,"optional":true,"autoIncremented":false,"generated":false,"comment":null}],"comment":null}}]}}

{ "name": "mysql-jdbc-sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1",

    "connection.user":"root",
    "connection.password":"debeziumtar",
    "connection.url": "jdbc:mysql://172.19.0.4:3306/target",

    "topics": "source-mysql",    

    "auto.create": "true",

    "pk.mode": "none",

    "insert.mode": "insert",
    "delete.enabled": "false",        

    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable":"true",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable":"true"

}

}

2023-04-07 13:19:33 2023-04-07 07:49:33,257 INFO || Attempting to open connection #1 to MySql [io.confluent.connect.jdbc.util.CachedConnectionProvider] 2023-04-07 13:19:33 2023-04-07 07:49:33,951 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter] 2023-04-07 13:19:34 2023-04-07 07:49:34,027 INFO || Checking MySql dialect for existence of TABLE "source-mysql" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2023-04-07 13:19:34 2023-04-07 07:49:34,060 INFO || Using MySql dialect TABLE "source-mysql" absent [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2023-04-07 13:19:34 2023-04-07 07:49:34,063 ERROR || WorkerSinkTask{id=mysql-jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: io.debezium.connector.mysql.Source (STRUCT) type doesn't have a mapping to the SQL database column type [org.apache.kafka.connect.runtime.WorkerSinkTask] 2023-04-07 13:19:34 org.apache.kafka.connect.errors.ConnectException: io.debezium.connector.mysql.Source (STRUCT) type doesn't have a mapping to the SQL database column type 2023-04-07 13:19:34 at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getSqlType(GenericDatabaseDialect.java:1945) 2023-04-07 13:19:34 at io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect.getSqlType(MySqlDatabaseDialect.java:128) 2023-04-07 13:19:34 at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnSpec(GenericDatabaseDialect.java:1861) 2023-04-07 13:19:34 at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$writeColumnsSpec$39(GenericDatabaseDialect.java:1850) 2023-04-07 13:19:34 at io.confluent.connect.jdbc.util.ExpressionBuilder.append(ExpressionBuilder.java:560) 2023-04-07 13:19:34 at io.confluent.connect.jdbc.util.ExpressionBuilder$BasicListBuilder.of(ExpressionBuilder.java:599) 2023-04-07 13:19:34 at 
io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnsSpec(GenericDatabaseDialect.java:1852) 2023-04-07 13:19:34 at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.buildCreateTableStatement(GenericDatabaseDialect.java:1769) 2023-04-07 13:19:34 at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:121) 2023-04-07 13:19:34 at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:67) 2023-04-07 13:19:34 at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:122) 2023-04-07 13:19:34 at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74) 2023-04-07 13:19:34 at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:85) 2023-04-07 13:19:34 at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581) 2023-04-07 13:19:34 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329) 2023-04-07 13:19:34 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) 2023-04-07 13:19:34 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) 2023-04-07 13:19:34 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186) 2023-04-07 13:19:34 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241) 2023-04-07 13:19:34 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 2023-04-07 13:19:34 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 2023-04-07 13:19:34 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 2023-04-07 13:19:34 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 2023-04-07 13:19:34 at java.base/java.lang.Thread.run(Thread.java:829)

2023-04-07 13:19:34 2023-04-07 07:49:34,067 INFO || Stopping task [io.confluent.connect.jdbc.sink.JdbcSinkTask] 2023-04-07 13:19:34 2023-04-07 07:49:34,068 INFO || Closing connection #1 to MySql [io.confluent.connect.jdbc.util.CachedConnectionProvider] 2023-04-07 13:19:34 2023-04-07 07:49:34,072 INFO || [Consumer clientId=connector-consumer-mysql-jdbc-sink-0, groupId=connect-mysql-jdbc-sink] Revoke previously assigned partitions source-mysql-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 2023-04-07 13:19:34 2023-04-07 07:49:34,073 INFO || [Consumer clientId=connector-consumer-mysql-jdbc-sink-0, groupId=connect-mysql-jdbc-sink] Member connector-consumer-mysql-jdbc-sink-0-b9c5d80a-2c04-4c34-8580-e18c8d649d36 sending LeaveGroup request to coordinator kafka:9092 (id: 2147482646 rack: null) due to the consumer is being closed [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 2023-04-07 13:19:34 2023-04-07 07:49:34,074 INFO || [Consumer clientId=connector-consumer-mysql-jdbc-sink-0, groupId=connect-mysql-jdbc-sink] Resetting generation due to: consumer pro-actively leaving the group [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 2023-04-07 13:19:34 2023-04-07 07:49:34,075 INFO || [Consumer clientId=connector-consumer-mysql-jdbc-sink-0, groupId=connect-mysql-jdbc-sink] Request joining group due to: consumer pro-actively leaving the group [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 2023-04-07 13:19:34 2023-04-07 07:49:34,077 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics] 2023-04-07 13:19:34 2023-04-07 07:49:34,077 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics] 2023-04-07 13:19:34 2023-04-07 07:49:34,077 INFO || Metrics reporters closed [org.apache.kafka.common.metrics.Metrics] 2023-04-07 13:19:34 2023-04-07 07:49:34,080 INFO || App info kafka.consumer for connector-consumer-mysql-jdbc-sink-0 unregistered 
[org.apache.kafka.common.utils.AppInfoParser]

sp3c73r2038 commented 5 months ago

Hi there. A Google search brought me here. I'm working on the same setup with the Debezium MySQL source connector, and found that you have to add an SMT (Single Message Transform) to the sink config, like this:

{
  "name": "your-connector-name",
  "config": {
    // other options
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}

This is also explained in the Debezium JDBC sink connector's documentation and here, although it doesn't have good support for DDL changes.

Hope it helps you.