confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Sink fails to insert CLOB in DB2 #1346

Closed. 4integration closed this issue 1 year ago.

4integration commented 1 year ago

I have a JDBC sink connector that fails to insert a CLOB into DB2 with the following error message (full log below):

"message":"Write of 10 records failed, remainingRetries=0",
"ecs.version": "1.2.0",
"process.thread.name":"task-thread-test5-0",
"log.logger":"io.confluent.connect.jdbc.sink.JdbcSinkTask",
"connector.context":"[test5|task-0] ",
"error.type":"com.ibm.db2.jcc.am.SqlSyntaxErrorException",
"error.message":"DB2 SQL Error: SQLCODE=-204, SQLSTATE=42704, SQLERRMC=kafka.TEST_TABLE, DRIVER=4.32.28"

Connector config

{
  "name": "test",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:db2://kafka_connect_db2:50000/kafka:currentSchema=kafka;",
  "connection.user": "db2inst1",
  "connection.password": "yourStrongPassword",
  "db.timezone": "CET",
  "dialect.name": "Db2DatabaseDialect",

  "insert.mode": "insert",
  "pk.mode": "none",
  "topics.regex": "common.vehicle.cmd",
  "table.name.format": "TEST_TABLE",

  "key.converter": "org.apache.kafka.connect.storage.StringConverter",

  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.auto.register.schemas": "false",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.basic.auth.user.info": "theuser:somepassword",
  "value.converter.schema.registry.url": "https://kafka-schema-registry.test.company.net",
  "value.converter.use.latest.version": "true",

  "transforms": "tojson",
  "transforms.tojson.type": "com.github.cedelsb.kafka.connect.smt.Record2JsonStringConverter$Value",
  "transforms.tojson.json.string.field.name" : "jsonstring",

  "connection.attempts": "500",
  "connection.backoff.ms": "21600"
}
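
For context on how the table name resolves: with table.name.format left unqualified and currentSchema=kafka in the connection URL, the sink ends up addressing the table as kafka.TEST_TABLE, which matches the SQLERRMC in the error. A quick way to see which schema a session resolves unqualified names to (a sketch; run it as the connector's user, db2inst1):

-- Shows the default schema that unqualified names like TEST_TABLE
-- resolve to for the current session.
VALUES CURRENT SCHEMA;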

The DB schema looks like:

create table test_table (
   jsonid integer not null GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1)
  ,jsonstring CLOB
  ,PRIMARY KEY (jsonid)
);
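
Since SQLCODE -204 with SQLSTATE 42704 means the named object is undefined, i.e. Db2 cannot find kafka.TEST_TABLE, a catalog query along these lines (a sketch, assuming a standard Db2 LUW catalog) can confirm which schema the table was actually created in and which columns it has:

-- Which schema does TEST_TABLE actually live in?
SELECT TABSCHEMA, TABNAME
  FROM SYSCAT.TABLES
 WHERE TABNAME = 'TEST_TABLE';

-- Do its columns match what the connector expects?
SELECT COLNAME, TYPENAME, NULLS
  FROM SYSCAT.COLUMNS
 WHERE TABNAME = 'TEST_TABLE'
 ORDER BY COLNO;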

The message payload looks like this:

{
    "jsonstring": "{\"messageId\": \"fd5d33b6-10c7-43f8-b179-fbc592727256\", \"createdAt\": {\"$date\": \"2023-06-09T12:33:31.722Z\"}, \"vehicleId\": {\"registrationNumber\": \"TEST\", \"vin\": null}, \"commandType\": \"RETURNED_COMMAND\", \"command\": {\"ReturnedCommand\": {\"leaseId\": \"TEST-001\", \"requestReceived\": {\"$date\": \"2023-06-09T12:33:29.833Z\"}, \"returnDate\": {\"$date\": \"2023-06-05T00:00:00Z\"}, \"odometerKilometers\": 2500, \"carDealer\": {\"carDealerName\": \"CarCompany\", \"recipientName\": \"Mr X\", \"recipientPhoneNumber\": \"073-389 00 99\", \"recipientEmail\": \"mr-x@mail.com\", \"address\": \"Street 11\", \"postcode\": \"43199\", \"postalAddress\": \"City\"}, \"includedItems\": [\"WINTER_WHEELS\", \"SERVICE_BOOK\"], \"damages\": [], \"additionalInfo\": \"\"}, \"InspectedCommand\": null, \"SoldCommand\": null, \"PaidCommand\": null, \"DocumentsCommand\": null}}"
}

Running the insert directly against the database works:

INSERT INTO TEST_TABLE (jsonstring) values('"{\"messageId\": \"fd5d33b6-10c7-43f8-b179-fbc592727256\", \"createdAt\": {\"$date\": \"2023-06-09T12:33:31.722Z\"}, \"vehicleId\": {\"registrationNumber\": \"TEST\", \"vin\": null}, \"commandType\": \"RETURNED_COMMAND\", \"command\": {\"ReturnedCommand\": {\"leaseId\": \"TEST-001\", \"requestReceived\": {\"$date\": \"2023-06-09T12:33:29.833Z\"}, \"returnDate\": {\"$date\": \"2023-06-05T00:00:00Z\"}, \"odometerKilometers\": 2500, \"carDealer\": {\"carDealerName\": \"CarCompany\", \"recipientName\": \"Mr X\", \"recipientPhoneNumber\": \"073-389 00 99\", \"recipientEmail\": \"mr-x@mail.com\", \"address\": \"Street 11\", \"postcode\": \"43199\", \"postalAddress\": \"City\"}, \"includedItems\": [\"WINTER_WHEELS\", \"SERVICE_BOOK\"], \"damages\": [], \"additionalInfo\": \"\"}, \"InspectedCommand\": null, \"SoldCommand\": null, \"PaidCommand\": null, \"DocumentsCommand\": null}}"')
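
Note that this direct insert runs under whatever default schema my own session uses, which may not be the kafka schema the connector resolves the table to. A schema-qualified variant (illustrative only, with an abbreviated payload) would rule that out:

-- Illustrative only: the JSON payload is abbreviated here.
INSERT INTO kafka.TEST_TABLE (jsonstring)
VALUES ('{"messageId": "fd5d33b6-10c7-43f8-b179-fbc592727256", "commandType": "RETURNED_COMMAND"}');

-- Check what was stored (first 200 characters of the CLOB):
SELECT jsonid, SUBSTR(jsonstring, 1, 200) AS jsonstring_prefix
  FROM kafka.TEST_TABLE;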

Detailed logs

kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.463Z","log.level": "INFO","message":"JdbcDbWriter Connected","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.sink.JdbcDbWriter","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.466Z","log.level":"DEBUG","message":"Records is empty","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.sink.BufferedRecords","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.466Z","log.level":"DEBUG","message":"Using Db2 dialect to check support for [TABLE]","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.dialect.GenericDatabaseDialect","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.493Z","log.level":"DEBUG","message":"Used Db2 dialect to find table types: [TABLE]","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.dialect.GenericDatabaseDialect","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.493Z","log.level": "INFO","message":"Checking Db2 dialect for existence of TABLE \"TEST_TABLE\"","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.dialect.GenericDatabaseDialect","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.504Z","log.level": "INFO","message":"Using Db2 dialect TABLE \"TEST_TABLE\" present","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.dialect.GenericDatabaseDialect","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.504Z","log.level":"DEBUG","message":"Querying Db2 dialect column metadata for catalog:null schema:null table:TEST_TABLE","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.dialect.GenericDatabaseDialect","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.560Z","log.level":"DEBUG","message":"Using Db2 dialect to check support for [TABLE]","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.dialect.GenericDatabaseDialect","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.561Z","log.level":"DEBUG","message":"Used Db2 dialect to find table types: [TABLE]","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.dialect.GenericDatabaseDialect","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.561Z","log.level": "INFO","message":"Checking Db2 dialect for type of TABLE \"TEST_TABLE\"","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.dialect.GenericDatabaseDialect","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.562Z","log.level": "INFO","message":"Setting metadata for table \"TEST_TABLE\" to Table{name='\"TEST_TABLE\"', type=TABLE columns=[Column{'JSONSTRING', isPrimaryKey=false, allowsNull=true, sqlType=CLOB}, Column{'JSONDATA', isPrimaryKey=false, allowsNull=true, sqlType=CLOB}, Column{'THEID', isPrimaryKey=true, allowsNull=false, sqlType=INTEGER}]}","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.util.TableDefinitions","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.562Z","log.level":"DEBUG","message":"Found missing field: SinkRecordField{schema=Schema{STRING}, name='jsonstring', isPrimaryKey=false}","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.sink.DbStructure","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.562Z","log.level":"DEBUG","message":"INSERT sql: INSERT INTO \"TEST_TABLE\"(\"jsonstring\") VALUES(?) deleteSql: null meta: FieldsMetadata{keyFieldNames=[], nonKeyFieldNames=[jsonstring], allFields={jsonstring=SinkRecordField{schema=Schema{STRING}, name='jsonstring', isPrimaryKey=false}}}","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.sink.BufferedRecords","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.562Z","log.level":"DEBUG","message":"Closing BufferedRecords with updatePreparedStatement: null deletePreparedStatement: null","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.sink.BufferedRecords","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.562Z","log.level":"DEBUG","message":"Flushing records in JDBC Writer for table ID: \"TEST_TABLE\"","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.sink.JdbcDbWriter","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.562Z","log.level":"DEBUG","message":"Flushing 10 buffered records","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.sink.BufferedRecords","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.566Z","log.level": "WARN","message":"Write of 10 records failed, remainingRetries=0","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.sink.JdbcSinkTask","connector.context":"[test5|task-0] ","error.type":"com.ibm.db2.jcc.am.SqlSyntaxErrorException","error.message":"DB2 SQL Error: SQLCODE=-204, SQLSTATE=42704, SQLERRMC=kafka.TEST_TABLE, DRIVER=4.32.28","error.stack_trace":"com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-204, SQLSTATE=42704, SQLERRMC=kafka.TEST_TABLE, DRIVER=4.32.28\n\tat com.ibm.db2.jcc.am.b7.a(b7.java:810)\n\tat com.ibm.db2.jcc.am.b7.a(b7.java:66)\n\tat com.ibm.db2.jcc.am.b7.a(b7.java:140)\n\tat com.ibm.db2.jcc.am.lc.c(lc.java:2868)\n\tat com.ibm.db2.jcc.am.lc.d(lc.java:2852)\n\tat com.ibm.db2.jcc.am.lc.a(lc.java:2278)\n\tat com.ibm.db2.jcc.am.ld.a(ld.java:8405)\n\tat com.ibm.db2.jcc.t4.ab.i(ab.java:204)\n\tat com.ibm.db2.jcc.t4.ab.b(ab.java:94)\n\tat com.ibm.db2.jcc.t4.p.a(p.java:32)\n\tat com.ibm.db2.jcc.t4.av.i(av.java:150)\n\tat com.ibm.db2.jcc.am.lc.al(lc.java:2247)\n\tat com.ibm.db2.jcc.am.ld.bq(ld.java:3886)\n\tat com.ibm.db2.jcc.am.ld.u(ld.java:4049)\n\tat com.ibm.db2.jcc.am.ld.n(ld.java:3084)\n\tat com.ibm.db2.jcc.am.ld.addBatch(ld.java:3020)\n\tat io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:115)\n\tat io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:183)\n\tat io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:88)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\n"}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.567Z","log.level":"ERROR","message":"Failing task after exhausting retries; encountered 2 exceptions on last write attempt. For complete details on each exception, please enable DEBUG logging.","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.sink.JdbcSinkTask","connector.context":"[test5|task-0] "}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.567Z","log.level":"DEBUG","message":"Exception 1:","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.sink.JdbcSinkTask","connector.context":"[test5|task-0] ","error.type":"com.ibm.db2.jcc.am.SqlSyntaxErrorException","error.message":"DB2 SQL Error: SQLCODE=-204, SQLSTATE=42704, SQLERRMC=kafka.TEST_TABLE, DRIVER=4.32.28","error.stack_trace":"com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-204, SQLSTATE=42704, SQLERRMC=kafka.TEST_TABLE, DRIVER=4.32.28\n\tat com.ibm.db2.jcc.am.b7.a(b7.java:810)\n\tat com.ibm.db2.jcc.am.b7.a(b7.java:66)\n\tat com.ibm.db2.jcc.am.b7.a(b7.java:140)\n\tat com.ibm.db2.jcc.am.lc.c(lc.java:2868)\n\tat com.ibm.db2.jcc.am.lc.d(lc.java:2852)\n\tat com.ibm.db2.jcc.am.lc.a(lc.java:2278)\n\tat com.ibm.db2.jcc.am.ld.a(ld.java:8405)\n\tat com.ibm.db2.jcc.t4.ab.i(ab.java:204)\n\tat com.ibm.db2.jcc.t4.ab.b(ab.java:94)\n\tat com.ibm.db2.jcc.t4.p.a(p.java:32)\n\tat com.ibm.db2.jcc.t4.av.i(av.java:150)\n\tat com.ibm.db2.jcc.am.lc.al(lc.java:2247)\n\tat com.ibm.db2.jcc.am.ld.bq(ld.java:3886)\n\tat com.ibm.db2.jcc.am.ld.u(ld.java:4049)\n\tat com.ibm.db2.jcc.am.ld.n(ld.java:3084)\n\tat com.ibm.db2.jcc.am.ld.addBatch(ld.java:3020)\n\tat io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:115)\n\tat io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:183)\n\tat io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:88)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\n"}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.567Z","log.level":"DEBUG","message":"Exception 2:","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"io.confluent.connect.jdbc.sink.JdbcSinkTask","connector.context":"[test5|task-0] ","error.type":"com.ibm.db2.jcc.am.SqlException","error.message":"DB2 SQL Error: SQLCODE=-727, SQLSTATE=56098, SQLERRMC=2;-204;42704;kafka.TEST_TABLE, DRIVER=4.32.28","error.stack_trace":"com.ibm.db2.jcc.am.SqlException: DB2 SQL Error: SQLCODE=-727, SQLSTATE=56098, SQLERRMC=2;-204;42704;kafka.TEST_TABLE, DRIVER=4.32.28\n\tat com.ibm.db2.jcc.am.b7.a(b7.java:815)\n\tat com.ibm.db2.jcc.am.b7.a(b7.java:66)\n\tat com.ibm.db2.jcc.am.b7.a(b7.java:140)\n\tat com.ibm.db2.jcc.am.lc.c(lc.java:2868)\n\tat com.ibm.db2.jcc.am.lc.d(lc.java:2852)\n\tat com.ibm.db2.jcc.am.ld.a(ld.java:3787)\n\tat com.ibm.db2.jcc.t4.ab.a(ab.java:253)\n\tat com.ibm.db2.jcc.t4.ab.b(ab.java:140)\n\tat com.ibm.db2.jcc.t4.p.b(p.java:69)\n\tat com.ibm.db2.jcc.t4.aw.c(aw.java:244)\n\tat com.ibm.db2.jcc.am.ld.bn(ld.java:3778)\n\tat com.ibm.db2.jcc.am.ld.bq(ld.java:3887)\n\tat com.ibm.db2.jcc.am.ld.u(ld.java:4049)\n\tat com.ibm.db2.jcc.am.ld.n(ld.java:3084)\n\tat com.ibm.db2.jcc.am.ld.addBatch(ld.java:3020)\n\tat io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:115)\n\tat io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:183)\n\tat io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:88)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\n"}
kafka-connect                      | {"@timestamp":"2023-06-16T09:22:48.567Z","log.level":"ERROR","message":"WorkerSinkTask{id=test5-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.sql.SQLException: Exception chain:\ncom.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-204, SQLSTATE=42704, SQLERRMC=kafka.TEST_TABLE, DRIVER=4.32.28\ncom.ibm.db2.jcc.am.SqlException: DB2 SQL Error: SQLCODE=-727, SQLSTATE=56098, SQLERRMC=2;-204;42704;kafka.TEST_TABLE, DRIVER=4.32.28\n","ecs.version": "1.2.0","process.thread.name":"task-thread-test5-0","log.logger":"org.apache.kafka.connect.runtime.WorkerSinkTask","connector.context":"[test5|task-0] ","error.type":"org.apache.kafka.connect.errors.ConnectException","error.message":"java.sql.SQLException: Exception chain:\ncom.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-204, SQLSTATE=42704, SQLERRMC=kafka.TEST_TABLE, DRIVER=4.32.28\ncom.ibm.db2.jcc.am.SqlException: DB2 SQL Error: SQLCODE=-727, SQLSTATE=56098, SQLERRMC=2;-204;42704;kafka.TEST_TABLE, DRIVER=4.32.28\n","error.stack_trace":"org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:\ncom.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-204, SQLSTATE=42704, SQLERRMC=kafka.TEST_TABLE, DRIVER=4.32.28\ncom.ibm.db2.jcc.am.SqlException: DB2 SQL Error: SQLCODE=-727, SQLSTATE=56098, SQLERRMC=2;-204;42704;kafka.TEST_TABLE, DRIVER=4.32.28\n\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:128)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: java.sql.SQLException: Exception chain:\ncom.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-204, SQLSTATE=42704, SQLERRMC=kafka.TEST_TABLE, DRIVER=4.32.28\ncom.ibm.db2.jcc.am.SqlException: DB2 SQL Error: SQLCODE=-727, SQLSTATE=56098, SQLERRMC=2;-204;42704;kafka.TEST_TABLE, DRIVER=4.32.28\n\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:159)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:108)\n\t... 12 more\n"}

Do you have any idea why it fails and what to do about it?

4integration commented 1 year ago

Never mind, a mismatch on my side caused the issue.