confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='fullab',partition=0,offset=0,timestamp=1681657492997) with a HashMap key and null key schema #1330

Open zhijiansd opened 1 year ago

zhijiansd commented 1 year ago

Versions: kafka_2.12-3.4.0, debezium-connector-mysql-2.1.4, confluentinc-kafka-connect-jdbc-10.7.0. Whether extracting from MariaDB to MariaDB or from MariaDB to Oracle, the following error occurs. I hope to receive an answer from the community. Thank you:

ERROR [oracle-jdbc-sink|task-0] WorkerSinkTask{id=oracle-jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Sink connector 'oracle-jdbc-sink' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='fullab',partition=0,offset=0,timestamp=1681657492997) with a HashMap key and null key schema. (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
org.apache.kafka.connect.errors.ConnectException: Sink connector 'oracle-jdbc-sink' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='fullab',partition=0,offset=0,timestamp=1681657492997) with a HashMap key and null key schema.
    at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresKey$3(RecordValidator.java:116)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:81)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:88)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
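A record key arriving as a `HashMap` with a null key schema usually points at the converter configuration rather than the connector itself: when the worker's `key.converter` is `org.apache.kafka.connect.json.JsonConverter` with `key.converter.schemas.enable=false`, JSON keys are deserialized into schemaless `java.util.Map` objects, which is exactly what the error describes. Below is a hedged sketch of worker-level settings that would instead hand the sink a `Struct` key; the property names are standard Kafka Connect worker settings, but whether this matches the reporter's actual worker config is an assumption:

```properties
# connect-standalone.properties / connect-distributed.properties (sketch).
# With schemas.enable=false, a JSON key deserializes to a plain HashMap with
# no key schema, which the JDBC sink rejects under pk.mode=record_key.
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
```

These can also be set per connector (prefixed the same way in the connector properties), where they override the worker defaults.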

Source-Connect:

# cat config/connect-mysql-source.properties 
name=mysql-source
tasks.max=1
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=x.x.x.x
database.port=3306
database.user=root
database.password=xxxxxxx
database.server.id=22
topic.prefix=fullab
database.include.list=test
schema.history.internal.kafka.bootstrap.servers=localhost:9092
schema.history.internal.kafka.topic=schema-changes.ab
include.schema.changes=true
debezium.inconsistent.schema.handling.mode=warn
snapshot.mode=initial
max.retries=30
retry.backoff.ms=10000

Sink-Connect:

# cat config/connect-oracle-sink.properties 
name=oracle-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=fullab
connection.url=jdbc:oracle:thin:@//x.x.x.x:1521/orcl
connection.user=test
connection.password=xxxxxx
connection.password.secure.key=mycredentialstorekey
delete.enabled=true
pk.mode=record_key
auto.evolve=true
insert.mode=upsert
max.retries=30
retry.backoff.ms=10000
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
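One more hedged observation on the topic wiring: with Debezium's MySQL connector, data-change events for a table `test.t` go to `fullab.test.t` (`<topic.prefix>.<database>.<table>`), while the topic named by `topic.prefix` alone (`fullab`) carries schema-change events when `include.schema.changes=true`. Since the sink subscribes to `topics=fullab`, it may be reading schema-change events, whose keys are not table primary keys. A sketch of a sink subscription targeting only the per-table data topics (the regex is an assumption based on `database.include.list=test`):

```properties
# Replace topics=fullab with a pattern that matches data-change topics only,
# skipping the schema-change topic "fullab" itself.
topics.regex=fullab\.test\..*
```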