SAP / kafka-connect-sap

Kafka Connect SAP is a set of connectors, based on the Apache Kafka Connect framework, for reliably connecting Kafka with SAP systems.
Apache License 2.0

Sink Connector with Hana Json Store not working #125

Open mschuch opened 2 years ago

mschuch commented 2 years ago

Hi, I wanted to test whether the sink connector works with the HANA JSON Document Store, but I have had no luck so far. Here is my config:

{
    "name": "test_topic_json_sink",
    "config": {
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "connector.class":"com.sap.kafka.connect.sink.hana.HANASinkConnector",
        "value.converter.schemas.enable":"false",
        "topics":"test_json",
        "connection.url":"jdbc:sap://[url]l:33041/?currentschema=SCHEMA&encrypt=true&validateCertificate=false",
        "connection.user":"xxxxx",
        "connection.password":"xxxxx",
        "test_json.table.name":"\"SCHEMA\".\"TEST_COLLECTION\"",
        "test_json.type":"collection",
        "auto.create": "true",
        "principal.service.name": "xxx",
        "principal.service.password": "xxxxt"
    }
}

I am getting the following HANAConfigMissingException: "A table name or query must be specified for HANA Kafka Connectors to work".

I got a little further than that, but I am now stuck with a DataException: "Cannot list fields on non-struct type". What am I doing wrong here? Is there a working example?

elakito commented 2 years ago

@mschuch unfortunately, there is no example in the tests. Could you post the exception? It looks like the sink connector is complaining about the incoming schema definition. Can you try to ingest the data into a normal table type to verify that the incoming schema is okay?
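
For what it's worth, that exception text is exactly what the Kafka Connect data API raises when fields are requested from a schema that is not a struct. A minimal sketch against org.apache.kafka.connect.data (not connector code):

    import org.apache.kafka.connect.data.Schema

    // Any non-struct Connect schema (STRING, INT32, ...) carries no fields;
    // asking for them raises the exception quoted above.
    Schema.STRING_SCHEMA.fields()
    // => org.apache.kafka.connect.errors.DataException:
    //    Cannot list fields on non-struct type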

mschuch commented 2 years ago

Hi @elakito, I have tested it with the normal table type and an Avro schema, and there everything works fine. I also tried it without a schema, with plain JSON (see the connect config above).

I always get the same exception with or without a schema:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:638)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Cannot list fields on non-struct type
    at org.apache.kafka.connect.data.ConnectSchema.fields(ConnectSchema.java:179)
    at com.sap.kafka.connect.sink.hana.HANASinkRecordsCollector.add(HANASinkRecordsCollector.scala:64)
    at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2(HANAWriter.scala:56)
    at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2$adapted(HANAWriter.scala:44)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
    at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
    at com.sap.kafka.connect.sink.hana.HANAWriter.write(HANAWriter.scala:44)
    at com.sap.kafka.connect.sink.GenericSinkTask.put(GenericSinkTask.scala:36)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:604)
    ... 10 more

elakito commented 2 years ago

@mschuch How are you creating the input message? If your Avro scenario works when the messages are sent to a normal sink table, these messages are valid, and you can also send them to a collection sink table by just changing the table type and name in the sink connector configuration.

The error from your stack trace indicates that it happens when the structure of the message is analysed, so it looks like something is wrong with the schema of the message. After the structure is determined from the schema, the message is prepared for the actual storing. When the target table is of collection type, the records are serialised into a JSON document and stored in the collection table; otherwise they are stored as normal table records.
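
To illustrate that last step, here is a standalone sketch of how a struct record can be serialised into a JSON document, using the stock JsonConverter rather than the connector's internal serialiser, and a hypothetical schema ("Price" with fields sku and price):

    import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
    import org.apache.kafka.connect.json.JsonConverter
    import scala.jdk.CollectionConverters._

    // hypothetical record schema and value
    val schema = SchemaBuilder.struct().name("Price")
      .field("sku", Schema.STRING_SCHEMA)
      .field("price", Schema.FLOAT64_SCHEMA)
      .build()
    val value = new Struct(schema).put("sku", "A-1").put("price", 9.99)

    // render the struct as a plain JSON document, roughly what a
    // collection-type sink table ends up storing
    val converter = new JsonConverter()
    converter.configure(Map("schemas.enable" -> "false").asJava, false)
    val json = new String(converter.fromConnectData("test_json", schema, value), "UTF-8")
    // json == {"sku":"A-1","price":9.99}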

AndreasFischbach3003 commented 2 years ago

Hi, @elakito. I use the same configuration, with the keys converted with org.apache.kafka.connect.storage.StringConverter and the values converted with io.confluent.connect.avro.AvroConverter. I get the following exception:

[2022-08-30 16:02:05,830] DEBUG [sap_kafka_connector|task-0] [Consumer clientId=connector-consumer-sap_kafka_connector-0, groupId=connect-sap_kafka_connector] Received successful Heartbeat response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1180)
[2022-08-30 16:02:06,213] INFO [sap_kafka_connector|task-0] PHASE - 1 ended for task, with assigned partitions [erp.nav.local.variantSalesPriceChanged-0, erp.nav.local.variantSalesPriceChanged-4, erp.nav.local.variantSalesPriceChanged-3, erp.nav.local.variantSalesPriceChanged-2, erp.nav.local.variantSalesPriceChanged-1, erp.nav.local.variantSalesPriceChanged-5] (com.sap.kafka.connect.sink.hana.HANASinkTask:56)
[2022-08-30 16:02:06,213] ERROR [sap_kafka_connector|task-0] WorkerSinkTask{id=sap_kafka_connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot list fields on non-struct type (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
org.apache.kafka.connect.errors.DataException: Cannot list fields on non-struct type
    at org.apache.kafka.connect.data.ConnectSchema.fields(ConnectSchema.java:179)
    at com.sap.kafka.connect.sink.hana.HANASinkRecordsCollector.add(HANASinkRecordsCollector.scala:124)
    at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2(HANAWriter.scala:56)
    at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2$adapted(HANAWriter.scala:44)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
    at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
    at com.sap.kafka.connect.sink.hana.HANAWriter.write(HANAWriter.scala:44)
    at com.sap.kafka.connect.sink.GenericSinkTask.put(GenericSinkTask.scala:36)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

It looks like even though the key goes through the StringConverter, the keySchema is not null; it just has no fields attached to it.
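
That observation matches the StringConverter contract: its toConnectData returns the value with Schema.OPTIONAL_STRING_SCHEMA attached, which is non-null but not a struct, so listing its fields throws. A quick standalone check (topic and key values are placeholders):

    import org.apache.kafka.connect.storage.StringConverter
    import scala.jdk.CollectionConverters._

    val converter = new StringConverter()
    converter.configure(Map.empty[String, String].asJava, true) // isKey = true

    // the key comes back with Schema.OPTIONAL_STRING_SCHEMA attached:
    // non-null, type STRING, no fields
    val keySchema = converter.toConnectData("some-topic", "some-key".getBytes("UTF-8")).schema()
    keySchema.fields() // throws DataException: Cannot list fields on non-struct type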

I have tried changing line 123 from

      if (recordSchema.keySchema != null) {

to

      if (recordSchema.keySchema != null && recordSchema.keySchema.name() != null) {

and line 63 from

      if (recordSchema.keySchema != null) {

to

    if (recordSchema.keySchema != null && recordSchema.keySchema.name() != null) {

That worked, and I could sink the values into the HANA DB.

Can you perhaps check whether I could have changed my configuration to get this working? Or is this something that needs to be enhanced for

key.converter=org.apache.kafka.connect.storage.StringConverter

Kind regards, and thanks for the help.

elakito commented 2 years ago

Thanks for your investigation. I added a more explicit condition to inspect the structured schema: https://github.com/SAP/kafka-connect-sap/pull/135
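
For reference, a guard of that shape would test the schema type rather than the schema name; a minimal sketch only (keyFields is a hypothetical helper; the actual change is in the linked PR):

    import org.apache.kafka.connect.data.{Field, Schema}
    import java.util.Collections

    // sketch: list key fields only when the key schema really is a struct,
    // instead of testing keySchema.name() for null as in the workaround above
    def keyFields(keySchema: Schema): java.util.List[Field] =
      if (keySchema != null && keySchema.`type`() == Schema.Type.STRUCT)
        keySchema.fields()
      else
        Collections.emptyList[Field]()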