SAP / kafka-connect-sap

Kafka Connect SAP is a set of connectors that use the Apache Kafka Connect framework to reliably connect Kafka with SAP systems.
Apache License 2.0

Sink Connector with key.converter=org.apache.kafka.connect.storage.StringConverter expecting schema at HANASinkRecordsCollector #134

Closed: AndreasFischbach3003 closed this issue 2 years ago

AndreasFischbach3003 commented 2 years ago

Hi,

I think I have a problem similar to issue #125, where the sink connector does not work properly.

My configuration converts the keys with org.apache.kafka.connect.storage.StringConverter and the values with io.confluent.connect.avro.AvroConverter.

When running connect-standalone from the command line, I get the following output and exception:

  [2022-08-30 16:02:05,830] DEBUG [sap_kafka_connector|task-0] [Consumer clientId=connector-consumer-sap_kafka_connector-0, groupId=connect-sap_kafka_connector] Received successful Heartbeat response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1180)
  [2022-08-30 16:02:06,213] INFO [sap_kafka_connector|task-0] PHASE - 1 ended for task, with assigned partitions [erp.nav.local.variantSalesPriceChanged-0, erp.nav.local.variantSalesPriceChanged-4, erp.nav.local.variantSalesPriceChanged-3, erp.nav.local.variantSalesPriceChanged-2, erp.nav.local.variantSalesPriceChanged-1, erp.nav.local.variantSalesPriceChanged-5] (com.sap.kafka.connect.sink.hana.HANASinkTask:56)
  [2022-08-30 16:02:06,213] ERROR [sap_kafka_connector|task-0] WorkerSinkTask{id=sap_kafka_connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot list fields on non-struct type (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
  org.apache.kafka.connect.errors.DataException: Cannot list fields on non-struct type
      at org.apache.kafka.connect.data.ConnectSchema.fields(ConnectSchema.java:179)
      at com.sap.kafka.connect.sink.hana.HANASinkRecordsCollector.add(HANASinkRecordsCollector.scala:124)
      at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2(HANAWriter.scala:56)
      at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2$adapted(HANAWriter.scala:44)
      at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
      at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
      at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
      at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
      at com.sap.kafka.connect.sink.hana.HANAWriter.write(HANAWriter.scala:44)
      at com.sap.kafka.connect.sink.GenericSinkTask.put(GenericSinkTask.scala:36)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
      at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
      at java.util.concurrent.FutureTask.run(Unknown Source)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
      at java.lang.Thread.run(Unknown Source)

It looks like even though the key is converted with the StringConverter, the keySchema is not null; it is a plain string schema, which has no fields.
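Why a null check is not enough: Kafka Connect's `StringConverter` hands the task a non-null key schema (`Schema.OPTIONAL_STRING_SCHEMA`), and `ConnectSchema.fields()` throws `DataException` for any non-struct type. The sketch below reproduces that behaviour with a hypothetical, simplified stand-in for Connect's `Schema` (`SimpleSchema`, `SchemaType` and `keyColumns` are illustrative names, not the connector's or Kafka's API).

```scala
// Hypothetical, simplified stand-in for Kafka Connect's Schema (illustration only).
object KeySchemaRepro {
  sealed trait SchemaType
  case object StringType extends SchemaType
  case object StructType extends SchemaType

  // Stand-in for org.apache.kafka.connect.errors.DataException.
  final class DataException(msg: String) extends RuntimeException(msg)

  final case class SimpleSchema(schemaType: SchemaType, fieldNames: List[String]) {
    // Mirrors ConnectSchema.fields(): only struct schemas expose fields.
    def fields: List[String] =
      if (schemaType == StructType) fieldNames
      else throw new DataException("Cannot list fields on non-struct type")
  }

  // Roughly what a StringConverter key looks like: a non-null string schema without fields.
  val stringKeySchema: SimpleSchema = SimpleSchema(StringType, Nil)

  // The unguarded pattern: a null check alone still lets a string schema reach fields.
  def keyColumns(keySchema: SimpleSchema): List[String] =
    if (keySchema != null) keySchema.fields else Nil
}
```

Calling `keyColumns(stringKeySchema)` fails with the same "Cannot list fields on non-struct type" message that appears in the log above, while a struct key schema returns its field names.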

I tried changing line 123 from

  if (recordSchema.keySchema != null) {

to

  if (recordSchema.keySchema != null && recordSchema.keySchema.name() != null) {

and line 63 from

  if (recordSchema.keySchema != null) {

to

  if (recordSchema.keySchema != null && recordSchema.keySchema.name() != null) {

That worked, and I could sink the values into the HANA database.
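The workaround above works because the string schema produced by `StringConverter` has no name. A name check is only a heuristic, though: Connect logical types such as `Decimal` are named schemas of a non-struct type (BYTES), so they would still reach `fields`, and a struct schema built without a name would be skipped. The sketch below illustrates this with the same kind of hypothetical stand-in (`SimpleSchema`, `keyColumns` are illustrative names; `name` is a possibly-null `String` to mirror `Schema.name()`).

```scala
// Hypothetical stand-in for Kafka Connect's Schema; name may be null, like Schema.name().
object NameGuardSketch {
  sealed trait SchemaType
  case object StringType extends SchemaType
  case object BytesType extends SchemaType
  case object StructType extends SchemaType

  final class DataException(msg: String) extends RuntimeException(msg)

  final case class SimpleSchema(schemaType: SchemaType, name: String, fieldNames: List[String]) {
    // Only struct schemas expose fields, as in ConnectSchema.fields().
    def fields: List[String] =
      if (schemaType == StructType) fieldNames
      else throw new DataException("Cannot list fields on non-struct type")
  }

  // The workaround from this issue: skip key schemas that have no name.
  def keyColumns(keySchema: SimpleSchema): List[String] =
    if (keySchema != null && keySchema.name != null) keySchema.fields else Nil
}
```

With this guard an unnamed string key yields no key columns, a named struct key yields its fields, an unnamed struct key is silently ignored, and a named bytes schema (such as a `Decimal`) still throws.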

Could you check whether I could have changed my configuration to get this working? Or does the connector need to be enhanced to support the following setting?

  key.converter=org.apache.kafka.connect.storage.StringConverter

Kind regards, and thanks for the help

elakito commented 2 years ago

I added a similar but more explicit condition for inspecting the schema: the fields are inspected only when the associated schema is a struct schema. https://github.com/SAP/kafka-connect-sap/pull/135
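Checking the schema type instead of its name avoids both gaps of the name heuristic. The real change is in the PR linked above; the following is not that code, only a sketch of a type-based guard using a hypothetical stand-in for Connect's `Schema` (`SimpleSchema`, `keyColumns` are illustrative names). In the real Kafka Connect API the corresponding test would be `schema.type() == Schema.Type.STRUCT`.

```scala
// Hypothetical stand-in for Kafka Connect's Schema (illustration only).
object StructGuardSketch {
  sealed trait SchemaType
  case object StringType extends SchemaType
  case object BytesType extends SchemaType
  case object StructType extends SchemaType

  final case class SimpleSchema(schemaType: SchemaType, name: String, fieldNames: List[String]) {
    // Only struct schemas expose fields, as in ConnectSchema.fields().
    def fields: List[String] =
      if (schemaType == StructType) fieldNames
      else throw new IllegalStateException("Cannot list fields on non-struct type")
  }

  // Inspect fields only for struct schemas; null or non-struct key schemas give no key columns.
  def keyColumns(keySchema: SimpleSchema): List[String] =
    Option(keySchema).filter(_.schemaType == StructType).map(_.fields).getOrElse(Nil)
}
```

Here the string key from `StringConverter` and a named `Decimal`-like bytes schema both yield no key columns, while a struct key contributes its fields whether or not the schema is named.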

Am I correct that you were getting this error only from the keySchema part? I think the valueSchema should be a struct schema with one field when the original data is retrieved from a collection/JSON table.

AndreasFischbach3003 commented 2 years ago

Hi, I have tested it with my configuration and it works now. Yes, the exception came from the keySchema, as the key was converted with org.apache.kafka.connect.storage.StringConverter. The key schema was actually an Avro schema.

Thanks for your help