Satyajitv / DebeziumTimestampConvertor

Kafka SMT implementation of Debezium timestamp convertor
1 stars 1 forks source link

Caused by: org.apache.avro.SchemaParseException: Can't redefine: dbs1.demo.Events.Value #2

Open saurav07sc opened 1 year ago

saurav07sc commented 1 year ago

Debezium MySQL Source Connector Configuration { "name": "cdc-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schemas.enable": "true", "value.converter.schema.registry.url": "", "value.converter.basic.auth.credentials.source": "USER_INFO", "value.converter.basic.auth.user.info": "", "database.hostname": "host.docker.internal", "database.port": "3306", "database.user": "demo", "database.password": "**", "database.server.id": "223344", "database.server.name": "dbs1", "table.include.list": "demo.Events", "include.query": "true", "database.history.kafka.bootstrap.servers": "", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "PLAIN", "database.history.consumer.sasl.jaas.config": "", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "PLAIN", "database.history.producer.sasl.jaas.config": "***", "database.history.kafka.topic": "dbhistory.demo2", "database.allowPublicKeyRetrieval": true, "topic.creation.default.replication.factor": "3", "topic.creation.default.partitions": "3", "decimal.handling.mode": "double", "include.schema.changes": "true", "transforms": "addTopicPrefix,TimestampConv", "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter", "transforms.addTopicPrefix.regex":"(.*)", "transforms.addTopicPrefix.replacement":"mysql-debezium-$1", "snapshot.mode": "when_needed", "transforms.TimestampConv.type":"org.telmate.SMT.DebeziumTimestampConverter$Value", "transforms.TimestampConv.target.type":"string", "transforms.TimestampConv.field.type":"io.debezium.time.Timestamp->string,io.debezium.time.Date->string,io.debezium.time.MicroTimestamp->string", "transforms.TimestampConv.struct.field":"after", "transforms.TimestampConv.timestamp.format":"yyyy-MM-dd hh:mm:ss" } }

When I run the connector with the above configuration, I am getting following error:

[2022-12-15 07:59:06,125] ERROR [cdc-connector|task-0] WorkerSourceTask{id=cdc-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195) connect-107 | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler connect-107 | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206) connect-107 | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132) connect-107 | at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:329) connect-107 | at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355) connect-107 | at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257) connect-107 | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) connect-107 | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243) connect-107 | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) connect-107 | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) connect-107 | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) connect-107 | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) connect-107 | at java.base/java.lang.Thread.run(Thread.java:829) connect-107 | Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic mysql-debezium-dbs1.demo.Events : connect-107 | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:91) connect-107 | at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63) connect-107 | at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:329) connect-107 | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156) connect-107 | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190) connect-107 | ... 11 more connect-107 | Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message connect-107 | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:119) connect-107 | at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:143) connect-107 | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:84) connect-107 | ... 15 more connect-107 | Caused by: org.apache.avro.SchemaParseException: Can't redefine: dbs1.demo.Events.Value connect-107 | at org.apache.avro.Schema$Names.put(Schema.java:1511) connect-107 | at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782) connect-107 | at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943) connect-107 | at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971) connect-107 | at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955) connect-107 | at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203) connect-107 | at org.apache.avro.Schemas.toString(Schemas.java:46) connect-107 | at org.apache.avro.Schemas.toString(Schemas.java:30) connect-107 | at io.confluent.kafka.schemaregistry.avro.AvroSchema.canonicalString(AvroSchema.java:140) connect-107 | at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:206) connect-107 | at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:268) connect-107 | at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:244) connect-107 | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:75) connect-107 | ... 17 more

If the custom DebeziumTimestampConverter is not used then the connector is working fine but the connector throws error when custom converter is used. Could you please help me with the issue?

AyoubOukh commented 3 months ago

Did you find a workaround?