sudzzz opened 1 year ago
Can you show the output of `kafka-avro-console-consumer --topic dbserver2.uts_production_kafkaconnect.booking_journal_production`?
If you see nested objects, then that's the error; the JDBC Sink requires non-nested records.
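For reference, a fuller invocation would look something like the sketch below; the bootstrap server address is an assumption (the schema registry URL is taken from the configs in this issue):

```bash
# Inspect the Avro records Debezium wrote to the topic.
# localhost:9092 is an assumed default broker address; adjust as needed.
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic dbserver2.uts_production_kafkaconnect.booking_journal_production \
  --property schema.registry.url=http://localhost:8081 \
  --from-beginning
```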
My MongoDB source configuration is:
{ "name": "MongoDbSourceConnector", "config": { "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", "tasks.max": "1", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "mongodb.hosts": "debezium/localhost:27017", "mongodb.user": "sudhir", "mongodb.password": "password", "mongodb.name": "dbserver2", "value.converter.schema.registry.url": "http://localhost:8081", "key.converter.schema.registry.url": "http://localhost:8081" } }
After registering this connector, a topic is created with the name "server_name.database_name.collection_name"; in my case that is "dbserver2.uts_production_kafkaconnect.booking_journal_production".
An example record in uts_production_kafkaconnect.booking_journal_production is:

```json
{
  "_id": { "id": 6 },
  "amount": 6000,
  "booking_status": "BOOKED",
  "destination_station": "New Delhi",
  "journey_date": { "$date": "2023-03-20T20:15:48.175Z" },
  "last_modified": { "$date": "2023-03-20T20:15:48.175Z" },
  "number_of_passengers": 4,
  "source_station": "Howrah",
  "ticket_number": "fcf1ef67-1900-41b1-95d4-59c61e80452a",
  "train_number": 12301
}
```
But in the topic dbserver2.uts_production_kafkaconnect.booking_journal_production, the document gets published inside the "after" property as a string.
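This is expected behavior for the Debezium MongoDB connector: the change event envelope carries the document as an extended-JSON string in "after", which is why an unwrap SMT is needed before a relational sink can use it. A rough sketch of such an envelope (abbreviated and hand-written, not captured from the actual topic):

```json
{
  "after": "{\"_id\": {\"id\": 6}, \"amount\": 6000, \"booking_status\": \"BOOKED\", ...}",
  "source": { "connector": "mongodb", "name": "dbserver2", "...": "..." },
  "op": "c",
  "ts_ms": 1679343348175
}
```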
I want this data transferred to a MySQL table whose structure is:
```
+----------------------+--------------+------+-----+---------+----------------+
| Field                | Type         | Null | Key | Default | Extra          |
+----------------------+--------------+------+-----+---------+----------------+
| id                   | int          | NO   | PRI | NULL    | auto_increment |
| amount               | double       | YES  |     | NULL    |                |
| booking_status       | varchar(255) | YES  |     | NULL    |                |
| destination_station  | varchar(255) | YES  |     | NULL    |                |
| journey_date         | datetime(6)  | YES  |     | NULL    |                |
| last_modified        | datetime(6)  | YES  |     | NULL    |                |
| number_of_passengers | int          | YES  |     | NULL    |                |
| source_station       | varchar(255) | YES  |     | NULL    |                |
| ticket_number        | varchar(255) | NO   | UNI | NULL    |                |
| train_number         | int          | YES  |     | NULL    |                |
+----------------------+--------------+------+-----+---------+----------------+
```
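For reference, the equivalent CREATE TABLE statement would be roughly the following; this is reconstructed from the DESCRIBE output above, not the original DDL:

```sql
-- Reconstructed from the DESCRIBE output; treat as a sketch.
CREATE TABLE uts_thinclient_kafkaconnect.booking_journal_thinclient (
  id                   INT          NOT NULL AUTO_INCREMENT,
  amount               DOUBLE,
  booking_status       VARCHAR(255),
  destination_station  VARCHAR(255),
  journey_date         DATETIME(6),
  last_modified        DATETIME(6),
  number_of_passengers INT,
  source_station       VARCHAR(255),
  ticket_number        VARCHAR(255) NOT NULL,
  train_number         INT,
  PRIMARY KEY (id),
  UNIQUE KEY (ticket_number)
);
```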
The MySQL sink connector I have written is:
{ "name": "JdbcSinkConnector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "transforms": "unwrap", "topics": "dbserver2.uts_production_kafkaconnect.booking_journal_production", "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope", "connection.url": "jdbc:mysql://localhost:3306/uts_thinclient_kafkaconnect", "connection.user": "root", "connection.password": "password", "dialect.name": "MySqlDatabaseDialect", "insert.mode": "upsert", "table.name.format": "uts_thinclient_kafkaconnect.booking_journal_thinclient", "pk.mode": "record_value", "pk.fields": "id", "db.timezone": "Asia/Kolkata", "auto.create": "true", "auto.evolve": "true", "value.converter.schema.registry.url": "http://localhost:8081", "key.converter.schema.registry.url": "http://localhost:8081" }
The error I am getting is:
```
INFO Setting metadata for table "uts_thinclient_kafkaconnect"."booking_journal_thinclient" to Table{name='"uts_thinclient_kafkaconnect"."booking_journal_thinclient"', type=TABLE columns=[Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=INT}, Column{'booking_status', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'number_of_passengers', isPrimaryKey=false, allowsNull=true, sqlType=INT}, Column{'train_number', isPrimaryKey=false, allowsNull=true, sqlType=INT}, Column{'journey_date', isPrimaryKey=false, allowsNull=true, sqlType=DATETIME}, Column{'amount', isPrimaryKey=false, allowsNull=true, sqlType=DOUBLE}, Column{'source_station', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'ticket_number', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'last_modified', isPrimaryKey=false, allowsNull=true, sqlType=DATETIME}, Column{'destination_station', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}]} (io.confluent.connect.jdbc.util.TableDefinitions:64)
[2023-04-17 17:51:20,692] ERROR WorkerSinkTask{id=JdbcSinkConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Unsupported source data type: STRUCT (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: STRUCT
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.bindField(GenericDatabaseDialect.java:1609)
    at io.confluent.connect.jdbc.dialect.DatabaseDialect.bindField(DatabaseDialect.java:608)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindField(PreparedStatementBinder.java:186)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindKeyFields(PreparedStatementBinder.java:154)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:102)
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:184)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:109)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
[2023-04-17 17:51:20,693] ERROR WorkerSinkTask{id=JdbcSinkConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: STRUCT
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.bindField(GenericDatabaseDialect.java:1609)
    at io.confluent.connect.jdbc.dialect.DatabaseDialect.bindField(DatabaseDialect.java:608)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindField(PreparedStatementBinder.java:186)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindKeyFields(PreparedStatementBinder.java:154)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:102)
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:184)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:109)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
```
Please help. Thanks!